Is it required to set the jobName and checkpointDir options for checkpointing
to work?



On Thu, 9 Jul, 2020, 9:25 PM Luke Cwik, <lc...@google.com> wrote:

> The BoundedReadFromUnboundedSource does checkpoint the underlying
> UnboundedSource; is that checkpoint logic not working?
> Do you have KinesisIO configured to always read from a specific point?
>
> On Thu, Jul 9, 2020 at 9:56 AM Sunny, Mani Kolbe <sun...@dnb.com> wrote:
>
>> We did the same and started using maxReadTime and put the application to
>> run on a recurring schedule of 5 minutes. It works fine end to end without
>> any error.
>>
>>
>>
>> But the problem is that it always starts reading from the beginning of
>> the Kinesis stream whenever it stops and restarts.
>>
>>
>>
>> When I investigated, I found that when you set maxReadTime, the pipeline
>> runs in BoundedReadFromUnboundedSource mode. That essentially converts the
>> source into a bounded one, which means checkpointing and watermarks are no
>> longer supported. The reader just reads for a fixed amount of time and
>> exits.
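One pragmatic workaround, if you stay in bounded mode, is to persist the resume position yourself between scheduled runs. A minimal, hypothetical sketch in plain Java; the class, file location, and sequence-number handling are illustrative, not Beam or KinesisIO APIs:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: save the last-processed Kinesis sequence number when
// a bounded run finishes, and load it at startup so the next run can start
// from AT_SEQUENCE_NUMBER instead of TRIM_HORIZON.
class ResumePositionStore {
    private final Path file;

    ResumePositionStore(Path file) {
        this.file = file;
    }

    // Returns the saved sequence number, or null on the very first run.
    String load() throws IOException {
        return Files.exists(file) ? Files.readString(file).trim() : null;
    }

    // Persist the last sequence number seen by the finished run.
    void save(String sequenceNumber) throws IOException {
        Files.writeString(file, sequenceNumber);
    }
}
```

In practice the file would live on HDFS or S3 rather than the local filesystem, and the loaded value would be fed into the reader's initial-position configuration at startup.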
>>
>>
>>
>> Is there a recommended way to resume reading from the position where it
>> finished, either using maxReadTime or in unbounded-source mode?
>>
>>
>>
>> Could someone point me to a sample pipeline that uses Kinesis as a source?
>>
>>
>>
>> Regards,
>>
>> Mani
>>
>>
>>
>> From: Lars Almgren Schwartz <lars.almg...@tink.com>
>> Sent: Thursday, June 25, 2020 7:53 AM
>> To: user@beam.apache.org
>> Subject: Re: KinesisIO checkpointing
>>
>>
>>
>> CAUTION: This email originated from outside of D&B. Please do not
>> click links or open attachments unless you recognize the sender and know
>> the content is safe.
>>
>>
>>
>> We had the exact same problem, but have not spent any time trying to
>> solve it; we just skipped checkpointing for now.
>>
>> When we saw this problem we ran Spark 2.4.5 (local mode) and Kinesis 2.18
>> and 2.19.
>>
>>
>>
>> On Wed, Jun 24, 2020 at 6:22 PM Sunny, Mani Kolbe <sun...@dnb.com> wrote:
>>
>> We are on Spark 2.4 and Beam 2.22.0
>>
>>
>>
>> From: Alexey Romanenko <aromanenko....@gmail.com>
>> Sent: Wednesday, June 24, 2020 5:15 PM
>> To: user@beam.apache.org
>> Subject: Re: KinesisIO checkpointing
>>
>>
>>
>>
>> Yes, KinesisIO supports restarting from checkpoints, and it's based on the
>> runner's checkpoint support [1].
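Conceptually, the runner periodically asks each unbounded reader for a checkpoint mark (for Kinesis, essentially the per-shard sequence numbers), serializes it under the checkpoint directory, and hands it back to the reader on restart. A rough illustration of that round trip using plain Java serialization; ShardCheckpoint here is a made-up stand-in, not Beam's actual KinesisReaderCheckpoint class:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Made-up stand-in for a Kinesis checkpoint mark: for each shard, the
// sequence number to resume from after a restart.
class ShardCheckpoint implements Serializable {
    final Map<String, String> shardToSequence = new HashMap<>();
}

class CheckpointRoundTrip {
    // Serialize the mark, as a runner would before writing it to the
    // checkpoint directory.
    static byte[] save(ShardCheckpoint mark) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(mark);
        }
        return bos.toByteArray();
    }

    // Deserialize the mark on restart so the reader can resume mid-stream.
    static ShardCheckpoint restore(byte[] bytes)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (ShardCheckpoint) ois.readObject();
        }
    }
}
```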
>>
>>
>>
>> Could you specify which version of Spark and Beam you use?
>>
>>
>>
>> [1]
>> https://stackoverflow.com/questions/62259364/how-apache-beam-manage-kinesis-checkpointing/62349838#62349838
>>
>>
>>
>> On 24 Jun 2020, at 14:13, Sunny, Mani Kolbe <sun...@dnb.com> wrote:
>>
>>
>>
>> Hello,
>>
>>
>>
>> We are developing a Beam pipeline which runs on SparkRunner in streaming
>> mode. The pipeline reads from Kinesis, does some translation and filtering,
>> and finally writes to S3 using the AvroIO writer. We are using fixed windows
>> with triggers based on element count and processing-time intervals. The
>> output path is partitioned by the window start timestamp. allowedLateness=0sec
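A skeleton of such a pipeline might look like the following. This is a sketch under assumptions: the stream name, bucket, schema, and ConvertFn DoFn are placeholders, and the exact builder methods should be checked against the Beam version in use:

```java
// Sketch of a Kinesis-to-windowed-Avro-on-S3 pipeline; names are placeholders.
Pipeline p = Pipeline.create(options);

p.apply("ReadKinesis",
        KinesisIO.read()
            .withStreamName("my-stream")                        // placeholder
            .withInitialPositionInStream(InitialPositionInStream.LATEST))
 .apply("Window",
        Window.<KinesisRecord>into(FixedWindows.of(Duration.standardSeconds(60)))
            .triggering(Repeatedly.forever(
                AfterFirst.of(
                    AfterPane.elementCountAtLeast(1),
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardSeconds(60)))))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes())
 .apply("ToGenericRecord", ParDo.of(new ConvertFn()))           // hypothetical DoFn
 .apply("WriteAvro",
        AvroIO.writeGenericRecords(schema)                      // placeholder schema
            .to("s3://my-bucket/output/")                       // placeholder path
            .withWindowedWrites()
            .withNumShards(1));

p.run().waitUntilFinish();
```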
>>
>>
>>
>> This works fine, but I have noticed that whenever we restart the streaming
>> application, it starts reading from the Kinesis TRIM_HORIZON. That is, it
>> does not resume from the last checkpoint position. Then I found that the
>> checkpoint directory is based on the --jobName and --checkpointDir
>> properties, so I tried running as below:
>>
>>
>>
>> spark-submit --master yarn --deploy-mode cluster --conf spark.dynamicAllocation.enabled=false \
>>     --driver-memory 1g --executor-memory 1g --num-executors 1 --executor-cores 1 \
>>     --class com.dnb.optimus.prime.processor.PrimeStreamProcessor \
>>     --conf spark.executor.extraClassPath=/etc/hbase/conf \
>>     /tmp/stream-processor-0.0.0.8-spark.jar \
>>     --runner=SparkRunner \
>>     --jobName=PrimeStreamProcessor \
>>     --checkpointDir=hdfs:///tmp/PrimeStreamProcessor checkpoint \
>>     --useWindow=true \
>>     --windowDuration=60s --windowLateness=0s --windowElementCount=1 \
>>     --maxReadTime=-1 \
>>     --streaming=true
>>
>>
>>
>> I can see that it is able to fetch checkpoint data from the checkpointDir
>> path provided. But when the driver tries to broadcast this information to
>> the executors, it fails with the exception below.
>>
>>
>> 20/06/12 15:35:28 ERROR yarn.Client: Application diagnostics message:
>> User class threw exception:
>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> java.lang.UnsupportedOperationException: Accumulator must be registered
>> before send to executor
>>     at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
>>     at org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:44)
>>     ....
>>     at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
>> Caused by: java.lang.UnsupportedOperationException: Accumulator must be
>> registered before send to executor
>>     at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
>>     at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)
>>
>>
>>
>>
>>
>> Any ideas? Is resuming from the checkpoint position on application restart
>> actually supported in KinesisIO?
>>
>>
>>
>> Regards,
>>
>> Mani
>>
>>
>>
>>
