Apache FlinkCEP: Implementing Timeout State Monitoring, Step by Step


CEP stands for Complex Event Processing. Typical timeout-monitoring scenarios include:

An order is placed but payment is not confirmed within a given time.

A ride-hailing order is created but pickup is not confirmed within a given time.

A food delivery is not confirmed as delivered within a certain margin past its promised delivery time.

Apache FlinkCEP API

CEPTimeoutEventJob

A Brief Walkthrough of the FlinkCEP Source Code

DataStream and PatternStream

A DataStream is generally made up of events or elements of the same type; one DataStream can be transformed into another through a series of transformations such as filter and map.

A PatternStream is the abstraction of a stream under CEP pattern matching: it combines a DataStream with a Pattern and exposes methods such as select and flatSelect. A PatternStream is not itself a DataStream; its methods emit the matched pattern sequences together with their associated events, as a Map<patternName, List<event>>, into a SingleOutputStreamOperator, and SingleOutputStreamOperator is a DataStream.

Methods and variables in the CEPOperatorUtils utility class are named after "PatternStream", for example:

public static <IN, OUT> SingleOutputStreamOperator<OUT> createPatternStream(...) {...}

public static <IN, OUT1, OUT2> SingleOutputStreamOperator<OUT1> createTimeoutPatternStream(...) {...}

final SingleOutputStreamOperator<OUT> patternStream;

SingleOutputStreamOperator

@Public
public class SingleOutputStreamOperator<T> extends DataStream<T> {...}

The constructors of PatternStream:

PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {
    this.inputStream = inputStream;
    this.pattern = pattern;
    this.comparator = null;
}

PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern, final EventComparator<T> comparator) {
    this.inputStream = inputStream;
    this.pattern = pattern;
    this.comparator = comparator;
}

Pattern, Quantifier and EventComparator

Pattern is the base class for pattern definitions and follows the builder pattern; a defined Pattern is used by the NFACompiler to generate an NFA.

If you want to implement your own methods similar to next and followedBy, say a timeEnd, extending Pattern should be feasible.
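As a lightweight alternative to subclassing, that idea can also be sketched as a small helper that only composes the existing public Pattern API; the helper name timeEnd and the PatternExtensions class are hypothetical, illustrative only:

import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.windowing.time.Time;

public final class PatternExtensions {

    private PatternExtensions() {}

    /**
     * A hypothetical "timeEnd"-style helper: appends a relaxed-contiguity
     * ("followedBy") sub-pattern and bounds the whole sequence with a time
     * window, using only the public Pattern API.
     */
    public static <T> Pattern<T, T> timeEnd(Pattern<T, T> pattern, String name, Time window) {
        return pattern.followedBy(name).within(window);
    }
}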

public class Pattern<T, F extends T> {

    /** Name of the pattern. */
    private final String name;

    /** The previous pattern. */
    private final Pattern<T, ? extends T> previous;

    /** The condition an event must satisfy to be matched by this pattern. */
    private IterativeCondition<F> condition;

    /** Length of the time window within which the pattern must match. */
    private Time windowTime;

    /** The quantifier of the pattern, i.e. how many events it matches; the default is one. */
    private Quantifier quantifier = Quantifier.one(ConsumingStrategy.STRICT);

    /** The condition an event must satisfy to stop collecting events into the looping state. */
    private IterativeCondition<F> untilCondition;

    /**
     * Applies to a {@code times} pattern; maintains how many times in a row
     * events may occur in this pattern.
     */
    private Times times;

    // The skip strategy to apply after a match
    private final AfterMatchSkipStrategy afterMatchSkipStrategy;

    ...
}

Quantifier describes the concrete behaviour of a pattern. There are three main categories:

Single (match once), Looping (match repeatedly), and Times (match an exact number of times, or any count within a range).

Every Pattern can be optional (whether it matches once or in a loop) and can be given a ConsumingStrategy.

Looping and Times patterns additionally carry an inner ConsumingStrategy that applies between the events accepted into the pattern.

public class Quantifier {

    ...

    /**
     * Five properties that can be combined, although not every combination is valid.
     */
    public enum QuantifierProperty {
        SINGLE,
        LOOPING,
        TIMES,
        OPTIONAL,
        GREEDY
    }

    /**
     * Describes the strategy for which events are matched in this pattern.
     */
    public enum ConsumingStrategy {
        STRICT,
        SKIP_TILL_NEXT,
        SKIP_TILL_ANY,
        NOT_FOLLOW,
        NOT_NEXT
    }

    /**
     * Describes how many times in a row events can occur in this pattern. A pattern
     * condition is essentially a boolean: events satisfying it "times" times in a row,
     * or any count inside a range (e.g. 2 to 4, so 2, 3 or 4 consecutive matches), are
     * all matched by the pattern, so the same event may be matched repeatedly.
     */
    public static class Times {
        private final int from;
        private final int to;

        private Times(int from, int to) {
            Preconditions.checkArgument(from > 0, "The from should be a positive number greater than 0.");
            Preconditions.checkArgument(to >= from, "The to should be a number greater than or equal to from: " + from + ".");
            this.from = from;
            this.to = to;
        }

        public int getFrom() {
            return from;
        }

        public int getTo() {
            return to;
        }

        // A range of times
        public static Times of(int from, int to) {
            return new Times(from, to);
        }

        // An exact number of times
        public static Times of(int times) {
            return new Times(times, times);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            Times times = (Times) o;
            return from == times.from && to == times.to;
        }

        @Override
        public int hashCode() {
            return Objects.hash(from, to);
        }
    }

    ...
}
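For reference, these quantifiers surface on the public Pattern API roughly as in the following sketch; the Event type and its getPrice() accessor are made up for illustration and are not part of this article's demo:

import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;

// TIMES quantifier: 2, 3 or 4 consecutive qualifying events all produce a match
Pattern<Event, Event> twoToFour = Pattern.<Event>begin("spikes")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) {
                return e.getPrice() > 100;
            }
        })
        .times(2, 4);

// LOOPING + OPTIONAL quantifiers: one or more matches, or none at all
Pattern<Event, Event> looping = Pattern.<Event>begin("burst")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) {
                return e.getPrice() > 100;
            }
        })
        .oneOrMore()
        .optional();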

EventComparator is the hook for custom event comparison; implement the EventComparator interface.

public interface EventComparator<T> extends Comparator<T>, Serializable {
    long serialVersionUID = 1L;
}
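A minimal sketch of such a comparator, assuming the POJO event type and its getLogTime() accessor used by the demo later in this article:

import org.apache.flink.cep.EventComparator;

// Orders events by their logTime field; POJO/getLogTime are assumed from the demo below.
public class PojoLogTimeComparator implements EventComparator<POJO> {
    private static final long serialVersionUID = 1L;

    @Override
    public int compare(POJO left, POJO right) {
        return Long.compare(left.getLogTime(), right.getLogTime());
    }
}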

NFACompiler and NFA

NFACompiler provides the methods that compile a Pattern into an NFA, or into an NFAFactory from which multiple NFAs can be created.

public class NFACompiler {

    ...

    /**
     * NFAFactory: the interface for creating NFAs.
     *
     * @param <T> Type of the input events which are processed by the NFA
     */
    public interface NFAFactory<T> extends Serializable {
        NFA<T> createNFA();
    }

    /**
     * NFAFactoryImpl: the concrete implementation of NFAFactory.
     *
     * <p>The implementation takes the input type serializer, the window time and the set of
     * states and their transitions to be able to create an NFA from them.
     *
     * @param <T> Type of the input events which are processed by the NFA
     */
    private static class NFAFactoryImpl<T> implements NFAFactory<T> {

        private static final long serialVersionUID = 8939783698296714379L;

        private final long windowTime;
        private final Collection<State<T>> states;
        private final boolean timeoutHandling;

        private NFAFactoryImpl(
                long windowTime,
                Collection<State<T>> states,
                boolean timeoutHandling) {
            this.windowTime = windowTime;
            this.states = states;
            this.timeoutHandling = timeoutHandling;
        }

        @Override
        public NFA<T> createNFA() {
            // An NFA is made up of the set of states, the window length, and whether timeouts are handled
            return new NFA<>(states, windowTime, timeoutHandling);
        }
    }
}

NFA: Non-deterministic finite automaton.

For more background, see: Nondeterministic finite automaton.

public class NFA<T> {

    /**
     * All valid NFA states, as returned by the NFACompiler.
     * These are directly derived from the user-specified pattern.
     */
    private final Map<String, State<T>> states;

    /**
     * The length of the time window specified via Pattern.within(Time).
     */
    private final long windowTime;

    /**
     * A flag for handling timed-out matches.
     */
    private final boolean handleTimeout;

    ...
}

 

PatternSelectFunction and PatternFlatSelectFunction

The select() method of a PatternSelectFunction is called with a map of the matched events, accessible by pattern name; the pattern names are the ones assigned when the Pattern was defined. select() returns exactly one result; if multiple results are needed, implement PatternFlatSelectFunction instead.

public interface PatternSelectFunction<IN, OUT> extends Function, Serializable {

    /**
     * Generates exactly one result from the given map of events, which are keyed
     * by the name of the pattern they were matched by.
     */
    OUT select(Map<String, List<IN>> pattern) throws Exception;
}
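A usage sketch, assuming the POJO event type and the pattern names "init" and "end" from the demo later in this article:

import java.util.List;
import java.util.Map;
import org.apache.flink.cep.PatternSelectFunction;

// Emits exactly one result per match: the final event of the matched sequence.
public class CompletedPojoSelect implements PatternSelectFunction<POJO, POJO> {
    private static final long serialVersionUID = 1L;

    @Override
    public POJO select(Map<String, List<POJO>> pattern) throws Exception {
        // "end" is the name of the last sub-pattern in the sequence
        return pattern.get("end").get(0);
    }
}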

 

Instead of returning a single OUT, a PatternFlatSelectFunction uses a Collector to collect the matched events.

public interface PatternFlatSelectFunction<IN, OUT> extends Function, Serializable {

    /**
     * Generates one or more results.
     */
    void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> out) throws Exception;
}
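A corresponding flat-select sketch that simply forwards every matched event downstream, again assuming the demo's POJO type:

import java.util.List;
import java.util.Map;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.util.Collector;

// Emits zero or more results per match: here, every event of every sub-pattern.
public class ForwardAllSelect implements PatternFlatSelectFunction<POJO, POJO> {
    private static final long serialVersionUID = 1L;

    @Override
    public void flatSelect(Map<String, List<POJO>> pattern, Collector<POJO> out) {
        for (List<POJO> events : pattern.values()) {
            for (POJO event : events) {
                out.collect(event);
            }
        }
    }
}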

SelectTimeoutCepOperator and PatternTimeoutFunction

SelectTimeoutCepOperator is created when the createTimeoutPatternStream() method in CEPOperatorUtils is called.

The methods that the operator repeatedly invokes inside SelectTimeoutCepOperator are processMatchedSequences() and processTimedOutSequences().

These are template methods, corresponding to the processEvent() and advanceTime() methods of the abstract class AbstractKeyedCEPPatternOperator.

There is also a FlatSelectTimeoutCepOperator, with its corresponding PatternFlatTimeoutFunction.

public class SelectTimeoutCepOperator<IN, OUT1, OUT2, KEY>
    extends AbstractKeyedCEPPatternOperator<IN, KEY, OUT1, SelectTimeoutCepOperator.SelectWrapper<IN, OUT1, OUT2>> {

    private OutputTag<OUT2> timedOutOutputTag;

    public SelectTimeoutCepOperator(
            TypeSerializer<IN> inputSerializer,
            boolean isProcessingTime,
            NFACompiler.NFAFactory<IN> nfaFactory,
            final EventComparator<IN> comparator,
            AfterMatchSkipStrategy skipStrategy,
            // The parameter names are confusingly prefixed with "flat"..., as is the member naming inside SelectWrapper...
            PatternSelectFunction<IN, OUT1> flatSelectFunction,
            PatternTimeoutFunction<IN, OUT2> flatTimeoutFunction,
            OutputTag<OUT2> outputTag,
            OutputTag<IN> lateDataOutputTag) {
        super(
            inputSerializer,
            isProcessingTime,
            nfaFactory,
            comparator,
            skipStrategy,
            new SelectWrapper<>(flatSelectFunction, flatTimeoutFunction),
            lateDataOutputTag);
        this.timedOutOutputTag = outputTag;
    }

    ...
}

public interface PatternTimeoutFunction<IN, OUT> extends Function, Serializable {
    OUT timeout(Map<String, List<IN>> pattern, long timeoutTimestamp) throws Exception;
}

public interface PatternFlatTimeoutFunction<IN, OUT> extends Function, Serializable {
    void timeout(Map<String, List<IN>> pattern, long timeoutTimestamp, Collector<OUT> out) throws Exception;
}

 

CEP and CEPOperatorUtils

CEP is the utility class for creating a PatternStream; a PatternStream is simply the combination of a DataStream and a Pattern.

public class CEP {

    public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern) {
        return new PatternStream<>(input, pattern);
    }

    public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern, EventComparator<T> comparator) {
        return new PatternStream<>(input, pattern, comparator);
    }
}
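In user code this is a one-liner; the names below come from the demo at the end of this article, and the comparator variant is optional:

// Combine the keyed stream and the pattern into a PatternStream (types from the demo below)
PatternStream<POJO> patternStream = CEP.pattern(keyedPojos, completedPojo.within(Time.minutes(1)));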

 

CEPOperatorUtils creates the SingleOutputStreamOperator (a DataStream) when PatternStream's select() or flatSelect() method is called.

public class CEPOperatorUtils {

    ...

    private static <IN, OUT, K> SingleOutputStreamOperator<OUT> createPatternStream(
            final DataStream<IN> inputStream,
            final Pattern<IN, ?> pattern,
            final TypeInformation<OUT> outTypeInfo,
            final boolean timeoutHandling,
            final EventComparator<IN> comparator,
            final OperatorBuilder<IN, OUT> operatorBuilder) {
        final TypeSerializer<IN> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());

        // check whether we use processing time
        final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;

        // compile our pattern into a NFAFactory to instantiate NFAs later on
        final NFACompiler.NFAFactory<IN> nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);

        final SingleOutputStreamOperator<OUT> patternStream;

        if (inputStream instanceof KeyedStream) {
            KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;

            patternStream = keyedStream.transform(
                operatorBuilder.getKeyedOperatorName(),
                outTypeInfo,
                operatorBuilder.build(
                    inputSerializer,
                    isProcessingTime,
                    nfaFactory,
                    comparator,
                    pattern.getAfterMatchSkipStrategy()));
        } else {
            KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();

            patternStream = inputStream.keyBy(keySelector).transform(
                operatorBuilder.getOperatorName(),
                outTypeInfo,
                operatorBuilder.build(
                    inputSerializer,
                    isProcessingTime,
                    nfaFactory,
                    comparator,
                    pattern.getAfterMatchSkipStrategy()
                )).forceNonParallel();
        }

        return patternStream;
    }

    ...
}

FlinkCEP Implementation Steps

  1. IN: DataSource -> DataStream -> Transformations -> DataStream
  2. Pattern: Pattern.begin.where.next.where...times...
  3. PatternStream: CEP.pattern(DataStream, Pattern)
  4. DataStream: PatternStream.select(PatternSelectFunction) or PatternStream.flatSelect(PatternFlatSelectFunction)
  5. OUT: DataStream -> Transformations -> DataStream -> DataSink
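Putting the five steps together, a compact end-to-end sketch might look like the following; the String events, pattern names and conditions are illustrative only and not part of the article's demo:

import java.util.List;
import java.util.Map;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SimpleCepJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. IN: source + transformations (a bounded demo source here)
        DataStream<String> input = env.fromElements("start", "middle", "end");

        // 2. Pattern: begin.where.next.where...
        Pattern<String, String> pattern = Pattern.<String>begin("first")
                .where(new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String value) {
                        return "start".equals(value);
                    }
                })
                .next("second")
                .where(new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String value) {
                        return "middle".equals(value) || "end".equals(value);
                    }
                });

        // 3. PatternStream
        PatternStream<String> patternStream = CEP.pattern(input, pattern);

        // 4./5. OUT: select one result per match and sink it (print)
        patternStream.select(new PatternSelectFunction<String, String>() {
            @Override
            public String select(Map<String, List<String>> match) {
                return match.get("first").get(0) + " -> " + match.get("second").get(0);
            }
        }).print();

        env.execute("SimpleCepJob");
    }
}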

FlinkCEP Timeout Matching Implementation Steps

The stream for timeout CEP needs keyBy, i.e. a KeyedStream; if the inputStream is not a KeyedStream, a zero-byte key is created for it (as seen in the CEPOperatorUtils source above).

KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();

The Pattern finally calls within to set the window time. If you key by a primary key, at most one timeout event is matched per time window, so PatternStream.select(...) is sufficient, as sketched right below.
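A hedged sketch of that select() variant with a timeout side output, reusing the keyedPojos, completedPojo and POJO names from the demo below; check that your Flink version's PatternStream exposes this three-argument select() overload before relying on it:

import java.util.List;
import java.util.Map;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

OutputTag<POJO> timedOutTag = new OutputTag<POJO>("timedout") {};

SingleOutputStreamOperator<POJO> completed = CEP
        .pattern(keyedPojos, completedPojo.within(Time.minutes(1)))
        .select(
            timedOutTag,
            new PatternTimeoutFunction<POJO, POJO>() {
                @Override
                public POJO timeout(Map<String, List<POJO>> match, long timeoutTimestamp) {
                    return match.get("init").get(0);   // the partial match that timed out
                }
            },
            new PatternSelectFunction<POJO, POJO>() {
                @Override
                public POJO select(Map<String, List<POJO>> match) {
                    return match.get("end").get(0);    // the completed match
                }
            });

DataStream<POJO> timedOutPojos = completed.getSideOutput(timedOutTag);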

  1. IN: DataSource -> DataStream -> Transformations -> DataStream -> keyBy -> KeyedStream
  2. Pattern: Pattern.begin.where.next.where...within(Time windowTime)
  3. PatternStream: CEP.pattern(KeyedStream, Pattern)
  4. OutputTag: new OutputTag(...)
  5. SingleOutputStreamOperator: PatternStream.flatSelect(OutputTag, PatternFlatTimeoutFunction, PatternFlatSelectFunction)
  6. DataStream: SingleOutputStreamOperator.getSideOutput(OutputTag)
  7. OUT: DataStream -> Transformations -> DataStream -> DataSink

Limitations of FlinkCEP Timeouts

Similar to Flink window aggregations, if you use event time and rely on watermarks generated from the events to advance, later events must arrive before the window is triggered to compute and emit results; a timeout is therefore only reported once a subsequent event pushes the watermark past it.

Complete FlinkCEP Timeout Demo

public class CEPTimeoutEventJob {

    private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
    private static final String GROUP_ID = CEPTimeoutEventJob.class.getSimpleName();
    private static final String GROUP_TOPIC = GROUP_ID;

    public static void main(String[] args) throws Exception {
        // Parameters
        ParameterTool params = ParameterTool.fromArgs(args);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getConfig().disableSysoutLogging();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 10000));

        // Do not use the POJO's own time field
        final AssignerWithPeriodicWatermarks extractor = new IngestionTimeExtractor<POJO>();

        // Keep parallelism in line with the number of partitions of the Kafka topic
        env.setParallelism(3);

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
        kafkaProps.setProperty("group.id", GROUP_ID);

        // Consume messages from Kafka
        FlinkKafkaConsumer011<POJO> consumer = new FlinkKafkaConsumer011<>(GROUP_TOPIC, new POJOSchema(), kafkaProps);
        DataStream<POJO> pojoDataStream = env.addSource(consumer)
                .assignTimestampsAndWatermarks(extractor);
        pojoDataStream.print();

        // Key by the primary key aid, i.e. run match detection per POJO
        // [different POJO types could use different within() times]
        // 1.
        DataStream<POJO> keyedPojos = pojoDataStream
                .keyBy("aid");

        // From the initial state to the final state - a complete POJO event sequence
        // 2.
        Pattern<POJO, POJO> completedPojo =
                Pattern.<POJO>begin("init")
                        .where(new SimpleCondition<POJO>() {
                            private static final long serialVersionUID = -6847788055093903603L;

                            @Override
                            public boolean filter(POJO pojo) throws Exception {
                                return "02".equals(pojo.getAstatus());
                            }
                        })
                        .followedBy("end")
//                      .next("end")
                        .where(new SimpleCondition<POJO>() {
                            private static final long serialVersionUID = -2655089736460847552L;

                            @Override
                            public boolean filter(POJO pojo) throws Exception {
                                return "00".equals(pojo.getAstatus()) || "01".equals(pojo.getAstatus());
                            }
                        });

        // Find the aid of events that have not reached a final state within 1 minute [kept short for testing]
        // If different types need different within() times, e.g. 1 minute vs. 1 hour, create multiple PatternStreams
        // 3.
        PatternStream<POJO> patternStream = CEP.pattern(keyedPojos, completedPojo.within(Time.minutes(1)));

        // Define the side output "timedout"
        // 4.
        OutputTag<POJO> timedout = new OutputTag<POJO>("timedout") {
            private static final long serialVersionUID = 773503794597666247L;
        };

        // OutputTag<L> timeoutOutputTag, PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction, PatternFlatSelectFunction<T, R> patternFlatSelectFunction
        // 5.
        SingleOutputStreamOperator<POJO> timeoutPojos = patternStream.flatSelect(
                timedout,
                new POJOTimedOut(),
                new FlatSelectNothing()
        );

        // Print the timed-out POJOs
        // 6. 7.
        timeoutPojos.getSideOutput(timedout).print();
        timeoutPojos.print();

        env.execute(CEPTimeoutEventJob.class.getSimpleName());
    }

    /**
     * Collects the timed-out events.
     */
    public static class POJOTimedOut implements PatternFlatTimeoutFunction<POJO, POJO> {
        private static final long serialVersionUID = -4214641891396057732L;

        @Override
        public void timeout(Map<String, List<POJO>> map, long l, Collector<POJO> collector) throws Exception {
            if (null != map.get("init")) {
                for (POJO pojoInit : map.get("init")) {
                    System.out.println("timeout init:" + pojoInit.getAid());
                    collector.collect(pojoInit);
                }
            }
            // Because "end" timed out (it never arrived), it cannot be retrieved here
            System.out.println("timeout end: " + map.get("end"));
        }
    }

    /**
     * Usually does nothing, but could also forward all matched events downstream; with relaxed
     * contiguity, events that were skipped over cannot be selected and forwarded.
     * Handles data that completes both init and end within one minute.
     *
     * @param <T>
     */
    public static class FlatSelectNothing<T> implements PatternFlatSelectFunction<T, T> {
        private static final long serialVersionUID = -3029589950677623844L;

        @Override
        public void flatSelect(Map<String, List<T>> pattern, Collector<T> collector) {
            System.out.println("flatSelect: " + pattern);
        }
    }
}
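The demo references a POJO event class and a POJOSchema deserializer that are not shown above. A minimal sketch, with field names inferred from the demo and the printed output (the real classes certainly differ, e.g. they carry more fields), could look like this:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical event type; only the fields the demo actually touches are included.
public class POJO {
    private String aid;       // primary key used by keyBy("aid")
    private String astatus;   // "02" = initial state, "00"/"01" = final states
    private long logTime;

    public POJO() {}

    public String getAid() { return aid; }
    public void setAid(String aid) { this.aid = aid; }
    public String getAstatus() { return astatus; }
    public void setAstatus(String astatus) { this.astatus = astatus; }
    public long getLogTime() { return logTime; }
    public void setLogTime(long logTime) { this.logTime = logTime; }
}

// Hypothetical schema: assumes one "aid,astatus,logTime" record per Kafka message.
class POJOSchema implements DeserializationSchema<POJO> {
    private static final long serialVersionUID = 1L;

    @Override
    public POJO deserialize(byte[] message) throws IOException {
        String[] parts = new String(message, StandardCharsets.UTF_8).split(",");
        POJO pojo = new POJO();
        pojo.setAid(parts[0]);
        pojo.setAstatus(parts[1]);
        pojo.setLogTime(Long.parseLong(parts[2]));
        return pojo;
    }

    @Override
    public boolean isEndOfStream(POJO nextElement) {
        return false;
    }

    @Override
    public TypeInformation<POJO> getProducedType() {
        return TypeInformation.of(POJO.class);
    }
}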

Test results (with followedBy):

3> POJO{aid='ID000-0', astyle='STYLE000-0', aname='NAME-0', logTime=1563419728242, energy=529.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
3> POJO{aid='ID000-1', astyle='STYLE000-2', aname='NAME-1', logTime=1563419728783, energy=348.00, age=26, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
3> POJO{aid='ID000-0', astyle='STYLE000-0', aname='NAME-0', logTime=1563419749259, energy=492.00, age=0, tt=2019-07-18, astatus='00', createTime=null, updateTime=null}
flatSelect: {init=[POJO{aid='ID000-0', astyle='STYLE000-0', aname='NAME-0', logTime=1563419728242, energy=529.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}], end=[POJO{aid='ID000-0', astyle='STYLE000-0', aname='NAME-0', logTime=1563419749259, energy=492.00, age=0, tt=2019-07-18, astatus='00', createTime=null, updateTime=null}]}
timeout init:ID000-1
3> POJO{aid='ID000-1', astyle='STYLE000-2', aname='NAME-1', logTime=1563419728783, energy=348.00, age=26, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
timeout end: null
3> POJO{aid='ID000-2', astyle='STYLE000-0', aname='NAME-0', logTime=1563419829639, energy=467.00, age=0, tt=2019-07-18, astatus='03', createTime=null, updateTime=null}
3> POJO{aid='ID000-2', astyle='STYLE000-0', aname='NAME-0', logTime=1563419841394, energy=107.00, age=0, tt=2019-07-18, astatus='00', createTime=null, updateTime=null}
3> POJO{aid='ID000-3', astyle='STYLE000-0', aname='NAME-0', logTime=1563419967721, energy=431.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
3> POJO{aid='ID000-3', astyle='STYLE000-2', aname='NAME-0', logTime=1563419979567, energy=32.00, age=26, tt=2019-07-18, astatus='03', createTime=null, updateTime=null}
3> POJO{aid='ID000-3', astyle='STYLE000-2', aname='NAME-0', logTime=1563419993612, energy=542.00, age=26, tt=2019-07-18, astatus='01', createTime=null, updateTime=null}
flatSelect: {init=[POJO{aid='ID000-3', astyle='STYLE000-0', aname='NAME-0', logTime=1563419967721, energy=431.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}], end=[POJO{aid='ID000-3', astyle='STYLE000-2', aname='NAME-0', logTime=1563419993612, energy=542.00, age=26, tt=2019-07-18, astatus='01', createTime=null, updateTime=null}]}
3> POJO{aid='ID000-4', astyle='STYLE000-0', aname='NAME-0', logTime=1563420063760, energy=122.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
3> POJO{aid='ID000-4', astyle='STYLE000-0', aname='NAME-0', logTime=1563420078008, energy=275.00, age=0, tt=2019-07-18, astatus='03', createTime=null, updateTime=null}
timeout init:ID000-4
3> POJO{aid='ID000-4', astyle='STYLE000-0', aname='NAME-0', logTime=1563420063760, energy=122.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
timeout end: null

Summary

The above covers the steps for implementing timeout state monitoring with Apache FlinkCEP. I hope it is helpful; if you have any questions, feel free to leave a comment and I will reply as soon as possible!
