[jira] [Commented] (BEAM-3494) Snapshot state of aggregated data of apache beam project is not maintained in flink's checkpointing

2018-03-16 Thread Dawid Wysakowicz (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401919#comment-16401919
 ] 

Dawid Wysakowicz commented on BEAM-3494:


[~suganyap] Could you elaborate a bit more on how you enable checkpointing? Do I 
understand correctly that you pass {{checkpointingInterval}} as a CLI parameter 
and that {{state.backend}} is set in flink-conf.yaml? 

Also, could you explain what it means that the aggregated data is not 
restored? Ideally with a simple example.

> Snapshot state of aggregated data of apache beam project is not maintained in 
> flink's checkpointing 
> 
>
> Key: BEAM-3494
> URL: https://issues.apache.org/jira/browse/BEAM-3494
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: suganya
> Priority: Major
>
> We have a Beam project that consumes events from Kafka, does a group-by in a 
> time window (5 mins), and, after the window elapses, pushes the events 
> downstream for a merge. The project is deployed on Flink, and we have enabled 
> checkpointing to recover from a failed state.
> (window size: 5 mins, checkpointingInterval: 5 mins, state.backend: filesystem)
> Offsets from Kafka get checkpointed every 5 mins (checkpointingInterval). 
> The event offsets are checkpointed before the entire DAG (groupBy and merge) 
> has finished. So in case of a task-manager restart, a new task starts from the 
> last successful checkpoint, but we are not able to get the aggregated snapshot 
> data (data from the groupBy task) from the persisted checkpoint.
> We are able to retrieve the last successfully checkpointed offset from Kafka, 
> but not the data aggregated up to that checkpoint.
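
To make the reported symptom concrete, here is a minimal sketch of the difference between a checkpoint that covers only the source offset and one that also covers the buffered window contents. It is a toy model in plain Java and uses no Beam or Flink APIs. The names {{CheckpointSketch}}, {{Checkpoint}} and {{runWithFailure}} are hypothetical and are not taken from the issue; the sketch makes no claim about what the Flink runner actually does.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: hypothetical names, no Beam or Flink APIs involved.
class CheckpointSketch {

    // A checkpoint holding the source offset and a copy of the buffered per-key counts.
    static final class Checkpoint {
        final int offset;
        final Map<String, Integer> windowState;

        Checkpoint(int offset, Map<String, Integer> windowState) {
            this.offset = offset;
            this.windowState = new TreeMap<>(windowState); // defensive copy
        }
    }

    // Consumes events, checkpoints before processing index checkpointAt,
    // "crashes" after failAt events, restores from the checkpoint, replays the
    // rest of the input, and returns the per-key counts the window would emit.
    static Map<String, Integer> runWithFailure(List<String> events, int checkpointAt,
                                               int failAt, boolean snapshotWindowState) {
        Map<String, Integer> counts = new TreeMap<>();
        Checkpoint checkpoint = new Checkpoint(0, new TreeMap<>());

        for (int i = 0; i < failAt; i++) {
            if (i == checkpointAt) {
                // Either snapshot the buffered aggregation, or only the offset.
                checkpoint = new Checkpoint(
                        i, snapshotWindowState ? counts : new TreeMap<>());
            }
            counts.merge(events.get(i), 1, Integer::sum);
        }

        // Failure: in-memory state is gone, only the checkpoint survives.
        counts = new TreeMap<>(checkpoint.windowState);
        for (int i = checkpoint.offset; i < events.size(); i++) {
            counts.merge(events.get(i), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("a", "b", "a", "c", "a", "b");
        // Offset and window state restored: all six events are counted.
        System.out.println(runWithFailure(events, 3, 5, true));  // {a=3, b=2, c=1}
        // Only the offset restored: events read before the checkpoint are lost.
        System.out.println(runWithFailure(events, 3, 5, false)); // {a=1, b=1, c=1}
    }
}
```

In the second call the replay resumes from the checkpointed offset, but the three events buffered before the checkpoint never reach the output, which corresponds to the behaviour described in the report.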



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-3494) Snapshot state of aggregated data of apache beam project is not maintained in flink's checkpointing

2018-01-26 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341057#comment-16341057
 ] 

Aljoscha Krettek commented on BEAM-3494:


How are you enabling checkpointing? Also, could you please reformat your 
post to put the code between code tags, like this
{code:java}
your code ...
{code}
to make it more readable?






[jira] [Commented] (BEAM-3494) Snapshot state of aggregated data of apache beam project is not maintained in flink's checkpointing

2018-01-17 Thread suganya (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16330077#comment-16330077
 ] 

suganya commented on BEAM-3494:
---

The following code uses Apache Beam libraries to create a pipeline. Please 
find the code below.

public void run(String[] args) {
    BeamCLIOptions beamCliOptions = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(BeamCLIOptions.class);
    Pipeline pipeline = Pipeline.create(beamCliOptions);

    // Trigger delay and window size both come from the CLI options.
    MergeDistribution mergeDistribution =
        MergeDistribution.valueOf(beamCliOptions.getMergeDistribution());
    MergeDistribution fixedWindowDuration =
        MergeDistribution.valueOf(beamCliOptions.getFixedWindowSize());

    // Kafka source with String keys and values.
    KafkaIO.Read kafkaEntityStreamReader = KafkaIO.read()
        .withBootstrapServers(beamCliOptions.getKafkaServers())
        .withTopic(beamCliOptions.getKafkaTopic())
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .updateConsumerProperties(ImmutableMap.of(
            "auto.offset.reset", "latest",
            "enable.auto.commit", "true"));

    pipeline.apply(kafkaEntityStreamReader.withoutMetadata())
        .apply(Values.create())
        // Fixed windows with a repeated processing-time trigger.
        .apply(Window.into(
                FixedWindows.of(Duration.standardMinutes(fixedWindowDuration.getMins())))
            .triggering(Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(mergeDistribution.getMins()))))
            .discardingFiredPanes()
            .withAllowedLateness(Duration.ZERO))
        .apply(ParDo.of(new ExtractDataFn(
            beamCliOptions.getDatePartitionKey(),
            new DateTime().minusDays(beamCliOptions.getDaysAgo()).getMillis())))
        .apply("Applying GroupByKey on -MM-DD HH ", GroupByKey.create())
        .apply("Applying Merge ", ParDo.of(new MergeDataFn(beamCliOptions)));

    pipeline.run();
}

> Snapshot state of aggregated data of apache beam project is not maintained in 
> flink's checkpointing 
> 
>
> Key: BEAM-3494
> URL: https://issues.apache.org/jira/browse/BEAM-3494
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Reporter: suganya
> Assignee: Kenneth Knowles
> Priority: Major


