[ 
https://issues.apache.org/jira/browse/BEAM-7978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16915893#comment-16915893
 ] 

Alexey Romanenko commented on BEAM-7978:
----------------------------------------

[~Juraszek] Sure, I created a PR for that

> ArithmeticExceptions on getting backlog bytes 
> ----------------------------------------------
>
>                 Key: BEAM-7978
>                 URL: https://issues.apache.org/jira/browse/BEAM-7978
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kinesis
>    Affects Versions: 2.14.0
>            Reporter: Mateusz
>            Assignee: Alexey Romanenko
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Hello,
> Beam 2.14.0
>  (and to be more precise 
> [commit|https://github.com/apache/beam/commit/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad#diff-b4964a457006b1555c7042c739b405ec])
>  introduced a change in watermark calculation in Kinesis IO, causing the 
> error below:
> {code:java}
> exception:  "java.lang.RuntimeException: Unknown kinesis failure, when trying 
> to reach kinesis
>       at 
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:227)
>       at 
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getBacklogBytes(SimplifiedKinesisClient.java:167)
>       at 
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getBacklogBytes(SimplifiedKinesisClient.java:155)
>       at 
> org.apache.beam.sdk.io.kinesis.KinesisReader.getTotalBacklogBytes(KinesisReader.java:158)
>       at 
> org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:433)
>       at 
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1289)
>       at 
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
>       at 
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1024)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ArithmeticException: Value cannot fit in an int: 
> 153748963401
>       at org.joda.time.field.FieldUtils.safeToInt(FieldUtils.java:229)
>       at 
> org.joda.time.field.BaseDurationField.getDifference(BaseDurationField.java:141)
>       at 
> org.joda.time.base.BaseSingleFieldPeriod.between(BaseSingleFieldPeriod.java:72)
>       at org.joda.time.Minutes.minutesBetween(Minutes.java:101)
>       at 
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.lambda$getBacklogBytes$3(SimplifiedKinesisClient.java:169)
>       at 
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:210)
>       ... 10 more
> {code}
> We spotted this issue on the Dataflow runner. It is problematic because the 
> inability to get backlog bytes seems to result in constant recreation of the 
> KinesisReader.
> The issue happens if the backlog bytes are retrieved before the watermark 
> value is updated from its initial default value. An easy way to reproduce it 
> is to create a pipeline with a Kinesis source for a stream to which no records 
> are being put. While debugging it locally, you can observe that the watermark 
> is set to a value far in the past (like: "-290308-12-21T19:59:05.225Z"). After 
> two minutes (the default watermark idle duration threshold is 2 minutes), the 
> watermark is set to the value of 
> [watermarkIdleThreshold|https://github.com/apache/beam/blob/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicyFactory.java#L110],
>  so the next backlog bytes retrieval should be correct. However, as described 
> before, running the pipeline on the Dataflow runner results in the 
> KinesisReader being closed just after creation, so the watermark is never 
> corrected.
> The reason for the issue is as follows: the introduced watermark policies 
> rely on 
> [WatermarkParameters|https://github.com/apache/beam/blob/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkParameters.java]
>  which initialises currentWatermark and eventTime to 
> [BoundedWindow.TIMESTAMP_MIN_VALUE|https://github.com/apache/beam/commit/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad#diff-b4964a457006b1555c7042c739b405ecR52].
>  This results in the watermark being set to new 
> Instant(-9223372036854775L) at KinesisReader creation. The calculated [period 
> between the watermark and the current 
> timestamp|https://github.com/apache/beam/blob/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java#L169]
>  is then too large to fit in an int, causing the ArithmeticException to be 
> thrown.
> The maximum retention on Kinesis streams is [7 
> days|https://aws.amazon.com/kinesis/data-streams/faqs/], so it should be safe 
> to initialise the affected watermark parameters with new 
> Instant().minus(MAX_KINESIS_STREAM_RETENTION_PERIOD) where 
> MAX_KINESIS_STREAM_RETENTION_PERIOD is the duration of 7 days.
> Remark: it seems that a similar issue existed in the past (fixed in 2.4). 
> Please see the 
> [ticket|https://issues.apache.org/jira/browse/BEAM-3881]. 
> Best regards,
>  Mateusz
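
For reference, the overflow described in the quoted report can be reproduced with plain JDK arithmetic. This is only a sketch: the class and method names are hypothetical, and Math.toIntExact stands in for the int narrowing that Joda-Time performs inside Minutes.minutesBetween. The initial watermark value is the one quoted in the report.

```java
final class BacklogOverflowDemo {
    // Initial watermark in epoch milliseconds, as quoted in the report.
    static final long INITIAL_WATERMARK_MILLIS = -9223372036854775L;

    // Whole minutes between two epoch-millisecond timestamps, kept as a long.
    static long minutesBetween(long startMillis, long endMillis) {
        return Math.subtractExact(endMillis, startMillis) / 60_000L;
    }

    // Narrows the minute count to an int; throws ArithmeticException if it
    // does not fit (a stand-in for the check that fails in the stack trace).
    static int minutesAsInt(long startMillis, long endMillis) {
        return Math.toIntExact(minutesBetween(startMillis, endMillis));
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println("minutes as long: " + minutesBetween(INITIAL_WATERMARK_MILLIS, now));
        try {
            minutesAsInt(INITIAL_WATERMARK_MILLIS, now);
        } catch (ArithmeticException e) {
            System.out.println("narrowing to int failed: " + e.getMessage());
        }
    }
}
```

The minute count is on the order of 1.5e11, well above Integer.MAX_VALUE, which is consistent with the value shown in the stack trace.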

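The initialisation proposed in the report (current time minus the maximum retention period) can be sketched as below. This is an illustration only and not necessarily what the PR does: it uses java.time rather than the Joda-Time types used in Beam, and the class and method names are hypothetical. Only MAX_KINESIS_STREAM_RETENTION_PERIOD is taken from the report.

```java
import java.time.Duration;
import java.time.Instant;

final class InitialWatermarkSketch {
    // 7 days, the maximum Kinesis stream retention cited in the report.
    static final Duration MAX_KINESIS_STREAM_RETENTION_PERIOD = Duration.ofDays(7);

    // Initial watermark: "now" minus the maximum retention period,
    // instead of the minimum representable timestamp.
    static Instant initialWatermark(Instant now) {
        return now.minus(MAX_KINESIS_STREAM_RETENTION_PERIOD);
    }

    // Minutes between the watermark and "now", narrowed to an int.
    // Throws ArithmeticException if the value does not fit.
    static int minutesBehind(Instant watermark, Instant now) {
        return Math.toIntExact(Duration.between(watermark, now).toMinutes());
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        // 7 days is 10080 minutes, so the narrowing to int succeeds.
        System.out.println(minutesBehind(initialWatermark(now), now));
    }
}
```

With this starting point the difference is bounded by the retention period, so the minute count stays far below the int limit.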


--
This message was sent by Atlassian Jira
(v8.3.2#803003)
