The BoundedReadFromUnboundedReader does checkpoint the underlying
UnboundedSource, is that checkpoint logic not working?
Do you have KinesisIO configured to always read from a specific point?

On Thu, Jul 9, 2020 at 9:56 AM Sunny, Mani Kolbe <sun...@dnb.com> wrote:

> We did the same and started using maxReadTime and put the application to
> run on a recurring schedule of 5 minutes. It works fine end to end without
> any error.
>
>
>
> But the problem is that it always starts reading from the beginning of the
> Kinesis stream when it stop-starts.
>
>
>
> When I did some investigation on that, I found that when you set
> maxReadTime, it will run using BoundedReadFromUnboundedSource mode. That
> essentially converts source in to a bounded one. This means checkpointing
> or watermark no longer supported. Reader just reads for x number of time
> and exists.
>
>
>
> Is there anyway recommended way to resume reading from the position it
> finished? Either using maxReadTime or in unboundedSource mode?
>
>
>
> Could some point me to a sample pipeline code that uses Kinesis as source?
>
>
>
> Regards,
>
> Mani
>
>
>
> *From:* Lars Almgren Schwartz <lars.almg...@tink.com>
> *Sent:* Thursday, June 25, 2020 7:53 AM
> *To:* user@beam.apache.org
> *Subject:* Re: KinesisIO checkpointing
>
>
>
> *CAUTION:* This email originated from outside of D&B. Please do not click
> links or open attachments unless you recognize the sender and know the
> content is safe.
>
>
>
> We had the exact same problem, but have not spent any time trying to solve
> it, we just skipped checkpointing for now.
>
> When we saw this problem we ran Spark 2.4.5 (local mode) and Kinesis 2.18
> and 2.19.
>
>
>
> On Wed, Jun 24, 2020 at 6:22 PM Sunny, Mani Kolbe <sun...@dnb.com> wrote:
>
> We are on spark 2.4 and Beam 2.22.0
>
>
>
> *From:* Alexey Romanenko <aromanenko....@gmail.com>
> *Sent:* Wednesday, June 24, 2020 5:15 PM
> *To:* user@beam.apache.org
> *Subject:* Re: KinesisIO checkpointing
>
>
>
> *CAUTION:* This email originated from outside of D&B. Please do not click
> links or open attachments unless you recognize the sender and know the
> content is safe.
>
>
>
> Yes, KinesisIO supports restart from checkpoints and it’s based on runner
> checkpoints support [1].
>
>
>
> Could you specify which version of Spark and Beam you use?
>
>
>
> [1]
> https://stackoverflow.com/questions/62259364/how-apache-beam-manage-kinesis-checkpointing/62349838#62349838
> <https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fstackoverflow.com%2Fquestions%2F62259364%2Fhow-apache-beam-manage-kinesis-checkpointing%2F62349838%2362349838&data=02%7C01%7CSunnyM%40dnb.com%7Cfc04d9e9af094878c42208d818d47035%7C19e2b708bf12437597198dec42771b3e%7C0%7C1%7C637286647986523098&sdata=dwQxTL7iaK0xBrFl8xrq7Y35OSJWqejuWdgtJCBwhGM%3D&reserved=0>
>
>
>
> On 24 Jun 2020, at 14:13, Sunny, Mani Kolbe <sun...@dnb.com> wrote:
>
>
>
> Hello,
>
>
>
> We are developing a beam pipeline which runs on SparkRunner on streaming
> mode. This pipeline read from Kinesis, do some translations, filtering and
> finally output to S3 using AvroIO writer. We are using Fixed windows with
> triggers based on element count and processing time intervals. Outputs path
> is partitioned by window start timestamp. allowedLateness=0sec
>
>
>
> This is working fine, but I have noticed that whenever we restarts
> streaming, application is starting to read from Kinesis TRIM_HORIZON. That
> is, it is not resuming from last checkpoint position. Then I found that the
> checkpoint directory is based on --jobName and --checkpointDir properties.
> So I tried running as below:
>
>
>
> *spark-submit --master yarn --deploy-mode cluster --conf
> spark.dynamicAllocation.enabled=false \*
>
> *    --driver-memory 1g --executor-memory 1g --num-executors 1
> --executor-cores 1 \*
>
> *    --class com.dnb.optimus.prime.processor.PrimeStreamProcessor \*
>
> *    --conf spark.executor.extraClassPath=/etc/hbase/conf \*
>
> *    /tmp/stream-processor-0.0.0.8-spark.jar \*
>
> *    --runner=SparkRunner \*
>
> *    --jobName=PrimeStreamProcessor \*
>
> *    --checkpointDir=hdfs:///tmp/PrimeStreamProcessor checkpoint \*
>
> *    --useWindow=true \*
>
> *    --windowDuration=60s --windowLateness=0s --windowElementCount=1 \*
>
> *    --maxReadTime=-1 \*
>
> *    --streaming=true*
>
>
>
> I can see that it is able to fetch checkpoint data from *checkpointDir* path
> provided. But When the driver tries to broadcast this information to
> executors, it is failing with below exception.
>
>
>
>
>
>
>
>
> *20/06/12 15:35:28 ERROR yarn.Client: Application diagnostics message:
> User class threw exception:
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.UnsupportedOperationException: Accumulator must be registered
> before send to executor         at
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
>         at
> org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:44)
>         ....         at
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
> Caused by: java.lang.UnsupportedOperationException: Accumulator must be
> registered before send to executor         at
> org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
>         at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)*
>
>
>
>
>
> Any idea? Is resuming from checkpoint position on application restart is
> actually supported on KinesisIO?
>
>
>
> Regards,
>
> Mani
>
>
>
>

Reply via email to