[ https://issues.apache.org/jira/browse/FLINK-31472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825567#comment-17825567 ]
Ahmed Hamdy commented on FLINK-31472:
-------------------------------------

[~lincoln.86xy] [~mapohl] Hello, could you please review the [PR|https://github.com/apache/flink/pull/24481]? Let me add some context.

h2. Why is the test failing?

The flakiness arises from setting the processing time within the test to trigger the writer's timer flush. This caused concurrent thread access to the mailbox, which caused the failure. The problem was in the test, not in the AsyncSinkWriter.

h2. Why is it intermittent?

The test also writes batches of records, so there was a race condition between the batch-size trigger and the timer trigger. In other words, the test used to add a new batch and then set the time to trigger the flush; if the batch-size trigger had already flushed the buffer, the timer callback would be discarded safely.

h2. Why do I believe this refactor should fix the test?

Because I have removed the time setting from the test itself. The size of the batches sent should be enough to trigger the flushes the test needs.

h2. What could go wrong?

There should be no newly introduced issues here: since the batch size is unchanged, we expect enough flushes triggered by batch size alone to stabilize the rate-limiting value as expected.

h2. How did I verify the fix?

I ran the test repeatedly until failure for some time and no failure was reported. I am aware that a local setup differs from CI, but the test should be less sensitive to delays now, so I expect we are good to go.
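To illustrate the failure mode described above, here is a minimal, self-contained sketch of a thread-confined mailbox. It is not Flink code: {{ConfinedMailbox}}, {{tryTake}} and {{takeFromOtherThread}} are hypothetical names made up for this example, and the only assumption carried over from the stack trace is that taking from the mailbox is allowed solely on the thread that owns it. How the calling thread ends up differing from the mailbox thread in the real test is not modeled here.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class MailboxThreadCheckSketch {

    /** Hypothetical stand-in for a thread-confined mailbox; NOT Flink's TaskMailboxImpl. */
    static final class ConfinedMailbox {
        private final Thread owner;
        private final Queue<Runnable> mails = new ArrayDeque<>();

        ConfinedMailbox(Thread owner) {
            this.owner = owner;
        }

        /** Any thread may enqueue work. */
        synchronized void put(Runnable mail) {
            mails.add(mail);
        }

        /** Only the owning thread may dequeue; other callers are rejected. */
        synchronized Runnable tryTake() {
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException(
                        "Illegal thread detected. This method must be called from inside the mailbox thread!");
            }
            return mails.poll();
        }
    }

    /** Calls tryTake() on a fresh thread; returns the failure message, or null if the call succeeded. */
    static String takeFromOtherThread(ConfinedMailbox mailbox) {
        String[] failure = new String[1];
        Thread other = new Thread(() -> {
            try {
                mailbox.tryTake();
            } catch (IllegalStateException e) {
                failure[0] = e.getMessage();
            }
        });
        other.start();
        try {
            other.join(); // join() makes the write to failure[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return failure[0];
    }

    public static void main(String[] args) {
        ConfinedMailbox mailbox = new ConfinedMailbox(Thread.currentThread());
        mailbox.put(() -> { });
        System.out.println("other thread: " + takeFromOtherThread(mailbox));
        System.out.println("owner thread took mail: " + (mailbox.tryTake() != null));
    }
}
```

In this sketch, a callback that reaches {{tryTake}} from any thread other than the owner fails in the same way as the reported exception, whereas work driven from the owning thread goes through. Under that assumption, removing the manual time setting from the test removes the path on which the flush callback could run outside the mailbox thread.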
> AsyncSinkWriterThrottlingTest failed with Illegal mailbox thread
> ----------------------------------------------------------------
>
> Key: FLINK-31472
> URL: https://issues.apache.org/jira/browse/FLINK-31472
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Common
> Affects Versions: 1.17.0, 1.16.1, 1.18.0, 1.19.0, 1.20.0
> Reporter: Ran Tao
> Assignee: Ahmed Hamdy
> Priority: Critical
> Labels: pull-request-available, test-stability
> Fix For: 1.20.0
>
>
> when run mvn clean test, this case failed occasionally.
> {noformat}
> [ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.955 s <<< FAILURE! - in org.apache.flink.connector.base.sink.writer.AsyncSinkWriterThrottlingTest
> [ERROR] org.apache.flink.connector.base.sink.writer.AsyncSinkWriterThrottlingTest.testSinkThroughputShouldThrottleToHalfBatchSize Time elapsed: 0.492 s <<< ERROR!
> java.lang.IllegalStateException: Illegal thread detected. This method must be called from inside the mailbox thread!
>     at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.checkIsMailboxThread(TaskMailboxImpl.java:262)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:137)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.yield(MailboxExecutorImpl.java:84)
>     at org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.flush(AsyncSinkWriter.java:367)
>     at org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.lambda$registerCallback$3(AsyncSinkWriter.java:315)
>     at org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService$CallbackTask.onProcessingTime(TestProcessingTimeService.java:199)
>     at org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService.setCurrentTime(TestProcessingTimeService.java:76)
>     at org.apache.flink.connector.base.sink.writer.AsyncSinkWriterThrottlingTest.testSinkThroughputShouldThrottleToHalfBatchSize(AsyncSinkWriterThrottlingTest.java:64)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>     at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>     at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
>     at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
>     at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
>     at org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
>     at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147)
>     at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127)
>     at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90)
>     at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:55)
>     at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:102)
>     at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:54)
>     at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
>     at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
>     at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
>     at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
>     at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.lambda$execute$1(JUnitPlatformProvider.java:199)
>     at java.util.Iterator.forEachRemaining(Iterator.java:116)
>     at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:193)
>     at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:154)
>     at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:120)
>     at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:428)
>     at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162)
>     at org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:562)
>     at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548)
> {noformat}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)