[ 
https://issues.apache.org/jira/browse/BEAM-3726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pawel Bartoszek updated BEAM-3726:
----------------------------------
    Description: 
When the job is restored from savepoint Kinesis Reader throws almost always 
{{java.lang.IllegalArgumentException: Attempting to move backwards}}

After a few job restarts caused again by the same exception, job finally starts 
up and continues to run with no further problems.

Beam job is reading from 32 shards with parallelism set to 32. Using Flink 1.3.2

 
{code:java}
java.lang.IllegalArgumentException: Attempting to move backwards
at 
org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
at org.apache.beam.sdk.util.MovingFunction.flush(MovingFunction.java:97)
at org.apache.beam.sdk.util.MovingFunction.add(MovingFunction.java:114)
at org.apache.beam.sdk.io.kinesis.KinesisReader.advance(KinesisReader.java:137)
at 
org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeAdvance(ReaderInvocationUtil.java:67)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:264)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
at 
org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:39)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702){code}
 

Kinesis Reader transformation configuration:
{code:java}
pipeline.apply("KINESIS READER", KinesisIO.read()
.withStreamName(streamName)
.withInitialPositionInStream(InitialPositionInStream.LATEST)
.withAWSClientsProvider(awsAccessKey, awsSecretKey, EU_WEST_1)){code}
 

When testing locally I managed to catch this exception. Just before executing 
this 
[link|https://github.com/apache/beam/blob/6c93105c2cb7be709c6b3e2e6cdcd09df2b48308/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java#L97]
 that threw exception I captured the state of the class so that you can 
replicate the issue
{code:java}
org.apache.beam.sdk.util.MovingFunction@71781a[sampleUpdateMs=5000,numSignificantBuckets=2,numSignificantSamples=10,function=org.apache.beam.sdk.transforms.Min$MinLongFn@7909d8d3,buckets={9223372036854775807,9223372036854775807,1519315344334,1519315343759,1519315343770,1519315344086,9223372036854775807,9223372036854775807,9223372036854775807,9223372036854775807,9223372036854775807,9223372036854775807},numSamples={0,0,1,158,156,146,0,0,0,0,144,0},currentMsSinceEpoch=1519315585000,currentIndex=2]{code}
 

the add function of MovingFunction was called with nowMsSinceEpoch = 
1519315583591

 

  was:
When the job is restored from savepoint Kinesis Reader throws sometimes 
{{java.lang.IllegalArgumentException: Attempting to move backwards}}

After a few job restarts caused again by the same exception, job finally starts 
up and continues to run with no further problems.

 
{code:java}
java.lang.IllegalArgumentException: Attempting to move backwards
at 
org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
at org.apache.beam.sdk.util.MovingFunction.flush(MovingFunction.java:97)
at org.apache.beam.sdk.util.MovingFunction.add(MovingFunction.java:114)
at org.apache.beam.sdk.io.kinesis.KinesisReader.advance(KinesisReader.java:137)
at 
org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeAdvance(ReaderInvocationUtil.java:67)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:264)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
at 
org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:39)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702){code}
 

Kinesis Reader transformation configuration:
{code:java}
pipeline.apply("KINESIS READER", KinesisIO.read()
.withStreamName(streamName)
.withInitialPositionInStream(InitialPositionInStream.LATEST)
.withAWSClientsProvider(awsAccessKey, awsSecretKey, EU_WEST_1)){code}
 


> Kinesis Reader: java.lang.IllegalArgumentException: Attempting to move 
> backwards
> --------------------------------------------------------------------------------
>
>                 Key: BEAM-3726
>                 URL: https://issues.apache.org/jira/browse/BEAM-3726
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kinesis
>    Affects Versions: 2.2.0
>            Reporter: Pawel Bartoszek
>            Assignee: Jean-Baptiste Onofré
>            Priority: Major
>
> When the job is restored from savepoint Kinesis Reader throws almost always 
> {{java.lang.IllegalArgumentException: Attempting to move backwards}}
> After a few job restarts caused again by the same exception, job finally 
> starts up and continues to run with no further problems.
> Beam job is reading from 32 shards with parallelism set to 32. Using Flink 
> 1.3.2
>  
> {code:java}
> java.lang.IllegalArgumentException: Attempting to move backwards
> at 
> org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
> at org.apache.beam.sdk.util.MovingFunction.flush(MovingFunction.java:97)
> at org.apache.beam.sdk.util.MovingFunction.add(MovingFunction.java:114)
> at 
> org.apache.beam.sdk.io.kinesis.KinesisReader.advance(KinesisReader.java:137)
> at 
> org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeAdvance(ReaderInvocationUtil.java:67)
> at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:264)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
> at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
> at 
> org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:39)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702){code}
>  
> Kinesis Reader transformation configuration:
> {code:java}
> pipeline.apply("KINESIS READER", KinesisIO.read()
> .withStreamName(streamName)
> .withInitialPositionInStream(InitialPositionInStream.LATEST)
> .withAWSClientsProvider(awsAccessKey, awsSecretKey, EU_WEST_1)){code}
>  
> When testing locally I managed to catch this exception. Just before executing 
> this 
> [link|https://github.com/apache/beam/blob/6c93105c2cb7be709c6b3e2e6cdcd09df2b48308/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java#L97]
>  that threw exception I captured the state of the class so that you can 
> replicate the issue
> {code:java}
> org.apache.beam.sdk.util.MovingFunction@71781a[sampleUpdateMs=5000,numSignificantBuckets=2,numSignificantSamples=10,function=org.apache.beam.sdk.transforms.Min$MinLongFn@7909d8d3,buckets={9223372036854775807,9223372036854775807,1519315344334,1519315343759,1519315343770,1519315344086,9223372036854775807,9223372036854775807,9223372036854775807,9223372036854775807,9223372036854775807,9223372036854775807},numSamples={0,0,1,158,156,146,0,0,0,0,144,0},currentMsSinceEpoch=1519315585000,currentIndex=2]{code}
>  
> the add function of MovingFunction was called with nowMsSinceEpoch = 
> 1519315583591
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to