cameronlee314 commented on a change in pull request #1344: SAMZA-2510:
Incorrect shutdown status due to race between runloop and process callback
thread
URL: https://github.com/apache/samza/pull/1344#discussion_r408445170
##########
File path: samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
##########
@@ -780,4 +782,27 @@ public void
testProcessBehaviourWhenAsyncCommitIsEnabled() throws InterruptedExc
commitLatch.await();
}
+
+ @Test(expected = SamzaException.class)
+ public void testExceptionIsPropagatedAfterShutdown() {
+ SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
+ when(consumerMultiplexer.pollIntervalMs()).thenReturn(10);
+ OffsetManager offsetManager = mock(OffsetManager.class);
+
+ TestTask task0 = new TestTask(false, false, false, null);
+ TaskInstance t0 = createTaskInstance(task0, taskName0, ssp0,
offsetManager, consumerMultiplexer);
+
+ Map<TaskName, TaskInstance> tasks = ImmutableMap.of(taskName0, t0);
+
+ int maxMessagesInFlight = 2;
+ RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
maxMessagesInFlight, windowMs, commitMs,
+ callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+ () -> 0L, false);
+ when(consumerMultiplexer.choose(false))
+ .thenReturn(envelope0)
+ .thenReturn(ssp0EndOfStream)
Review comment:
Will this test just trigger the `throwable != null` condition (and not the
`shutdownNow` condition) for exiting the main while loop? It seems like the
`envelope0` gets submitted and then processing starts, so `throwable` could get
set before `ssp0EndOfStream` even gets seen. It seems like it depends on when
the context switch happens.
If it is non-deterministic what the test is actually covering, then maybe
just turn this into a `testExceptionIsPropagated` test (I think this class
might need a test for that anyways).
If you think it's worth maintaining a test for this race, maybe you could
put a latch into `TestTask` processing somewhere, so you could make sure
`shutdownNow` gets set (you could use `RunLoop.shutdown` to set the flag)
before the `throwable` gets set.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services