Hi Vincent,

> 1. Just to be clear, for a streaming pipeline (say, on the Dataflow
> runner), it will use the 'residual' result of the SplitRestriction
> (retrieved from trySplit) as the checkpoint. So if the pipeline is stopped
> due to an error and then restarted with the same checkpoint, would it resume
> from the last written residual checkpoint position?


Checkpoints are *only* persisted while the pipeline is running. So when the
pipeline is stopped and restarted, there is no way for it to restart from
the last checkpoint produced by trySplit. Instead, you can use bundle
finalization to commit what you have read, or create a commit transform.
For example, in Kafka read, we create a commit transform[1] which commits
the read offset every 5 minutes. When the pipeline is stopped and
restarted, the Kafka read resumes from the last committed offset.
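A minimal, Beam-free sketch of the "commit what you have read" idea, assuming an external offset store that outlives the pipeline (Kafka's committed consumer offsets play this role). Both classes are hypothetical and this is not the KafkaCommitOffset implementation; it only shows why committed state, unlike an SDF checkpoint, is still there after a restart.

```java
// Hypothetical in-memory stand-in for an external offset store, such as the
// committed consumer offsets kept by Kafka. Because this state lives outside
// the pipeline, it survives a stop/restart, unlike SDF checkpoints.
class CommittedOffsetStore {
  private long nextOffset = 0; // first offset to read after a restart

  synchronized long committedOffset() {
    return nextOffset;
  }

  synchronized void commit(long offset) {
    // Never move the committed position backwards.
    nextOffset = Math.max(nextOffset, offset);
  }
}

// Hypothetical reader-side helper: remembers the highest offset read so far
// and commits it at most once per interval (e.g. every 5 minutes). The current
// time is passed in explicitly so the behavior is deterministic.
class PeriodicOffsetCommitter {
  private final CommittedOffsetStore store;
  private final long intervalMillis;
  private long lastCommitMillis;
  private long pendingOffset;

  PeriodicOffsetCommitter(CommittedOffsetStore store, long intervalMillis, long nowMillis) {
    this.store = store;
    this.intervalMillis = intervalMillis;
    this.lastCommitMillis = nowMillis;
    // A restarted reader starts from whatever was last committed.
    this.pendingOffset = store.committedOffset();
  }

  long resumeOffset() {
    return store.committedOffset();
  }

  // Call after the record at `offset` has been read and emitted.
  void recordRead(long offset, long nowMillis) {
    pendingOffset = Math.max(pendingOffset, offset + 1);
    if (nowMillis - lastCommitMillis >= intervalMillis) {
      store.commit(pendingOffset);
      lastCommitMillis = nowMillis;
    }
  }
}
```

In this model, anything read after the last commit (for example an offset read one second after a commit) is read again after a restart, so periodic committing gives at-least-once rather than exactly-once delivery.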


> 2. Another question I have about writing an unbounded Splittable DoFn is how
> to write a test. With UnboundedSource, there was a way to wrap it to make
> it bounded so it would stop after processing N elements. I would
> like to do the same; is there a uniform way of doing this currently? If
> not, I will just write something custom for the loop in processElement
> to check whether it should continue, as opposed to 'while (true)'. Let me
> know if there is a uniform way, or if you have a better idea on how to
> write a test for my PTransform that uses a Splittable DoFn.
>

For the Kafka test[2][3], what I do is start a streaming pipeline, wait
for a certain amount of time, and then cancel it.
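For the custom-loop option Vincent describes, a Beam-free sketch of a read loop with two test-only stopping conditions: a record limit (like the old bounded wrapper around UnboundedSource) and a wall-clock deadline (like cancelling the pipeline after a fixed time). The class, the `source` function, and both limits are hypothetical illustrations, not Beam options.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;

// Hypothetical read loop standing in for the body of an unbounded
// processElement. Production code would pass Long.MAX_VALUE for both limits
// (effectively `while (true)`); a test passes a small record limit or an
// early deadline so the loop terminates.
class BoundedReadLoop {
  static List<String> readRecords(
      LongFunction<String> source, // hypothetical: returns the record at an offset
      long startOffset,
      long maxRecords,
      LongSupplier clockMillis,
      long deadlineMillis) {
    List<String> out = new ArrayList<>();
    long offset = startOffset;
    // Stop as soon as either the record limit or the deadline is reached.
    while (out.size() < maxRecords && clockMillis.getAsLong() < deadlineMillis) {
      out.add(source.apply(offset));
      offset++;
    }
    return out;
  }
}
```

Injecting the clock keeps the deadline path testable without sleeping; the trade-off versus run-then-cancel is that the production code path carries a test-only parameter.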

[1]
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java
[2]
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java#L137
[3]
https://github.com/apache/beam/blob/master/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy

On Fri, Dec 4, 2020 at 11:57 AM Vincent Marquez wrote:

> Thank you for your help Boyuan.
>
> 1. Just to be clear, for a streaming pipeline (say, on the Dataflow
> runner), it will use the 'residual' result of the SplitRestriction
> (retrieved from trySplit) as the checkpoint. So if the pipeline is stopped
> due to an error and then restarted with the same checkpoint, would it resume
> from the last written residual checkpoint position?
>
> 2. Another question I have about writing an unbounded Splittable DoFn is how
> to write a test. With UnboundedSource, there was a way to wrap it to make
> it bounded so it would stop after processing N elements. I would
> like to do the same; is there a uniform way of doing this currently? If
> not, I will just write something custom for the loop in processElement
> to check whether it should continue, as opposed to 'while (true)'. Let me
> know if there is a uniform way, or if you have a better idea on how to
> write a test for my PTransform that uses a Splittable DoFn.
>
> Thanks again.
>
>
> *~Vincent*
>
>
> On Mon, Nov 30, 2020 at 10:25 AM Boyuan Zhang wrote:
>
>> In Splittable DoFn, the trySplit[1] API in RestrictionTracker is used for
>> checkpointing: it keeps the primary as the current restriction and
>> returns the residual. In the DoFn, you can perform a Splittable
>> DoFn-initiated checkpoint by returning ProcessContinuation.resume()[2].
>> The Beam programming guide[3] also covers both Splittable DoFn-initiated
>> and runner-initiated checkpoints.
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java#L72-L108
>> [2]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L1297-L1333
>> [3]
>> https://beam.apache.org/documentation/programming-guide/#splittable-dofns
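To make the primary/residual split concrete, a simplified, Beam-free model of an offset range tracker being checkpointed. The class names are hypothetical and this is not Beam's OffsetRangeTracker; it only mirrors the idea that claimed work stays in the primary while the unclaimed remainder is handed back as the residual, which the runner then schedules for later processing. The residual is also how "half-completed" work is represented.

```java
// Hypothetical half-open offset range [from, to); to == Long.MAX_VALUE models
// an unbounded restriction.
final class SketchRange {
  final long from;
  final long to;

  SketchRange(long from, long to) {
    this.from = from;
    this.to = to;
  }

  @Override
  public String toString() {
    return "[" + from + ", " + to + ")";
  }
}

// Simplified model of an offset range tracker (not Beam's implementation).
final class SketchOffsetTracker {
  private SketchRange range;
  private Long lastAttempted = null; // last offset passed to tryClaim

  SketchOffsetTracker(SketchRange range) {
    this.range = range;
  }

  SketchRange currentRestriction() {
    return range;
  }

  // Offsets must be claimed in increasing order; a claim outside the current
  // range fails, which tells the DoFn to stop processing.
  boolean tryClaim(long offset) {
    if (lastAttempted != null && offset <= lastAttempted) {
      throw new IllegalArgumentException("offsets must increase: " + offset);
    }
    if (offset < range.from) {
      throw new IllegalArgumentException("offset before range start: " + offset);
    }
    lastAttempted = offset;
    return offset < range.to;
  }

  // Checkpoint: split right after the last attempted offset. Returns
  // {primary, residual}, or null if no work remains to hand back.
  SketchRange[] checkpoint() {
    long splitAt = (lastAttempted == null) ? range.from : lastAttempted + 1;
    if (splitAt >= range.to) {
      return null;
    }
    SketchRange primary = new SketchRange(range.from, splitAt);
    SketchRange residual = new SketchRange(splitAt, range.to);
    range = primary; // this tracker now owns only the primary
    return new SketchRange[] {primary, residual};
  }
}
```

After claiming offsets 0 to 2 of `[0, Long.MAX_VALUE)`, a checkpoint yields the primary `[0, 3)` and the residual `[3, Long.MAX_VALUE)`, and further claims on the primary fail.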
>>
>> On Sun, Nov 29, 2020 at 10:28 PM Vincent Marquez wrote:
>>
>>> Regarding checkpointing:
>>>
>>> I'm confused about how a Splittable DoFn can make use of checkpoints to
>>> resume without data loss. The old API had a very easy to understand
>>> method called 'getCheckpointMark' that allowed me to return the
>>> completed work, but I don't see where that is done with the current API.
>>>
>>> I tried looking at the OffsetRangeTracker and how it is used by Kafka,
>>> but I'm failing to understand it. The process method takes the
>>> RestrictionTracker, but I don't see a way for the OffsetRangeTracker
>>> to represent half-completed work (in the event of an exception/crash
>>> during a previous 'process' method run). Is there some documentation
>>> that could help me understand this part? Thanks in advance.
>>>
>>> *~Vincent*
>>>
>>>
>>> On Thu, Nov 26, 2020 at 2:01 PM Ismaël Mejía wrote:
>>>
>>>> Just want to mention that we have been working with Vincent on the
>>>> ReadAll implementation for Cassandra based on normal DoFn, and we
>>>> expect to get it merged for the next release of Beam. Vincent is now
>>>> familiar with DoFn-based IO composition, a first step towards
>>>> understanding SDF. Vincent, you can think of our Cassandra RingRange as
>>>> a Restriction in the context of SDF. For reference, it would be
>>>> good to read these two in advance:
>>>>
>>>> https://beam.apache.org/blog/splittable-do-fn/
>>>> https://beam.apache.org/documentation/programming-guide/#sdf-basics
>>>>
>>>> Thanks, Boyuan, for offering your help. I think it is really needed,
>>>> considering that we don't have many unbounded SDF connectors to use as
>>>> reference.
>>>>
>>>> On Thu, Nov 19, 2020 at 11:16 PM Boyuan Zhang wrote:
>>>> >
>>>> >
>>>> >
>>>> > On Thu, Nov 19, 2020 at 1:29 PM Vincent Marquez wrote:
>>>> >>
>>>> >>
>>>> >>
>>>> >>
>>>> >> On Thu, Nov 19, 2020 at 10:38 AM Boyuan Zhang wrote:
>>>> >>>
>>>> >>> Hi Vincent,
>>>> >>>
>>>> >>> Thanks for your contribution! I'm happy to work with you on this
>>>> when you contribute the code into Beam.
>>>> >>
>>>> >>
>>>> >> Should I write up a JIRA to start? I have access; I've already been
>>>> in the process of contributing some big changes to the CassandraIO
>>>> connector.
>>>> >
>>>> >
>>>> > Yes, please create a JIRA and assign it to yourself.
>>>> >
>>>> >>
>>>> >>
>>>> >>>
>>>> >>>
>>>> >>> Another thing is that it would be preferable to write a new IO using
>>>> Splittable DoFn instead of UnboundedSource.
>>>> >>
>>>> >>
>>>> >> I would prefer to use the UnboundedSource connector since I've already
>>>> written most of it, and I also see some challenges in using Splittable
>>>> DoFn for Redis Streams.
>>>> >>
>>>> >> Unlike Kafka and Kinesis, Redis Streams offsets are not simply
>>>> monotonically increasing counters, so there is no way to just claim a
>>>> chunk of work and know that the chunk has any actual data in it.
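One possible direction, sketched without Beam or a Redis client and offered only as an assumption to explore in the WIP PR: Redis documents stream entry IDs as `<millisecondsTime>-<sequenceNumber>`. They increase within a stream but are not consecutive, so instead of pre-claiming a numeric chunk, a tracker can claim each ID that a read actually returned. `StreamId` and `StreamIdTracker` are hypothetical names, and the checkpoint semantics are simplified.

```java
// Hypothetical parsed Redis stream entry ID "<millisecondsTime>-<sequenceNumber>".
final class StreamId implements Comparable<StreamId> {
  final long millis;
  final long seq;

  StreamId(long millis, long seq) {
    this.millis = millis;
    this.seq = seq;
  }

  static StreamId parse(String id) {
    int dash = id.indexOf('-');
    if (dash < 0) {
      throw new IllegalArgumentException("not a stream ID: " + id);
    }
    return new StreamId(
        Long.parseLong(id.substring(0, dash)), Long.parseLong(id.substring(dash + 1)));
  }

  // Order by time first, then by sequence number within the same millisecond.
  @Override
  public int compareTo(StreamId other) {
    int byTime = Long.compare(millis, other.millis);
    return byTime != 0 ? byTime : Long.compare(seq, other.seq);
  }

  @Override
  public String toString() {
    return millis + "-" + seq;
  }
}

// Hypothetical tracker whose restriction is "all entries after startAfter".
// It claims IDs that a read actually returned, so a claim never covers a gap
// with no data.
final class StreamIdTracker {
  private StreamId lastClaimed;
  private boolean checkpointed = false;

  StreamIdTracker(StreamId startAfter) {
    this.lastClaimed = startAfter;
  }

  boolean tryClaim(StreamId id) {
    if (checkpointed) {
      return false; // the primary is finished; remaining work is the residual
    }
    if (id.compareTo(lastClaimed) <= 0) {
      throw new IllegalArgumentException("IDs must increase: " + id);
    }
    lastClaimed = id;
    return true;
  }

  // Checkpoint: the residual is "resume after the last claimed ID".
  StreamId checkpoint() {
    checkpointed = true;
    return lastClaimed;
  }
}
```

Sparse IDs are not a problem here: claiming `1526919030474-0` and then `1526919030480-0` is valid even though no entries exist in between.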
>>>> >>
>>>> >> Since UnboundedSource is not yet deprecated, could I contribute that
>>>> after finishing up some test aspects, and then perhaps we can implement a
>>>> Splittable DoFn version?
>>>> >
>>>> >
>>>> > It would be nice not to build new IOs on top of UnboundedSource.
>>>> We already have a wrapper class that translates an existing
>>>> UnboundedSource into an unbounded Splittable DoFn and executes it as
>>>> such. How about you open a WIP PR and we go through the UnboundedSource
>>>> implementation together to figure out a design that uses Splittable
>>>> DoFn?
>>>> >
>>>> >
>>>> >>
>>>> >>
>>>> >>
>>>> >>>
>>>> >>>
>>>> >>> On Thu, Nov 19, 2020 at 10:18 AM Vincent Marquez wrote:
>>>> >>>>
>>>> >>>> Redis offers streaming queue functionality similar to
>>>> Kafka/Kinesis/Pub/Sub, but Beam does not currently have a connector for it.
>>>> >>>>
>>>> >>>> I've written an UnboundedSource connector that makes use of Redis
>>>> Streams as a POC and it seems to work well.
>>>> >>>>
>>>> >>>> If someone is willing to work with me, I could write up a JIRA
>>>> and/or open up a WIP pull request if there is interest in getting this as
>>>> an official connector.  I would mostly need guidance on naming/testing
>>>> aspects.
>>>> >>>>
>>>> >>>> https://redis.io/topics/streams-intro
>>>> >>>>
>>>> >>>> ~Vincent
>>>> >>
>>>> >>
>>>> >> ~Vincent
>>>>
>>>
