On Wed, Apr 7, 2021 at 11:55 AM Kenneth Knowles <k...@apache.org> wrote:

> [I think this has graduated to a +dev <d...@beam.apache.org> thread]
>
> Yea, in Beam it is left up to the IOs primarily, hence the bundle
> finalization step, or allowed runners to have their own features of course.
> Dataflow also does have in-place pipeline update that restores the
> persisted checkpoints from one pipeline to another - same basic
> mechanism/idea as Spark Structured Streaming but different overall
> workflow. +Reuven Lax <re...@google.com> has put a lot of thought into
> updating, checkpointing, resuming, etc. Runners differ a lot in these
> areas. Is there something that should graduate from runner-specific to the
> Beam model?
>
> Kenn
>

In some ways, having an external checkpoint mechanism makes programming IOs
simpler.  Let's use Redis Streams as an example, as our company recently
implemented a RedisStreamIO internally so the details are fresh.

One requirement was the need to be able to shut down a streaming Beam
Pipeline, and then restart it from a later point in time without lost data
and without starting from the beginning of time.

This meant that I need to ensure only elements that are finished processing
in a bundle are committed as 'processed' back to the redis server, which I
accomplished by keeping track of all the elements that are outputted, then
on finalizeCheckpoint, which I *assume* happens at the end of a bundle but
I'm fuzzy on details, send those element IDs back to the server as consumed
(with Redis XACK).

If instead Beam would let you persist checkpoints externally and allow a
pipeline to bootstrap off of the already existing checkpoint, I simply have
to keep track *in the checkpoint* of the last element ID read, and can use
that as the starting offset.  I would then be able to 'eager ack' read
messages and not worry about delaying commits until elements are outputted
further down the pipeline etc, since if an element is read into a
checkpoint, we know it is recoverable.

This also makes life a lot easier for anything regarding Kinesis since the
Kinesis servers don't have a way of managing offsets/last element read
(from when I used it?, maybe changed), unlike Kafka, Pubsub, Redis Streams,
etc.

Hopefully this makes sense, and if I have some misunderstandings I'd love
to learn more.  This general subject has come up a few times in the beam
slack so I think at the very least some extra documentation on these types
of use cases might be welcome.




>
> On Wed, Apr 7, 2021 at 11:28 AM Vincent Marquez <vincent.marq...@gmail.com>
> wrote:
>
>> Looks like this is a common source of confusion, I had similar questions
>> about checkpointing in the beam slack.
>>
>> In Spark Structured Streaming, checkpoints are saved to an *external*
>> HDFS location and persist *beyond* each run, so in the event of a stream
>> crashing, you can just point your next execution of the stream to the
>> checkpoint location.  Kafka  (or Kinesis/Redis Stream etc) offsets are
>> persisted in the checkpoint, so the stream would resume off of the last
>> committed checkpoint location.
>>
>> It doesn't seem Beam has an external checkpoint that persists beyond a
>> single stream execution, so in Beam with Kinesis I believe you'll have to
>> manage your own offsets deliberately with an external source if you want to
>> achieve 'exactly once' semantics in the event of shutting down a stream and
>>  resuming it at a later point.
>>
>> In Kafka you don't need this since as long as we ensure our offsets are
>> committed in finalization of a bundle, the offsets for a particular group
>> id are stored on the server.
>>
>>
>> On Tue, Apr 6, 2021 at 3:13 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> This sounds similar to the "Kafka Commit" in
>>> https://github.com/apache/beam/pull/12572 by +Boyuan Zhang
>>> <boyu...@google.com> and also to how PubsubIO ACKs messages in the
>>> finalizer. I don't know much about KinesisIO or how Kinesis works. I was
>>> just asking to clarify, in case other folks know more, like +Alexey
>>> Romanenko <aromanenko....@gmail.com> and +Ismaël Mejía
>>> <ieme...@gmail.com> have modified KinesisIO. If the feature does not
>>> exist today, perhaps we can identify the best practices around this pattern.
>>>
>>> Kenn
>>>
>>> On Tue, Apr 6, 2021 at 1:59 PM Michael Luckey <adude3...@gmail.com>
>>> wrote:
>>>
>>>> Hi Kenn,
>>>>
>>>> yes, resuming reading at the proper timestamp is exactly the issue we
>>>> are currently struggling with. E.g. with Kinesis Client Lib we could store
>>>> the last read within some dynamo table. This mechanism is not used with
>>>> beam, as we understand, the runner is responsible to track that checkpoint
>>>> mark.
>>>>
>>>> Now, obviously on restarting the pipeline, e.g. on non compatible
>>>> upgrade, that is, an pipeline update is just not feasible, there must be
>>>> some mechanism in place on how Dataflow will know where to continue. Is
>>>> that simply the pipeline name? Or is there more involved? So how does
>>>> checkpointing actually work here?
>>>>
>>>> Based on 'name', wouldn't that imply that something like (example taken
>>>> from
>>>> https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates
>>>> )
>>>>
>>>>   export REGION="us-central1"
>>>>
>>>>   gcloud dataflow flex-template run "streaming-beam-sql-`date 
>>>> +%Y%m%d-%H%M%S`" \
>>>>     --template-file-gcs-location "$TEMPLATE_PATH" \
>>>>     --parameters inputSubscription="$SUBSCRIPTION" \
>>>>     --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
>>>>     --region "$REGION"
>>>>
>>>> will not resume on last read on rerun, because the name obviously
>>>> changes here?
>>>>
>>>> best,
>>>>
>>>> michel
>>>>
>>>>
>>>>
>>>> On Tue, Apr 6, 2021 at 10:38 PM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>>
>>>>> I would assume the main issue is resuming reading from the Kinesis
>>>>> stream from the last read? In the case for Pubsub (just as another example
>>>>> of the idea) this is part of the internal state of a pre-created
>>>>> subscription.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Tue, Apr 6, 2021 at 1:26 PM Michael Luckey <adude3...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi list,
>>>>>>
>>>>>> with our current project we are implementing our streaming pipeline
>>>>>> based on Google Dataflow.
>>>>>>
>>>>>> Essentially we receive input via Kinesis, doing some filtering,
>>>>>> enrichment and sessionizing and output to PubSub and/or google storage.
>>>>>>
>>>>>> After short investigations it is not clear to us, how checkpointing
>>>>>> will work running on Dataflow in connection with KinesisIO. Is there any
>>>>>> documentation/discussions to get a better understanding on how that will 
>>>>>> be
>>>>>> working? Especially if we are forced to restart our pipelines, how could 
>>>>>> we
>>>>>> ensure not to loose any events?
>>>>>>
>>>>>> As far as I understand currently, it should work 'auto-magically' but
>>>>>> it is not yet clear to us, how it will actually behave. Before we try to
>>>>>> start testing our expectations or even try to implement some
>>>>>> watermark-tracking by ourself we hoped to get some insights from other
>>>>>> users here.
>>>>>>
>>>>>> Any help appreciated.
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> michel
>>>>>>
>>>>>
>>
>> ~Vincent
>>
>

Reply via email to