On Tue, Mar 21, 2017 at 11:59 PM Mingmin Xu <[email protected]> wrote:
> In SparkRunner, the default checkpoint storage is TmpCheckpointDirFactory.
> Can it restore during a job restart? (I haven't tested the runner in
> streaming mode for some time.)
>

TmpCheckpointDirFactory simply points to a "default" directory. The Spark
runner, just like Spark, requires you to specify a path on a resilient FS
(gs, s3, hdfs) to save the checkpoint. Starting the same application with an
existing checkpoint picks up where you left off. If you're using a
"non-resilient" FS (such as your local FS), both Spark and the Spark runner
will WARN. It's OK to do this, but take into account that if the checkpoint
is lost, so is your ability to resume.

>
> Regarding data completeness, I would use at-most-once when a small amount
> of missing data (mostly from task-node failures) is tolerable, compared to
> the performance cost introduced by 'state'/'checkpoint'.
>
> On Tue, Mar 21, 2017 at 1:36 PM, Amit Sela <[email protected]> wrote:
>
> > On Tue, Mar 21, 2017 at 7:26 PM Mingmin Xu <[email protected]> wrote:
> >
> > > Moving the discussion to the dev list.
> > >
> > > Savepoints in Flink, as well as checkpoints in Spark, should be good
> > > enough to handle this case.
> > >
> > > When people don't enable these features, for example when they only
> > > need at-most-once
> > >
> >
> > The Spark runner forces checkpointing on any streaming (Beam)
> > application, mostly because it uses mapWithState for reading from
> > UnboundedSource and updateStateByKey for GroupByKey, so by design the
> > Spark runner is at-least-once. Generally, I always thought that
> > applications that require at-most-once are more focused on processing
> > time only, as they only care about whatever gets ingested into the
> > pipeline at a specific time and don't care (up to the point of losing
> > data) about correctness.
> > I would be happy to hear more about your use case.
> >
> > > semantics, each unbounded IO should try its best to restore from the
> > > last offset, even though the CheckpointMark is null. Any ideas?
> > >
> > > Mingmin
> > >
> > > On Tue, Mar 21, 2017 at 9:39 AM, Dan Halperin <[email protected]>
> > > wrote:
> > >
> > > > hey,
> > > >
> > > > The native Beam UnboundedSource API supports resuming from a
> > > > checkpoint; that specifically happens here
> > > > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L674>
> > > > when the KafkaCheckpointMark is non-null.
> > > >
> > > > The FlinkRunner should be providing the KafkaCheckpointMark from the
> > > > most recent savepoint upon restore.
> > > >
> > > > There shouldn't be any "special" Flink runner support needed, nor is
> > > > the State API involved.
> > > >
> > > > Dan
> > > >
> > > > On Tue, Mar 21, 2017 at 9:01 AM, Jean-Baptiste Onofré <[email protected]>
> > > > wrote:
> > > >
> > > >> Wouldn't it be Flink runner specific?
> > > >>
> > > >> Maybe the State API could do the same in a runner-agnostic way (just
> > > >> thinking out loud)?
> > > >>
> > > >> Regards
> > > >> JB
> > > >>
> > > >> On 03/21/2017 04:56 PM, Mingmin Xu wrote:
> > > >>
> > > >>> From KafkaIO itself, it looks like it either does
> > > >>> start_from_beginning or start_from_latest. It's designed to
> > > >>> leverage `UnboundedSource.CheckpointMark` during initialization,
> > > >>> but so far I don't see it being provided by runners. At the
> > > >>> moment, Flink savepoints are a good option; I created a JIRA
> > > >>> (BEAM-1775 <https://issues.apache.org/jira/browse/BEAM-1775>) to
> > > >>> handle it in KafkaIO.
> > > >>>
> > > >>> Mingmin
> > > >>>
> > > >>> On Tue, Mar 21, 2017 at 3:40 AM, Aljoscha Krettek <[email protected]>
> > > >>> wrote:
> > > >>>
> > > >>> Hi,
> > > >>> Are you using Flink savepoints [1] when restoring your
> > > >>> application? If you use them, the Kafka offset should be stored
> > > >>> in state and it should restart from the correct position.
> > > >>>
> > > >>> Best,
> > > >>> Aljoscha
> > > >>>
> > > >>> [1]
> > > >>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
> > > >>> > On 21 Mar 2017, at 01:50, Jins George <[email protected]> wrote:
> > > >>> >
> > > >>> > Hello,
> > > >>> >
> > > >>> > I am writing a Beam pipeline (streaming) with the Flink runner to
> > > >>> > consume data from Kafka, apply some transformations, and persist
> > > >>> > to HBase.
> > > >>> >
> > > >>> > If I restart the application (due to a failure or a manual
> > > >>> > restart), the consumer does not resume from the offset where it
> > > >>> > was prior to the restart. It always resumes from the latest
> > > >>> > offset.
> > > >>> >
> > > >>> > If I enable Flink checkpointing with the HDFS state backend, the
> > > >>> > system appears to resume from the earliest offset.
> > > >>> >
> > > >>> > Is there a recommended way to resume from the offset where it
> > > >>> > was stopped?
> > > >>> >
> > > >>> > Thanks,
> > > >>> > Jins George
> > > >>>
> > > >>>
> > > >>> --
> > > >>> ----
> > > >>> Mingmin
> > > >>>
> > > >>
> > > >> --
> > > >> Jean-Baptiste Onofré
> > > >> [email protected]
> > > >> http://blog.nanthrax.net
> > > >> Talend - http://www.talend.com
> > > >>
> > > >
> > >
> > >
> > > --
> > > ----
> > > Mingmin
> > >
> >
>
>
> --
> ----
> Mingmin
>
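The trade-off discussed above (tolerating some loss with at-most-once versus paying for state and checkpoints to get at-least-once, which the Spark runner enforces by design) largely comes down to whether the read position is committed before or after a record is processed. Below is a minimal toy model of that idea in Java. It is not the Spark runner's or KafkaIO's implementation, and the class, enum and method names are hypothetical. It assumes a single partition, at most one crash, and that processing a record has its side effect immediately.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (hypothetical names, not a Beam/Spark/Kafka API) showing how the
 * order of "commit offset" and "process record" decides delivery semantics.
 */
public class DeliverySemanticsModel {

    public enum Mode { AT_MOST_ONCE, AT_LEAST_ONCE }

    /**
     * Reads records 0..n-1. The worker crashes once while handling the record
     * at index crashAt (use -1 for no crash), then restarts from the last
     * committed offset. Returns every record whose processing took effect.
     */
    public static List<Integer> run(int n, int crashAt, Mode mode) {
        List<Integer> processed = new ArrayList<>();
        int committed = 0;       // next offset to read after a restart
        boolean crashed = false; // the model crashes at most once
        int offset = committed;

        while (offset < n) {
            boolean crashHere = !crashed && offset == crashAt;
            if (mode == Mode.AT_MOST_ONCE) {
                committed = offset + 1;       // commit first...
                if (crashHere) {              // ...crash before processing: record is lost
                    crashed = true;
                    offset = committed;       // restart from the committed offset
                    continue;
                }
                processed.add(offset);
            } else {
                processed.add(offset);        // process first (side effect happens)...
                if (crashHere) {              // ...crash before committing: record is replayed
                    crashed = true;
                    offset = committed;
                    continue;
                }
                committed = offset + 1;
            }
            offset++;
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run(5, 2, Mode.AT_MOST_ONCE));  // [0, 1, 3, 4]
        System.out.println(run(5, 2, Mode.AT_LEAST_ONCE)); // [0, 1, 2, 2, 3, 4]
    }
}
```

With a crash on record 2, the at-most-once ordering silently drops it, while the at-least-once ordering processes it twice. That duplicate is the price of never losing data, and it is why at-least-once pipelines usually need idempotent or deduplicating sinks.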
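Dan's point in the thread is that KafkaIO resumes from the KafkaCheckpointMark when a runner supplies one, and otherwise starts from the beginning or from the latest offset. The Java sketch below models that start-offset decision per partition, which may help explain why a restart without a restored checkpoint lands on the latest offset. All names are hypothetical and it uses neither the Beam nor the Kafka API. It assumes a checkpoint mark can be reduced to a map from partition to the next offset to read, with `null` meaning "no mark was provided".

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Toy model (hypothetical names, not the KafkaIO API) of how an unbounded
 * reader could choose its starting offset per partition on (re)start.
 */
public class ResumeOffsetModel {

    /** Fallback for partitions that have no checkpointed offset. */
    public enum StartPolicy { EARLIEST, LATEST }

    /**
     * @param checkpointMark next offset to read per partition, or null when
     *                       the runner supplied no checkpoint mark
     * @param earliest       earliest retained offset per partition
     * @param latest         latest (end) offset per partition
     * @param policy         fallback for partitions missing from the mark
     */
    public static Map<Integer, Long> startOffsets(
            Map<Integer, Long> checkpointMark,
            Map<Integer, Long> earliest,
            Map<Integer, Long> latest,
            StartPolicy policy) {
        Map<Integer, Long> start = new TreeMap<>(); // sorted for readable output
        for (Integer partition : earliest.keySet()) {
            Long resumed = (checkpointMark == null) ? null : checkpointMark.get(partition);
            if (resumed != null) {
                start.put(partition, resumed);                 // resume where the previous run stopped
            } else if (policy == StartPolicy.EARLIEST) {
                start.put(partition, earliest.get(partition)); // reprocess everything still retained
            } else {
                start.put(partition, latest.get(partition));   // skip what arrived while stopped
            }
        }
        return start;
    }

    public static void main(String[] args) {
        Map<Integer, Long> earliest = new TreeMap<>();
        Map<Integer, Long> latest = new TreeMap<>();
        earliest.put(0, 0L);
        latest.put(0, 500L);
        earliest.put(1, 0L);
        latest.put(1, 800L);

        Map<Integer, Long> mark = new TreeMap<>();
        mark.put(0, 120L);
        mark.put(1, 340L);

        // No checkpoint mark: the outcome depends only on the fallback policy.
        System.out.println(startOffsets(null, earliest, latest, StartPolicy.LATEST));   // {0=500, 1=800}
        System.out.println(startOffsets(null, earliest, latest, StartPolicy.EARLIEST)); // {0=0, 1=0}
        // Checkpoint mark restored (e.g. from a savepoint): resume per partition.
        System.out.println(startOffsets(mark, earliest, latest, StartPolicy.LATEST));   // {0=120, 1=340}
    }
}
```

In this model, a partition missing from an otherwise present mark falls back to the policy. That is a choice made for the sketch, not something the thread establishes about KafkaIO.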
