BoundedReadFromUnboundedSource does checkpoint the underlying UnboundedSource. Is that checkpoint logic not working? Do you have KinesisIO configured to always read from a specific point?
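For reference, the initial read position is configured on the KinesisIO source itself. A minimal sketch (the stream name, credentials, and region are placeholders, not from this thread):

```java
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.regions.Regions;

// Starting from TRIM_HORIZON (or LATEST) on every start would explain
// "always reads from the beginning"; a restored runner checkpoint is what
// should override this initial position on restart.
KinesisIO.Read read =
    KinesisIO.read()
        .withStreamName("my-stream")                          // placeholder
        .withAWSClientsProvider("accessKey", "secretKey",     // placeholders
            Regions.US_EAST_1)
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
```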
On Thu, Jul 9, 2020 at 9:56 AM Sunny, Mani Kolbe <sun...@dnb.com> wrote:

> We did the same and started using maxReadTime, and put the application on a
> recurring schedule of 5 minutes. It works fine end to end without any error.
>
> But the problem is that it always starts reading from the beginning of the
> Kinesis stream when it stop-starts.
>
> When I investigated, I found that when you set maxReadTime, the pipeline
> runs in BoundedReadFromUnboundedSource mode. That essentially converts the
> source into a bounded one, which means checkpointing and watermarks are no
> longer supported. The reader just reads for x amount of time and exits.
>
> Is there any recommended way to resume reading from the position it
> finished, either using maxReadTime or in unbounded-source mode?
>
> Could someone point me to a sample pipeline that uses Kinesis as a source?
>
> Regards,
> Mani
>
> *From:* Lars Almgren Schwartz <lars.almg...@tink.com>
> *Sent:* Thursday, June 25, 2020 7:53 AM
> *To:* user@beam.apache.org
> *Subject:* Re: KinesisIO checkpointing
>
> We had the exact same problem, but have not spent any time trying to solve
> it; we just skipped checkpointing for now.
>
> When we saw this problem we ran Spark 2.4.5 (local mode) and Kinesis 2.18
> and 2.19.
>
> On Wed, Jun 24, 2020 at 6:22 PM Sunny, Mani Kolbe <sun...@dnb.com> wrote:
>
> We are on Spark 2.4 and Beam 2.22.0.
>
> *From:* Alexey Romanenko <aromanenko....@gmail.com>
> *Sent:* Wednesday, June 24, 2020 5:15 PM
> *To:* user@beam.apache.org
> *Subject:* Re: KinesisIO checkpointing
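One workaround sometimes used with scheduled bounded runs (an assumption on my part, not something from this thread) is to persist a resume point yourself, outside Beam, and feed it back to the source on the next run via withInitialTimestampInStream. A rough sketch; the stream name and the loadLastRunEnd helper are hypothetical:

```java
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import org.joda.time.Duration;
import org.joda.time.Instant;

// maxReadTime wraps the source in BoundedReadFromUnboundedSource, so
// UnboundedSource checkpoints do not survive across runs. A crude way to
// approximate resuming is to store the end time of the last run yourself
// (e.g. in DynamoDB or HDFS) and start the next run from that timestamp.
Instant lastRunEnd = loadLastRunEnd();  // hypothetical helper

KinesisIO.Read read =
    KinesisIO.read()
        .withStreamName("my-stream")                      // placeholder
        .withInitialTimestampInStream(lastRunEnd)         // approximate resume point
        .withMaxReadTime(Duration.standardMinutes(5));    // bounded 5-minute run
```

This resumes by timestamp rather than by exact shard sequence number, so records straddling the boundary may be re-read; downstream dedup would be needed for exactly-once semantics.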
> Yes, KinesisIO supports restart from checkpoints, and it is based on the
> runner's checkpoint support [1].
>
> Could you specify which versions of Spark and Beam you use?
>
> [1] https://stackoverflow.com/questions/62259364/how-apache-beam-manage-kinesis-checkpointing/62349838#62349838
>
> On 24 Jun 2020, at 14:13, Sunny, Mani Kolbe <sun...@dnb.com> wrote:
>
> Hello,
>
> We are developing a Beam pipeline which runs on SparkRunner in streaming
> mode. This pipeline reads from Kinesis, does some translation and
> filtering, and finally writes to S3 using the AvroIO writer. We are using
> fixed windows with triggers based on element count and processing-time
> intervals. The output path is partitioned by window start timestamp, with
> allowedLateness=0sec.
>
> This is working fine, but I have noticed that whenever we restart
> streaming, the application starts reading from the Kinesis TRIM_HORIZON.
> That is, it is not resuming from the last checkpoint position. I then found
> that the checkpoint directory is based on the --jobName and --checkpointDir
> properties.
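The pipeline shape Mani describes (Kinesis source, fixed windows with count and processing-time triggers, windowed AvroIO writes to S3) might look roughly like the sketch below. MyRecord, ParseRecordFn, the stream name, the trigger thresholds, and the S3 path are all illustrative placeholders, not taken from the thread:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

Pipeline p = Pipeline.create(options);

p.apply("ReadKinesis", KinesisIO.read().withStreamName("my-stream")) // placeholder config
 .apply("Parse", ParDo.of(new ParseRecordFn()))  // hypothetical DoFn: KinesisRecord -> MyRecord
 .apply(Window.<MyRecord>into(FixedWindows.of(Duration.standardSeconds(60)))
     .triggering(Repeatedly.forever(AfterFirst.of(
         AfterPane.elementCountAtLeast(1000),                 // element-count trigger
         AfterProcessingTime.pastFirstElementInPane()
             .plusDelayOf(Duration.standardSeconds(30)))))    // processing-time trigger
     .withAllowedLateness(Duration.ZERO)
     .discardingFiredPanes())
 .apply("WriteAvro", AvroIO.write(MyRecord.class)
     .to("s3://my-bucket/output")     // placeholder; partitioning by window start
     .withWindowedWrites()            // would go through a FilenamePolicy
     .withNumShards(1));

p.run().waitUntilFinish();
```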
> So I tried running as below:
>
> spark-submit --master yarn --deploy-mode cluster \
>   --conf spark.dynamicAllocation.enabled=false \
>   --driver-memory 1g --executor-memory 1g --num-executors 1 --executor-cores 1 \
>   --class com.dnb.optimus.prime.processor.PrimeStreamProcessor \
>   --conf spark.executor.extraClassPath=/etc/hbase/conf \
>   /tmp/stream-processor-0.0.0.8-spark.jar \
>   --runner=SparkRunner \
>   --jobName=PrimeStreamProcessor \
>   --checkpointDir=hdfs:///tmp/PrimeStreamProcessor checkpoint \
>   --useWindow=true \
>   --windowDuration=60s --windowLateness=0s --windowElementCount=1 \
>   --maxReadTime=-1 \
>   --streaming=true
>
> I can see that it is able to fetch checkpoint data from the checkpointDir
> path provided. But when the driver tries to broadcast this information to
> the executors, it fails with the exception below.
>
> 20/06/12 15:35:28 ERROR yarn.Client: Application diagnostics message:
> User class threw exception:
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.UnsupportedOperationException: Accumulator must be registered
> before send to executor
>   at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
>   at org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:44)
>   ...
>   at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
> Caused by: java.lang.UnsupportedOperationException: Accumulator must be
> registered before send to executor
>   at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
>   at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)
>
> Any idea? Is resuming from the checkpoint position on application restart
> actually supported with KinesisIO?
>
> Regards,
> Mani
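As an aside, the --jobName and --checkpointDir flags used above can also be set programmatically through SparkPipelineOptions; a minimal sketch (the HDFS path is a placeholder):

```java
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Equivalent of the --jobName / --checkpointDir / --streaming flags.
// The checkpoint directory must stay stable across restarts for the
// runner to find and restore its previous state.
SparkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(SparkPipelineOptions.class);
options.setJobName("PrimeStreamProcessor");
options.setCheckpointDir("hdfs:///tmp/PrimeStreamProcessor-checkpoint"); // placeholder
options.setStreaming(true);
```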