[ https://issues.apache.org/jira/browse/BEAM-3605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16354264#comment-16354264 ]
Alexey Romanenko edited comment on BEAM-3605 at 2/6/18 6:08 PM:
----------------------------------------------------------------

A first observation. This test exercises ShardReadersPool, which manages the Kinesis shard readers and, in turn, spawns two threads (through an ExecutorService), one thread per shard. The test then checks that one shard was closed. So we seem to have a race condition between the shard threads and the main thread: the 200 ms Thread.sleep() is sometimes not long enough for all spawned threads to finish.

Since no fixed _sleep()_ can be guaranteed to be long enough, I think that before calling _assert()_ we need to make sure that all spawned threads have finished their job. One way to do this might be to call _shardReadersPool.stop()_ before _assert()_, because it shuts down all spawned threads.

Any thoughts?
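To illustrate the idea of waiting for the spawned threads instead of sleeping, here is a minimal, self-contained sketch. It does not use the real ShardReadersPool: the FakePool class, its methods, and the timings are hypothetical stand-ins, and its stop() is assumed to let running readers finish (shutdown() followed by awaitTermination()), which may differ from what the real stop() does.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class WaitForReadersSketch {

  /** Hypothetical stand-in for a pool of shard readers; NOT the real ShardReadersPool. */
  static class FakePool {
    private final Set<String> activeShards = ConcurrentHashMap.newKeySet();
    private final ExecutorService executor = Executors.newFixedThreadPool(2);

    void start() {
      activeShards.add("shard1");
      activeShards.add("shard2");
      // Reader for shard1: takes a while to notice the shard is closed, then forgets it.
      executor.submit(() -> {
        sleepQuietly(300);
        activeShards.remove("shard1");
      });
      // Reader for shard2: finishes its work, and the shard stays registered.
      executor.submit(() -> sleepQuietly(50));
    }

    /** Assumed behaviour: accept no new work, then block until the readers are done. */
    boolean stop(long timeoutMillis) throws InterruptedException {
      executor.shutdown();
      return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    Set<String> activeShards() {
      return activeShards;
    }

    private static void sleepQuietly(long millis) {
      try {
        Thread.sleep(millis);
      } catch (InterruptedException e) {
        // Restore the interrupt flag so the executor can observe it.
        Thread.currentThread().interrupt();
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    FakePool pool = new FakePool();
    pool.start();

    // A fixed Thread.sleep(200) here would race with the 300 ms shard1 reader.
    // Waiting for termination removes the race regardless of how slow the reader is.
    boolean finished = pool.stop(5_000);

    System.out.println("readers finished: " + finished);
    System.out.println("active shards: " + pool.activeShards()); // prints [shard2]
  }
}
```

The timeout passed to stop() only bounds how long a hung reader can block the test; unlike the sleep, it does not have to be tuned to the readers' speed.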
> Kinesis ShardReadersPoolTest shouldForgetClosedShardIterator failure
> --------------------------------------------------------------------
>
> Key: BEAM-3605
> URL: https://issues.apache.org/jira/browse/BEAM-3605
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-extensions
> Reporter: Kenneth Knowles
> Assignee: Alexey Romanenko
> Priority: Critical
> Labels: flake, sickbay
>
> Here's one:
> https://builds.apache.org/job/beam_PreCommit_Java_GradleBuild/1758/testReport/junit/org.apache.beam.sdk.io.kinesis/ShardReadersPoolTest/shouldForgetClosedShardIterator/
> Filing all test failures as "Critical" so we can sickbay or fix.
> The Jenkins build will get GC'd so here is the error:
> {code}
> java.lang.AssertionError:
> Expecting:
>   <["shard1", "shard2"]>
> to contain only:
>   <["shard2"]>
> but the following elements were unexpected:
>   <["shard1"]>
>   at org.apache.beam.sdk.io.kinesis.ShardReadersPoolTest.shouldForgetClosedShardIterator(ShardReadersPoolTest.java:270)
> {code}
> and stderr
> {code}
> Feb 01, 2018 11:24:16 PM org.apache.beam.sdk.io.kinesis.ShardReadersPool readLoop
> INFO: Shard iterator for shard1 shard is closed, finishing the read loop
> org.apache.beam.sdk.io.kinesis.KinesisShardClosedException
> Feb 01, 2018 11:24:16 PM org.apache.beam.sdk.io.kinesis.ShardReadersPool readLoop
> INFO: Kinesis Shard read loop has finished
> Feb 01, 2018 11:24:16 PM org.apache.beam.sdk.io.kinesis.ShardReadersPool readLoop
> INFO: Shard iterator for shard1 shard is closed, finishing the read loop
> org.apache.beam.sdk.io.kinesis.KinesisShardClosedException
> Feb 01, 2018 11:24:16 PM org.apache.beam.sdk.io.kinesis.ShardReadersPool readLoop
> INFO: Kinesis Shard read loop has finished
> Feb 01, 2018 11:24:19 PM org.apache.beam.sdk.io.kinesis.ShardReadersPool readLoop
> INFO: Shard iterator for shard1 shard is closed, finishing the read loop
> org.apache.beam.sdk.io.kinesis.KinesisShardClosedException
> Feb 01, 2018 11:24:19 PM org.apache.beam.sdk.io.kinesis.ShardReadersPool readLoop
> INFO: Kinesis Shard read loop has finished
> Feb 01, 2018 11:24:23 PM org.apache.beam.sdk.io.kinesis.ShardReadersPool readLoop
> INFO: Shard iterator for shard1 shard is closed, finishing the read loop
> org.apache.beam.sdk.io.kinesis.KinesisShardClosedException: Shard iterator reached end of the shard: streamName=null, shardId=shard1
>   at org.apache.beam.sdk.io.kinesis.ShardRecordsIterator.readNextBatch(ShardRecordsIterator.java:70)
>   at org.apache.beam.sdk.io.kinesis.ShardReadersPool.readLoop(ShardReadersPool.java:121)
>   at org.apache.beam.sdk.io.kinesis.ShardReadersPool.lambda$startReadingShards$0(ShardReadersPool.java:112)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Feb 01, 2018 11:24:23 PM org.apache.beam.sdk.io.kinesis.ShardReadersPool readLoop
> INFO: Kinesis Shard read loop has finished
> Feb 01, 2018 11:24:23 PM org.apache.beam.sdk.io.kinesis.ShardReadersPool readLoop
> INFO: Shard iterator for shard2 shard is closed, finishing the read loop
> org.apache.beam.sdk.io.kinesis.KinesisShardClosedException: Shard iterator reached end of the shard: streamName=null, shardId=shard2
>   at org.apache.beam.sdk.io.kinesis.ShardRecordsIterator.readNextBatch(ShardRecordsIterator.java:70)
>   at org.apache.beam.sdk.io.kinesis.ShardReadersPool.readLoop(ShardReadersPool.java:121)
>   at org.apache.beam.sdk.io.kinesis.ShardReadersPool.lambda$startReadingShards$0(ShardReadersPool.java:112)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Feb 01, 2018 11:24:23 PM org.apache.beam.sdk.io.kinesis.ShardReadersPool stop
> INFO: Closing shard iterators pool
> Feb 01, 2018 11:24:24 PM org.apache.beam.sdk.io.kinesis.ShardReadersPool readLoop
> INFO: Kinesis Shard read loop has finished
> Feb 01, 2018 11:24:24 PM org.apache.beam.sdk.io.kinesis.ShardReadersPool readLoop
> INFO: Shard iterator for shard1 shard is closed, finishing the read loop
> org.apache.beam.sdk.io.kinesis.KinesisShardClosedException
> Feb 01, 2018 11:24:24 PM org.apache.beam.sdk.io.kinesis.ShardReadersPool readLoop
> WARNING: Transient exception occurred.
> org.apache.beam.sdk.io.kinesis.TransientKinesisException
> Feb 01, 2018 11:24:24 PM org.apache.beam.sdk.io.kinesis.ShardReadersPool readLoop
> INFO: Shard iterator for shard1 shard is closed, finishing the read loop
> org.apache.beam.sdk.io.kinesis.KinesisShardClosedException
> Feb 01, 2018 11:24:24 PM org.apache.beam.sdk.io.kinesis.ShardReadersPool readLoop
> INFO: Kinesis Shard read loop has finished
> Feb 01, 2018 11:24:24 PM org.apache.beam.sdk.io.kinesis.ShardReadersPool readLoop
> INFO: Shard iterator for shard1 shard is closed, finishing the read loop
> org.apache.beam.sdk.io.kinesis.KinesisShardClosedException
> Feb 01, 2018 11:24:24 PM org.apache.beam.sdk.io.kinesis.ShardReadersPool readLoop
> INFO: Kinesis Shard read loop has finished
> Feb 01, 2018 11:24:26 PM org.apache.beam.sdk.io.kinesis.ShardReadersPool stop
> INFO: Closing shard iterators pool
> Feb 01, 2018 11:24:26 PM org.apache.beam.sdk.io.kinesis.ShardReadersPool readLoop
> WARNING: Thread was interrupted, finishing the read loop
> java.lang.InterruptedException: sleep interrupted
>   at java.lang.Thread.sleep(Native Method)
>   at org.apache.beam.sdk.io.kinesis.ShardReadersPoolTest.lambda$shouldInterruptKinesisReadingAndStopShortly$0(ShardReadersPoolTest.java:150)
>   at org.mockito.internal.stubbing.StubbedInvocationMatcher.answer(StubbedInvocationMatcher.java:34)
>   at org.mockito.internal.handler.MockHandlerImpl.handle(MockHandlerImpl.java:91)
>   at org.mockito.internal.handler.NullResultGuardian.handle(NullResultGuardian.java:29)
>   at org.mockito.internal.handler.InvocationNotifierHandler.handle(InvocationNotifierHandler.java:38)
>   at org.mockito.internal.creation.MethodInterceptorFilter.intercept(MethodInterceptorFilter.java:51)
>   at org.apache.beam.sdk.io.kinesis.ShardRecordsIterator$$EnhancerByMockitoWithCGLIB$$791f33eb.readNextBatch(<generated>)
>   at org.apache.beam.sdk.io.kinesis.ShardReadersPool.readLoop(ShardReadersPool.java:121)
>   at org.apache.beam.sdk.io.kinesis.ShardReadersPool.lambda$startReadingShards$0(ShardReadersPool.java:112)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Feb 01, 2018 11:24:26 PM org.apache.beam.sdk.io.kinesis.ShardReadersPool readLoop
> INFO: Kinesis Shard read loop has finished
> Feb 01, 2018 11:24:26 PM org.apache.beam.sdk.io.kinesis.ShardReadersPool readLoop
> INFO: Kinesis Shard read loop has finished
> {code}