RE: KinesisIO checkpointing

2020-07-10 Thread Sunny, Mani Kolbe
   at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$SplitFn.process(BoundedReadFromUnboundedSource.java:165)


Re: KinesisIO checkpointing

2020-07-09 Thread Mani Kolbe
Is it required to set the jobName and checkpointDir options for checkpointing
to work?




Re: KinesisIO checkpointing

2020-07-09 Thread Luke Cwik
The BoundedReadFromUnboundedSource reader does checkpoint the underlying
UnboundedSource; is that checkpoint logic not working?
Do you have KinesisIO configured to always read from a specific point?


RE: KinesisIO checkpointing

2020-07-09 Thread Sunny, Mani Kolbe
We did the same: we started using maxReadTime and put the application on a
recurring schedule of 5 minutes. It works fine end to end without any error.

But the problem is that it always starts reading from the beginning of the
Kinesis stream when it stops and restarts.

When I did some investigation, I found that when you set maxReadTime, the
pipeline runs in BoundedReadFromUnboundedSource mode. That essentially converts
the source into a bounded one, which means checkpointing and watermarks are no
longer supported; the reader just reads for a fixed amount of time and exits.

Is there any recommended way to resume reading from the position it finished,
either using maxReadTime or in unbounded-source mode?

Could someone point me to a sample pipeline that uses Kinesis as a source?

Regards,
Mani
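[Editor's note] Since a sample was requested and none appears later in the thread, here is a minimal sketch of a KinesisIO source pipeline. It is based on the Beam 2.22-era beam-sdks-java-io-kinesis API; the stream name, region, and credentials are placeholders, not details from this thread.

```java
// Minimal KinesisIO read sketch (Beam 2.x beam-sdks-java-io-kinesis API).
// Stream name, region, and credentials below are placeholders.
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import org.apache.beam.sdk.io.kinesis.KinesisRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class KinesisReadSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<KinesisRecord> records =
        p.apply("ReadKinesis",
            KinesisIO.read()
                .withStreamName("my-stream")  // placeholder stream name
                // LATEST resumes only via runner checkpoints; TRIM_HORIZON
                // would reread the stream from the beginning on every start.
                .withInitialPositionInStream(InitialPositionInStream.LATEST)
                .withAWSClientsProvider("ACCESS_KEY", "SECRET_KEY", Regions.EU_WEST_1));
    // Note: adding .withMaxReadTime(...) to the read is what switches the
    // source into the BoundedReadFromUnboundedSource mode discussed above.

    // ... windowing, transforms, AvroIO sink would go here ...
    p.run().waitUntilFinish();
  }
}
```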


Re: KinesisIO checkpointing

2020-06-25 Thread Lars Almgren Schwartz
We had the exact same problem but have not spent any time trying to solve it;
we just skipped checkpointing for now.
When we saw this problem we were running Spark 2.4.5 (local mode) and Kinesis
2.18 and 2.19.


RE: KinesisIO checkpointing

2020-06-24 Thread Sunny, Mani Kolbe
We are on Spark 2.4 and Beam 2.22.0.

Re: KinesisIO checkpointing

2020-06-24 Thread Alexey Romanenko
Yes, KinesisIO supports restart from checkpoints; it is based on the runner's
checkpoint support [1].

Could you specify which versions of Spark and Beam you use?

[1]
https://stackoverflow.com/questions/62259364/how-apache-beam-manage-kinesis-checkpointing/62349838#62349838


> On 24 Jun 2020, at 14:13, Sunny, Mani Kolbe  wrote:
> 
> Hello,
> 
> We are developing a Beam pipeline which runs on the SparkRunner in streaming 
> mode. This pipeline reads from Kinesis, does some translations and filtering, 
> and finally outputs to S3 using the AvroIO writer. We are using fixed windows 
> with triggers based on element count and processing-time intervals. The output 
> path is partitioned by window start timestamp, with allowedLateness=0sec.
> 
> This is working fine, but I have noticed that whenever we restart streaming, 
> the application starts reading from Kinesis TRIM_HORIZON. That is, it is not 
> resuming from the last checkpoint position. Then I found that the checkpoint 
> directory is based on the --jobName and --checkpointDir properties. So I 
> tried running as below:
> 
> spark-submit --master yarn --deploy-mode cluster \
>     --conf spark.dynamicAllocation.enabled=false \
>     --driver-memory 1g --executor-memory 1g --num-executors 1 --executor-cores 1 \
>     --class com.dnb.optimus.prime.processor.PrimeStreamProcessor \
>     --conf spark.executor.extraClassPath=/etc/hbase/conf \
>     /tmp/stream-processor-0.0.0.8-spark.jar \
>     --runner=SparkRunner \
>     --jobName=PrimeStreamProcessor \
>     --checkpointDir=hdfs:///tmp/PrimeStreamProcessor checkpoint \
>     --useWindow=true \
>     --windowDuration=60s --windowLateness=0s --windowElementCount=1 \
>     --maxReadTime=-1 \
>     --streaming=true
> 
> I can see that it is able to fetch checkpoint data from the checkpointDir 
> path provided. But when the driver tries to broadcast this information to the 
> executors, it fails with the exception below:
> 
> 20/06/12 15:35:28 ERROR yarn.Client: Application diagnostics message: User 
> class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
> java.lang.UnsupportedOperationException: Accumulator must be registered 
> before send to executor
>     at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
>     at org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:44)
> 
>     at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
> Caused by: java.lang.UnsupportedOperationException: Accumulator must be 
> registered before send to executor
>     at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
>     at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)
> 
> Any idea? Is resuming from the checkpoint position on application restart 
> actually supported in KinesisIO?
> 
> Regards,
> Mani
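
[Editor's note] The --jobName/--checkpointDir flags in the quoted command can also be wired programmatically through the Spark runner's pipeline options. A sketch follows; the option setters are those of the Beam Spark runner's SparkPipelineOptions, and the checkpoint path is illustrative, not the job's actual path.

```java
// Sketch: setting jobName and checkpointDir in code via SparkPipelineOptions.
// The checkpoint directory is keyed by these options, so both must stay
// stable across restarts for the runner to restore prior state.
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class CheckpointedOptionsSketch {
  public static void main(String[] args) {
    SparkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(SparkPipelineOptions.class);
    options.setRunner(SparkRunner.class);
    options.setStreaming(true);
    options.setJobName("PrimeStreamProcessor");
    options.setCheckpointDir("hdfs:///tmp/checkpoints/PrimeStreamProcessor");  // illustrative path

    Pipeline p = Pipeline.create(options);
    // ... KinesisIO source, transforms, sinks ...
    p.run().waitUntilFinish();
  }
}
```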