[jira] [Commented] (BEAM-3494) Snapshot state of aggregated data of apache beam project is not maintained in flink's checkpointing
[ https://issues.apache.org/jira/browse/BEAM-3494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401919#comment-16401919 ]

Dawid Wysakowicz commented on BEAM-3494:

[~suganyap] Could you elaborate a bit more on how you enable checkpointing? Do I understand correctly that you pass {{checkpointingInterval}} as a CLI parameter and that {{state.backend}} is set in flink-conf.yaml?

Also, could you explain what it means that the aggregated data is not restored, ideally with a simple example?

> Snapshot state of aggregated data of apache beam project is not maintained in flink's checkpointing
>
>                 Key: BEAM-3494
>                 URL: https://issues.apache.org/jira/browse/BEAM-3494
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: suganya
>            Priority: Major
>
> We have a Beam project that consumes events from Kafka, does a group-by in a time window (5 mins) and, after the window elapses, pushes the events downstream for merge. The project is deployed on Flink, and we have enabled checkpointing to recover from a failed state.
> (windowsize: 5mins, checkpointingInterval: 5mins, state.backend: filesystem)
> Offsets from Kafka get checkpointed every 5 mins (checkpointingInterval). Event offsets are checkpointed before the entire DAG (groupBy and merge) has finished. So in case of a task-manager restart, the new task starts from the last successful checkpoint, but we are not able to get the aggregated snapshot data (data from the groupBy task) from the persisted checkpoint.
> We are able to retrieve the last successfully checkpointed offset from Kafka, but not the data aggregated up to that checkpoint.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
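To make the question about "aggregated data not restored" concrete, here is a minimal toy model in plain Java. It is only a sketch under stated assumptions: it does not use Flink or Beam, and the names CheckpointSketch, ToyJob, Snapshot and replay are hypothetical. It contrasts a restore that brings back both the source offset and the buffered group with one that brings back only the offset, which is the behaviour the report describes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CheckpointSketch {

    /** Snapshot that captures the source offset and the buffered group together. */
    static final class Snapshot {
        final long offset;
        final List<String> buffered;

        Snapshot(long offset, List<String> buffered) {
            this.offset = offset;
            this.buffered = new ArrayList<>(buffered); // defensive copy
        }
    }

    /** Toy job (hypothetical): reads a log by offset and buffers records until a window fires. */
    static final class ToyJob {
        long offset = 0;
        final List<String> buffered = new ArrayList<>();

        /** Reads up to 'count' records starting at the current offset. */
        void consume(List<String> log, int count) {
            for (int i = 0; i < count && offset < log.size(); i++) {
                buffered.add(log.get((int) offset));
                offset++;
            }
        }

        Snapshot checkpoint() {
            return new Snapshot(offset, buffered);
        }

        /** Expected behaviour: offset and buffered group come back together. */
        void restore(Snapshot s) {
            offset = s.offset;
            buffered.clear();
            buffered.addAll(s.buffered);
        }

        /** Behaviour described in the report: the offset comes back, the buffer does not. */
        void restoreOffsetOnly(Snapshot s) {
            offset = s.offset;
            buffered.clear();
        }
    }

    /** Consumes 3 records, checkpoints, consumes 2 more, "crashes", restores, then reads the rest. */
    static List<String> replay(List<String> log, boolean restoreBuffer) {
        ToyJob job = new ToyJob();
        job.consume(log, 3);
        Snapshot snap = job.checkpoint(); // offset = 3, buffer holds the first three records
        job.consume(log, 2);              // progress after the checkpoint, lost in the crash

        ToyJob recovered = new ToyJob();  // simulated restart of the task
        if (restoreBuffer) {
            recovered.restore(snap);
        } else {
            recovered.restoreOffsetOnly(snap);
        }
        recovered.consume(log, log.size());
        return recovered.buffered;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("e0", "e1", "e2", "e3", "e4", "e5");
        System.out.println(replay(log, true));  // [e0, e1, e2, e3, e4, e5]
        System.out.println(replay(log, false)); // [e3, e4, e5]
    }
}
```

In the offset-only case the records read before the checkpoint never reach the downstream merge, because the source resumes after them while the group that held them is empty. A small reproduction along these lines against the real pipeline would answer the question above.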
[jira] [Commented] (BEAM-3494) Snapshot state of aggregated data of apache beam project is not maintained in flink's checkpointing
[ https://issues.apache.org/jira/browse/BEAM-3494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341057#comment-16341057 ]

Aljoscha Krettek commented on BEAM-3494:

How are you enabling checkpointing?

Also, could you please reformat your posting to put the code between code tags, like this: {code:java} your code ... {code}, to make it more readable?
[jira] [Commented] (BEAM-3494) Snapshot state of aggregated data of apache beam project is not maintained in flink's checkpointing
[ https://issues.apache.org/jira/browse/BEAM-3494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16330077#comment-16330077 ]

suganya commented on BEAM-3494:
---

The following code uses the Apache Beam libraries to create the pipeline:

    public void run(String[] args) {
      BeamCLIOptions beamCliOptions = PipelineOptionsFactory.fromArgs(args)
          .withValidation()
          .as(BeamCLIOptions.class);
      Pipeline pipeline = Pipeline.create(beamCliOptions);

      // Trigger delay and window size, both read from the CLI options.
      MergeDistribution mergeDistribution = MergeDistribution
          .valueOf(beamCliOptions.getMergeDistribution());
      MergeDistribution fixedWindowDuration = MergeDistribution
          .valueOf(beamCliOptions.getFixedWindowSize());

      // Kafka source with String keys and values.
      KafkaIO.Read<String, String> kafkaEntityStreamReader = KafkaIO.<String, String>read()
          .withBootstrapServers(beamCliOptions.getKafkaServers())
          .withTopic(beamCliOptions.getKafkaTopic())
          .withKeyDeserializer(StringDeserializer.class)
          .withValueDeserializer(StringDeserializer.class)
          .updateConsumerProperties(ImmutableMap.of(
              "auto.offset.reset", "latest",
              "enable.auto.commit", "true"));

      pipeline.apply(kafkaEntityStreamReader.withoutMetadata())
          .apply(Values.<String>create())
          // Fixed windows with a repeated processing-time trigger; fired panes are discarded.
          .apply(Window.<String>into(
                  FixedWindows.of(Duration.standardMinutes(fixedWindowDuration.getMins())))
              .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
                  .plusDelayOf(Duration.standardMinutes(mergeDistribution.getMins()))))
              .discardingFiredPanes()
              .withAllowedLateness(Duration.ZERO))
          .apply(ParDo.of(new ExtractDataFn(
              beamCliOptions.getDatePartitionKey(),
              new DateTime().minusDays(beamCliOptions.getDaysAgo()).getMillis())))
          .apply("Applying GroupByKey on -MM-DD HH ", GroupByKey.create())
          .apply("Applying Merge ", ParDo.of(new MergeDataFn(beamCliOptions)));

      pipeline.run();
    }

> Snapshot state of aggregated data of apache beam project is not maintained in flink's checkpointing
>
>                 Key: BEAM-3494
>                 URL: https://issues.apache.org/jira/browse/BEAM-3494
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>            Reporter: suganya
>            Assignee: Kenneth Knowles
>            Priority: Major
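The windowing configuration in the pipeline above (a repeated processing-time trigger with discarding fired panes) determines which elements are still buffered at any moment, and that buffer is the "aggregated data" a checkpoint would need to capture. The following is a toy simulation in plain Java, not Beam code: the class DiscardingPaneSketch and its simulate method are hypothetical, firing is only evaluated when the next element arrives, and event-time window boundaries and real timers are ignored.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DiscardingPaneSketch {

    /**
     * Simulates "fire once the delay has passed since the first element in the pane,
     * then discard the fired elements". Returns the fired panes and appends whatever
     * has not fired yet to 'stillBuffered'.
     */
    static List<List<String>> simulate(long[] arrivalMinutes, String[] values,
                                       long delayMinutes, List<String> stillBuffered) {
        List<List<String>> panes = new ArrayList<>();
        List<String> buffer = new ArrayList<>();
        long firstArrival = -1;

        for (int i = 0; i < values.length; i++) {
            // The delay counts from the first element of the current pane.
            if (!buffer.isEmpty() && arrivalMinutes[i] >= firstArrival + delayMinutes) {
                panes.add(new ArrayList<>(buffer));
                buffer.clear(); // discarding mode: fired elements are dropped from state
            }
            if (buffer.isEmpty()) {
                firstArrival = arrivalMinutes[i];
            }
            buffer.add(values[i]);
        }
        stillBuffered.addAll(buffer);
        return panes;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 1, 2, 3, 4, 6};
        String[] values = {"a", "b", "c", "d", "e", "f"};
        List<String> stillBuffered = new ArrayList<>();

        List<List<String>> panes = simulate(arrivals, values, 3, stillBuffered);
        System.out.println(panes);         // [[a, b, c], [d, e]]
        System.out.println(stillBuffered); // [f]
    }
}
```

In this model, a checkpoint taken at minute 6 would have to contain the element f alongside the Kafka offset. If only the offset is restored after a failure, that element is never emitted in a later pane.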