[ https://issues.apache.org/jira/browse/BEAM-7978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16914387#comment-16914387 ]
Alexey Romanenko commented on BEAM-7978:
----------------------------------------

As far as I can tell, {{getTotalBacklogBytes()}} is used only by the Dataflow runner, so it seems no other runners are affected. I tested with the direct and Spark runners and didn't notice any major issues (except that the initial watermark is set to {{BoundedWindow.TIMESTAMP_MIN_VALUE}} and only updated later). So the fix looks simple at first sight.
[~Juraszek], would you be able to test it on your side?

> ArithmeticExceptions on getting backlog bytes
> ----------------------------------------------
>
>                 Key: BEAM-7978
>                 URL: https://issues.apache.org/jira/browse/BEAM-7978
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kinesis
>    Affects Versions: 2.14.0
>            Reporter: Mateusz
>            Assignee: Alexey Romanenko
>            Priority: Major
>
> Hello,
> Beam 2.14.0 (more precisely, [commit|https://github.com/apache/beam/commit/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad#diff-b4964a457006b1555c7042c739b405ec]) introduced a change in the watermark calculation in Kinesis IO that causes the following error:
> {code:java}
> exception: "java.lang.RuntimeException: Unknown kinesis failure, when trying to reach kinesis
>     at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:227)
>     at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getBacklogBytes(SimplifiedKinesisClient.java:167)
>     at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getBacklogBytes(SimplifiedKinesisClient.java:155)
>     at org.apache.beam.sdk.io.kinesis.KinesisReader.getTotalBacklogBytes(KinesisReader.java:158)
>     at org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:433)
>     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1289)
>     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
>     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1024)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ArithmeticException: Value cannot fit in an int: 153748963401
>     at org.joda.time.field.FieldUtils.safeToInt(FieldUtils.java:229)
>     at org.joda.time.field.BaseDurationField.getDifference(BaseDurationField.java:141)
>     at org.joda.time.base.BaseSingleFieldPeriod.between(BaseSingleFieldPeriod.java:72)
>     at org.joda.time.Minutes.minutesBetween(Minutes.java:101)
>     at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.lambda$getBacklogBytes$3(SimplifiedKinesisClient.java:169)
>     at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:210)
>     ... 10 more
> {code}
> We spotted this issue on the Dataflow runner. It is problematic because the inability to get backlog bytes seems to result in KinesisReader being recreated constantly.
> The issue occurs if the backlog bytes are retrieved before the watermark is updated from its initial default value. An easy way to reproduce it is to create a pipeline with a Kinesis source reading a stream to which no records are being put.
> While debugging locally, you can observe that the watermark is set to a value in the distant past (e.g. "-290308-12-21T19:59:05.225Z"). After two minutes (the default watermark idle duration threshold), the watermark is set to the value of [watermarkIdleThreshold|https://github.com/apache/beam/blob/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicyFactory.java#L110], so the next backlog bytes retrieval should be correct.
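The overflow in the trace can be reproduced with plain arithmetic, without Beam or Kinesis. The sketch below is a hypothetical stand-alone illustration, assuming java.time in place of Joda-Time: it takes an initial watermark of -9223372036854775 ms (the {{BoundedWindow.TIMESTAMP_MIN_VALUE}} value cited in the report), counts whole minutes up to an arbitrary fixed "now", and then narrows the result to {{int}} with {{Math.toIntExact}}, which stands in for Joda-Time's {{FieldUtils.safeToInt}}. The class and method names are invented for this example.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical illustration (not Beam code): reproduces the arithmetic behind
 * "Value cannot fit in an int" using java.time instead of Joda-Time.
 */
public class BacklogOverflowDemo {

    // Same millisecond value as BoundedWindow.TIMESTAMP_MIN_VALUE, as quoted in the report.
    static final Instant INITIAL_WATERMARK = Instant.ofEpochMilli(-9223372036854775L);

    /** Whole minutes between two instants, kept as a long so it cannot overflow here. */
    static long minutesBetween(Instant start, Instant end) {
        return Duration.between(start, end).toMinutes();
    }

    /** Narrows to int the way Joda-Time's safeToInt does: throws ArithmeticException on overflow. */
    static int minutesBetweenAsInt(Instant start, Instant end) {
        return Math.toIntExact(minutesBetween(start, end));
    }

    public static void main(String[] args) {
        // Arbitrary fixed "now" so the example is deterministic.
        Instant now = Instant.parse("2019-08-14T12:00:00Z");
        System.out.println("minutes as long = " + minutesBetween(INITIAL_WATERMARK, now));
        try {
            minutesBetweenAsInt(INITIAL_WATERMARK, now);
        } catch (ArithmeticException e) {
            System.out.println("narrowing to int failed: " + e.getMessage());
        }
    }
}
```

The minute count is on the order of 1.5 x 10^11, the same magnitude as the 153748963401 in the exception message and far beyond {{Integer.MAX_VALUE}} (2147483647).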
> However, as described above, on the Dataflow runner the KinesisReader is closed just after creation, so the watermark never gets corrected.
> The cause of the issue is as follows: the new watermark policies rely on [WatermarkParameters|https://github.com/apache/beam/blob/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkParameters.java], which initialises currentWatermark and eventTime to [BoundedWindow.TIMESTAMP_MIN_VALUE|https://github.com/apache/beam/commit/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad#diff-b4964a457006b1555c7042c739b405ecR52]. This results in the watermark being set to new Instant(-9223372036854775L) when the KinesisReader is created. The calculated [period between the watermark and the current timestamp|https://github.com/apache/beam/blob/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java#L169] is then far larger than expected (too many minutes to fit in an int), causing the ArithmeticException to be thrown.
> The maximum retention on Kinesis streams is [7 days|https://aws.amazon.com/kinesis/data-streams/faqs/], so it should be safe to initialise the affected watermark parameters with new Instant().minus(MAX_KINESIS_STREAM_RETENTION_PERIOD), where MAX_KINESIS_STREAM_RETENTION_PERIOD is a duration of 7 days.
> Remark: it seems a similar issue was present in the past (fixed in 2.4). Please look into the [ticket|https://issues.apache.org/jira/browse/BEAM-3881].
> Best regards,
> Mateusz

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
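The initialisation proposed in the report (start the watermark at "now minus the maximum Kinesis retention" instead of {{BoundedWindow.TIMESTAMP_MIN_VALUE}}) can be sketched the same way. This is a hypothetical illustration, not Beam's actual fix: it uses java.time instead of Joda-Time, assumes the 7-day maximum retention stated in the report, and the names {{InitialWatermarkSketch}}, {{initialWatermark}} and {{backlogMinutes}} are invented for the example. Starting from "now - 7 days" keeps the minute difference at 10,080, well inside the int range.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical sketch of the proposed initial watermark. java.time stands in for
 * Joda-Time; names are illustrative, not part of the Beam Kinesis IO API.
 */
public class InitialWatermarkSketch {

    // 7 days: the maximum Kinesis retention cited in the report.
    static final Duration MAX_KINESIS_STREAM_RETENTION_PERIOD = Duration.ofDays(7);

    /** Proposed starting value for currentWatermark / eventTime. */
    static Instant initialWatermark(Instant now) {
        return now.minus(MAX_KINESIS_STREAM_RETENTION_PERIOD);
    }

    /** Backlog age in minutes, narrowed to int as Joda-Time's Minutes.minutesBetween does. */
    static int backlogMinutes(Instant watermark, Instant now) {
        return Math.toIntExact(Duration.between(watermark, now).toMinutes());
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        Instant watermark = initialWatermark(now);
        // prints: backlog minutes = 10080
        System.out.println("backlog minutes = " + backlogMinutes(watermark, now));
    }
}
```

Passing {{now}} in explicitly, rather than calling {{Instant.now()}} inside the helper, keeps the sketch deterministic and easy to test.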