Is it required to set the jobName and checkpointDir options for checkpointing to work?
On Thu, 9 Jul, 2020, 9:25 PM Luke Cwik, <lc...@google.com> wrote:

> The BoundedReadFromUnboundedReader does checkpoint the underlying
> UnboundedSource; is that checkpoint logic not working?
> Do you have KinesisIO configured to always read from a specific point?
>
> On Thu, Jul 9, 2020 at 9:56 AM Sunny, Mani Kolbe <sun...@dnb.com> wrote:
>
>> We did the same: we started using maxReadTime and put the application on a
>> recurring schedule of 5 minutes. It works fine end to end without any error.
>>
>> But the problem is that it always starts reading from the beginning of the
>> Kinesis stream when it stops and restarts.
>>
>> When I investigated, I found that when you set maxReadTime, the pipeline
>> runs in BoundedReadFromUnboundedSource mode. That essentially converts the
>> source into a bounded one, which means checkpointing and watermarks are no
>> longer supported: the reader just reads for a fixed amount of time and exits.
>>
>> Is there a recommended way to resume reading from the position where it
>> finished, either using maxReadTime or in unbounded-source mode?
>>
>> Could someone point me to a sample pipeline that uses Kinesis as a source?
>>
>> Regards,
>> Mani
>>
>> From: Lars Almgren Schwartz <lars.almg...@tink.com>
>> Sent: Thursday, June 25, 2020 7:53 AM
>> To: user@beam.apache.org
>> Subject: Re: KinesisIO checkpointing
>>
>> CAUTION: This email originated from outside of D&B. Please do not click
>> links or open attachments unless you recognize the sender and know the
>> content is safe.
>>
>> We had the exact same problem but have not spent any time trying to solve
>> it; we just skipped checkpointing for now.
>> When we saw this problem we were running Spark 2.4.5 (local mode) and Beam
>> 2.18 and 2.19.
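The behaviour Mani describes can be illustrated with a small sketch. This is a toy model in plain Java, not Beam's actual classes: `StreamReader` and `boundedRun` are made up for illustration. The point is that a bounded read produces a checkpoint mark while it runs, but unless something persists that mark between runs and feeds it back in, every scheduled run starts again from the beginning of the stream (the TRIM_HORIZON behaviour seen above).

```java
import java.util.ArrayList;
import java.util.List;

public class CheckpointSketch {
  // Toy unbounded-stream reader with a checkpoint mark (here just an index).
  static class StreamReader {
    private final List<String> stream;
    private int position; // next record to read; this is the "checkpoint mark"

    StreamReader(List<String> stream, int startPosition) {
      this.stream = stream;
      this.position = startPosition;
    }

    boolean advance() { return position < stream.size(); }
    String read()     { return stream.get(position++); }
    int checkpoint()  { return position; } // snapshot of reading progress
  }

  // One bounded run: read up to maxRecords into `out`, return the final mark.
  static int boundedRun(List<String> stream, int start, int maxRecords, List<String> out) {
    StreamReader reader = new StreamReader(stream, start);
    for (int i = 0; i < maxRecords && reader.advance(); i++) {
      out.add(reader.read());
    }
    return reader.checkpoint();
  }

  public static void main(String[] args) {
    List<String> stream = List.of("a", "b", "c", "d");

    // Run 1 reads two records and produces a checkpoint mark...
    List<String> run1 = new ArrayList<>();
    int mark = boundedRun(stream, 0, 2, run1);

    // ...run 2 resumes correctly only if the mark was persisted and passed in.
    List<String> resumed = new ArrayList<>();
    boundedRun(stream, mark, 2, resumed);

    // If the mark is discarded (each scheduled run starting fresh), the next
    // run re-reads from the start of the stream.
    List<String> restarted = new ArrayList<>();
    boundedRun(stream, 0, 2, restarted);

    System.out.println(run1);      // [a, b]
    System.out.println(resumed);   // [c, d]
    System.out.println(restarted); // [a, b]
  }
}
```

In Beam terms, the runner is what persists the checkpoint mark for an unbounded source; when the pipeline runs as a short bounded job on a schedule, nothing carries that mark from one job to the next.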
>> On Wed, Jun 24, 2020 at 6:22 PM Sunny, Mani Kolbe <sun...@dnb.com> wrote:
>>
>> We are on Spark 2.4 and Beam 2.22.0.
>>
>> From: Alexey Romanenko <aromanenko....@gmail.com>
>> Sent: Wednesday, June 24, 2020 5:15 PM
>> To: user@beam.apache.org
>> Subject: Re: KinesisIO checkpointing
>>
>> Yes, KinesisIO supports restarting from checkpoints, and it's based on the
>> runner's checkpoint support [1].
>>
>> Could you specify which versions of Spark and Beam you use?
>>
>> [1] https://stackoverflow.com/questions/62259364/how-apache-beam-manage-kinesis-checkpointing/62349838#62349838
>>
>> On 24 Jun 2020, at 14:13, Sunny, Mani Kolbe <sun...@dnb.com> wrote:
>>
>> Hello,
>>
>> We are developing a Beam pipeline which runs on the SparkRunner in streaming
>> mode. The pipeline reads from Kinesis, does some translation and filtering,
>> and finally writes to S3 using the AvroIO writer. We are using fixed windows
>> with triggers based on element count and processing-time intervals. The
>> output path is partitioned by window start timestamp, with
>> allowedLateness=0sec.
>>
>> This is working fine, but I have noticed that whenever we restart streaming,
>> the application starts reading from the Kinesis TRIM_HORIZON; that is, it
>> does not resume from the last checkpoint position. Then I found that the
>> checkpoint directory is based on the --jobName and --checkpointDir
>> properties.
>> So I tried running it as below:
>>
>> spark-submit --master yarn --deploy-mode cluster \
>>   --conf spark.dynamicAllocation.enabled=false \
>>   --driver-memory 1g --executor-memory 1g --num-executors 1 --executor-cores 1 \
>>   --class com.dnb.optimus.prime.processor.PrimeStreamProcessor \
>>   --conf spark.executor.extraClassPath=/etc/hbase/conf \
>>   /tmp/stream-processor-0.0.0.8-spark.jar \
>>   --runner=SparkRunner \
>>   --jobName=PrimeStreamProcessor \
>>   --checkpointDir=hdfs:///tmp/PrimeStreamProcessor checkpoint \
>>   --useWindow=true \
>>   --windowDuration=60s --windowLateness=0s --windowElementCount=1 \
>>   --maxReadTime=-1 \
>>   --streaming=true
>>
>> I can see that it is able to fetch checkpoint data from the checkpointDir
>> path provided. But when the driver tries to broadcast this information to
>> the executors, it fails with the exception below:
>>
>> 20/06/12 15:35:28 ERROR yarn.Client: Application diagnostics message:
>> User class threw exception:
>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
>>     at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
>>     at org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:44)
>>     ...
>>     at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
>> Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
>>     at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
>>     at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)
>>
>> Any idea? Is resuming from the checkpoint position on application restart
>> actually supported with KinesisIO?
>>
>> Regards,
>> Mani
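Earlier in the thread the output path is described as partitioned by the start timestamp of a 60-second fixed window. The computation behind that partitioning can be sketched in plain Java (a toy version of fixed-window assignment, not Beam's windowing API; the bucket and path layout are placeholders):

```java
import java.time.Instant;

public class WindowPartition {
  // Start of the fixed window containing the given timestamp: the timestamp
  // rounded down to a multiple of the window size.
  static long windowStartMillis(long timestampMillis, long windowSizeMillis) {
    return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
  }

  // Output path keyed by window start, as described in the pipeline above
  // (the "s3://bucket/output/..." layout is a made-up example).
  static String outputPath(long timestampMillis, long windowSizeMillis) {
    long start = windowStartMillis(timestampMillis, windowSizeMillis);
    return "s3://bucket/output/window=" + Instant.ofEpochMilli(start);
  }

  public static void main(String[] args) {
    long oneMinute = 60_000L;
    long t = Instant.parse("2020-06-24T14:13:45Z").toEpochMilli();
    // An element at 14:13:45 lands in the window starting at 14:13:00.
    System.out.println(outputPath(t, oneMinute)); // ...window=2020-06-24T14:13:00Z
  }
}
```

Because the path depends only on the window start, a restarted job that re-reads old records writes into the same partitions it wrote before, which is why resuming from the checkpoint (rather than TRIM_HORIZON) matters for avoiding duplicate output.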