Hi Sridhar,

From looking at your code:

1) Is "KafkaDataSource" a custom source that you implemented? Does this source buffer anything?

2) The getStreamSource2 method again seems to return a "new KafkaDataSource<MyMessage1>". Can this be a problem?

3) You are working on processing time and you are simply detecting whether 2 messages of the same type came within 15 min, right? I suppose that this could also be implemented using the times() quantifier, but this is just a matter of taste.

Could you reduce this to a smaller duration and see if you still get a corrupted stream exception?

Thanks,
Kostas

> On Sep 27, 2017, at 5:42 AM, Sridhar Chellappa <flinken...@gmail.com> wrote:
>
> One more point to add.
>
> I disabled checkpoints (by commenting out code that calls enableCheckpointing()) and re-ran the job this time with plenty of memory to the job manager
>
> ~/flink-1.3.2/bin/yarn-session.sh -n 4 -jm 24576 -tm 24576 -s 2 -d
>
> At the Jobmanager, I am still hitting:
>
> 2017-09-25 06:46:44,066 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Starting YARN ApplicationMaster / ResourceManager / JobManager (Version: 1.3.2, Rev:0399bee, Date:03.08.2017 @ 10:23:11 UTC)
> 2017-09-25 06:46:44,066 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Current user: flink
> 2017-09-25 06:46:44,066 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.131-b11
> 2017-09-25 06:46:44,066 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Maximum heap size: 16384 MiBytes
> 2017-09-25 06:46:44,066 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
> 2017-09-25 06:46:44,067 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Hadoop version: 2.7.2
> 2017-09-25 06:46:44,067 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - JVM Options:
> 2017-09-25 06:46:44,067 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - -Xmx18432m
> 2017-09-25 06:46:44,067 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - -Dlog.file=/var/log/hadoop-yarn/userlogs/application_1506317793012_0001/container_1506317793012_0001_01_000001/jobmanager.log
> 2017-09-25 06:46:44,067 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - -Dlogback.configurationFile=file:logback.xml
> 2017-09-25 06:46:44,067 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - -Dlog4j.configuration=file:log4j.properties
> 2017-09-25 06:46:44,067 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Program Arguments: (none)
>
> .
> .
>
> 2017-09-25 06:50:51,925 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> (Filter -> Map -> Map, Filter -> Map -> Map, Filter -> Map -> Map, Map -> (Filter, Filter)) (2/2) (e27860984c858738f044931e4b6a86a6) switched from DEPLOYING to RUNNING.
> 2017-09-25 13:38:54,175 INFO org.apache.flink.runtime.blob.BlobCache - Created BLOB cache storage directory /tmp/blobStore-3e0b96a1-904b-4acb-b0d3-9d88f2073e97
> 2017-09-25 13:38:54,187 INFO org.apache.flink.runtime.blob.BlobCache - Downloading 49efe0ad58b727ba145b86df6088111c9a90ddd6 from localhost/127.0.0.1:55550
> 2017-09-25 16:30:39,974 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - KeyedCEPPatternOperator -> Map (2/2) (e464ec796cd239a7b7fa225aaf86309a) switched from RUNNING to CANCELED.
> 2017-09-25 16:30:39,975 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> (Filter -> Map -> Map, Filter -> Map -> Map, Filter -> Map -> Map, Map -> (Filter, Filter)) (2/2) (e27860984c858738f044931e4b6a86a6) switched from RUNNING to FAILED.
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>     at sun.reflect.GeneratedConstructorAccessor10.newInstance(Unknown Source)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>     at com.twitter.chill.Instantiators$$anonfun$normalJava$1.apply(KryoBase.scala:160)
>     at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:123)
>     at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.createCopy(FieldSerializer.java:620)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:624)
>     at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>
>
> On Wed, Sep 27, 2017 at 8:34 AM, Sridhar Chellappa <flinken...@gmail.com> wrote:
> Here is the snippet :
>
> public interface Rule {
>     DataStream<Alert> run();
> }
>
> public class Rule1 implements Rule {
>
>     private static final String RULE_ID = "Rule1";
>
>     @Override
>     public DataStream<Alert> run() {
>
>         // Two consecutive MyMessage1 events for the same key within 15 minutes.
>         Pattern<MyMessage1, ?> MyMessage1Pattern =
>             Pattern.<MyMessage1>begin("first").
>                 subtype(MyMessage1.class).
>                 next("second").
>                 subtype(MyMessage1.class).
>                 within(Time.minutes(15));
>
>         PatternStream<MyMessage1> MyMessage1PatternStream =
>             CEP.pattern(
>                 MyMessage1DataStream.keyBy("field1", "field2"),
>                 MyMessage1Pattern
>             );
>
>         return (MyMessage1PatternStream.select(
>             new PatternSelectFunction<MyMessage1, Alert>() {
>                 @Override
>                 public Alert select(Map<String, List<MyMessage1>> pattern) throws Exception {
>
>                     String alertMessage = String.format("Cep Alert. Rule ID : %s", RULE_ID);
>
>                     return new CEPAlert(alertMessage);
>                 }
>             }
>         )
>         );
>
>     }
> }
>
>
> private static List<Rule> getStream1RulesToExecute(DataStream<MyMessage1> MyMessage1DataStream) {
>     List<Rule> rules = new ArrayList<Rule>();
>
>     rules.add(new Rule1(MyMessage1DataStream));
>
>     return rules;
> }
>
>
> private static List<Rule> getStream2RulesToExecute(DataStream<MyMessage1> MyMessage1DataStream) {
>     List<Rule> rules = new ArrayList<Rule>();
>
>     rules.add(new Rule2(MyMessage1DataStream));
>     return rules;
> }
>
>
> public RichParallelSourceFunction<MyMessage1> getStreamSource1(StreamExecutionEnvironment env, ParameterTool parameterTool) {
>
>     env.enableCheckpointing(parameterTool.getInt(CHECKPOINT_INTERVAL, DEFAULT_CHECKPOINT_INTERVAL));
>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS);
>     env.getCheckpointConfig().setCheckpointTimeout(CheckpointConfig.DEFAULT_TIMEOUT);
>     env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>
>     KafkaDataSource<T> flinkCepConsumer =
>         new KafkaDataSource<MyMessage1>(parameterTool, new MyMessage1SerDeSchema());
>
>     return flinkCepConsumer;
> }
>
>
> public RichParallelSourceFunction<MyMessage2> getStreamSource2(StreamExecutionEnvironment env, ParameterTool parameterTool) {
>
>     env.enableCheckpointing(parameterTool.getInt(CHECKPOINT_INTERVAL, DEFAULT_CHECKPOINT_INTERVAL));
>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS);
>     env.getCheckpointConfig().setCheckpointTimeout(CheckpointConfig.DEFAULT_TIMEOUT);
>     env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>
>     KafkaDataSource<T> flinkCepConsumer =
>         new KafkaDataSource<MyMessage1>(parameterTool, new MyMessage2SerDeSchema());
>
>     return flinkCepConsumer;
> }
>
>
> public static void main(String[] args) throws Exception {
>     ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[PROPS_FILE_ARG_INDEX]);
>
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>     env.getConfig().setGlobalJobParameters(parameterTool);
>
>     DataStream<MyMessage1> message1Stream = env.addSource(
>         getStreamSource1(env, parameterTool)
>     );
>
>     DataStream<MyMessage2> message2Stream = env.addSource(
>         getStreamSource2(env, parameterTool)
>     );
>
>     getStream1RulesToExecute(message1Stream).forEach(rule -> rule.run().print());
>     getStream2RulesToExecute(message2Stream).forEach(rule -> rule.run().print());
>     env.execute(STREAMING_JOB_NAME);
> }
>
>
> On Mon, Sep 25, 2017 at 3:13 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> I talked a bit with Kostas on what may be happening here.
>
> It could be that your patterns are not closing, which depends on the pattern construction of your CEP job.
> Could you perhaps provide an overview / code snippet of what your CEP job is doing?
>
> Looping Kostas (in CC) also to this thread as he may have a better idea what is happening here.
>
> Cheers,
> Gordon
>
> On 22 September 2017 at 4:09:07 PM, Sridhar Chellappa (flinken...@gmail.com) wrote:
>
>> Thanks for the reply. Well, tracing back to the root cause, I see the following:
>>
>> 1. At the Job manager, the Checkpoint times are getting worse :
>>
>> Jobmanager :
>>
>> Checkpoint times are getting worse progressively.
>>
>> 2017-09-16 05:05:50,813 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 @ 1505538350809
>> 2017-09-16 05:05:51,396 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 (11101233 bytes in 586 ms).
>> 2017-09-16 05:07:30,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 @ 1505538450809
>> 2017-09-16 05:07:31,657 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 (18070955 bytes in 583 ms).
>>
>> .
>> .
>> .
>>
>> 2017-09-16 07:32:58,117 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 89 (246125113 bytes in 27194 ms).
>> 2017-09-16 07:34:10,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 90 @ 1505547250809
>> 2017-09-16 07:34:44,932 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 90 (248272325 bytes in 34012 ms).
>> 2017-09-16 07:35:50,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 91 @ 1505547350809
>> 2017-09-16 07:36:37,058 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 91 (250348812 bytes in 46136 ms).
>> 2017-09-16 07:37:30,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 92 @ 1505547450809
>> 2017-09-16 07:38:18,076 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 92 (252399724 bytes in 47152 ms).
>> 2017-09-16 07:39:10,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 93 @ 1505547550809
>> 2017-09-16 07:40:13,494 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 93 (254374636 bytes in 62573 ms).
>> 2017-09-16 07:40:50,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 94 @ 1505547650809
>> 2017-09-16 07:42:42,850 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 94 (256386533 bytes in 111898 ms).
>> 2017-09-16 07:42:42,850 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 95 @ 1505547762850
>> 2017-09-16 07:46:06,241 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 95 (258441766 bytes in 203268 ms).
>> 2017-09-16 07:46:06,241 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 96 @ 1505547966241
>> 2017-09-16 07:48:42,069 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - KeyedCEPPatternOperator -> Map (1/4) (ff835faa9eb9182ed2f2230a1e5cc56d) switched from RUNNING to FAILED.
>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 96 for operator KeyedCEPPatternOperator -> Map (1/4).}
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.Exception: Could not materialize checkpoint 96 for operator KeyedCEPPatternOperator -> Map (1/4).
>>     ... 6 more
>> Caused by: java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: GC overhead limit exceeded
>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>>     ... 5 more
>>
>>
>> So, it looks like the Job Manager ran out of memory, thanks to the "Progressively Getting Worse" checkpoints.
>> Any ideas on how to make the checkpoints faster?
>>
>>
>> On Thu, Sep 21, 2017 at 7:29 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>> Hi Sridhar,
>>
>> Sorry that this didn't get a response earlier.
>>
>> According to the trace, it seems like the job failed during processing, and when trying to automatically restore from a checkpoint, deserialization of a CEP `IterativeCondition` object failed. As far as I can tell, CEP operators just use Java serialization on CEP `IterativeCondition` objects, so this should not be related to the protobuf serializer that you are using.
>>
>> Is this still constantly happening for you?
>>
>> Cheers,
>> Gordon
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
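
The "Completed checkpoint" lines quoted in this thread report both the state size and the duration of each checkpoint, so the growth can be quantified rather than eyeballed. The following is only a minimal sketch in plain Java (no Flink dependency); the class name CheckpointGrowth and its methods are hypothetical helpers, not part of Flink, and the regular expression assumes the exact message format shown in the quoted CheckpointCoordinator log. It measures growth only; it does not by itself say why the state grows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of Flink): extracts size and duration from
// "Completed checkpoint N (B bytes in T ms)" log messages.
public class CheckpointGrowth {

    /** One completed checkpoint: id, state size in bytes, duration in ms. */
    static final class Entry {
        final long id;
        final long bytes;
        final long millis;

        Entry(long id, long bytes, long millis) {
            this.id = id;
            this.bytes = bytes;
            this.millis = millis;
        }
    }

    // Assumes the message format seen in the quoted job manager log.
    private static final Pattern COMPLETED = Pattern.compile(
        "Completed\\s+checkpoint\\s+(\\d+)\\s+\\((\\d+)\\s+bytes\\s+in\\s+(\\d+)\\s+ms\\)");

    /** Parses every line containing a "Completed checkpoint" message; other lines are skipped. */
    static List<Entry> parse(List<String> lines) {
        List<Entry> entries = new ArrayList<>();
        for (String line : lines) {
            Matcher m = COMPLETED.matcher(line);
            if (m.find()) {
                entries.add(new Entry(
                    Long.parseLong(m.group(1)),
                    Long.parseLong(m.group(2)),
                    Long.parseLong(m.group(3))));
            }
        }
        return entries;
    }

    /** Average state growth in bytes per checkpoint between the first and last entry. */
    static double bytesPerCheckpoint(List<Entry> entries) {
        if (entries.size() < 2) {
            return 0.0;
        }
        Entry first = entries.get(0);
        Entry last = entries.get(entries.size() - 1);
        long span = last.id - first.id;
        return span == 0 ? 0.0 : (double) (last.bytes - first.bytes) / span;
    }

    public static void main(String[] args) {
        // Two of the lines quoted earlier in this thread.
        List<String> lines = Arrays.asList(
            "2017-09-16 05:05:51,396 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 (11101233 bytes in 586 ms).",
            "2017-09-16 07:46:06,241 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 95 (258441766 bytes in 203268 ms).");

        List<Entry> entries = parse(lines);
        System.out.printf("checkpoints parsed: %d%n", entries.size());
        System.out.printf("avg growth: %.0f bytes per checkpoint%n", bytesPerCheckpoint(entries));
    }
}
```

State that grows by a roughly constant amount per checkpoint for hours would be consistent with Gordon's remark that the patterns may not be closing, and rerunning with a shorter within() duration, as Kostas suggests, is one way to see whether the growth levels off.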