[ 
https://issues.apache.org/jira/browse/FLINK-31472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825567#comment-17825567
 ] 

Ahmed Hamdy commented on FLINK-31472:
-------------------------------------

[~lincoln.86xy] [~mapohl] 
Hello, could you please review the 
[PR|https://github.com/apache/flink/pull/24481].
Let me add some context

h2.Why is the test failing?

So the flakiness arises from setting processing time within the test to trigger 
the timer flush of the writer, This caused the concurrent thread access of the 
mailbox which caused the failure, The problem was within the test not the 
AsyncWriter. 

h2.Why is it intermittent?

This is because we are also writing batches of records so there was a race 
condition between both batch size trigger and timer trigger, in other words we 
used to add a new batch and a set the time to trigger the flush, had the batch 
trigger flushed the buffer the timer callback would be discarded safely.
h2.Why do I believe this refactor should fix the test?
Because I have removed the time setting from the test it self, The size of 
batches sent should be enough to trigger the flush which is needed for the test.

h2.What could go wrong?

There should be no newly introduced issues here since the batch size is 
unchanged we expect enough flushes triggered by batch size only to stabilize 
the rate limiting value as expected.

h2.How did I verify the fix?

I have run a sampler till failure for a some time and haven't reported any. I 
am aware local setup is different than CI but the test should be less sensitive 
to delays now so I expect we are green to go.

> AsyncSinkWriterThrottlingTest failed with Illegal mailbox thread
> ----------------------------------------------------------------
>
>                 Key: FLINK-31472
>                 URL: https://issues.apache.org/jira/browse/FLINK-31472
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.17.0, 1.16.1, 1.18.0, 1.19.0, 1.20.0
>            Reporter: Ran Tao
>            Assignee: Ahmed Hamdy
>            Priority: Critical
>              Labels: pull-request-available, test-stability
>             Fix For: 1.20.0
>
>
> when run mvn clean test, this case failed occasionally.
> {noformat}
> [ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.955 
> s <<< FAILURE! - in 
> org.apache.flink.connector.base.sink.writer.AsyncSinkWriterThrottlingTest
> [ERROR] 
> org.apache.flink.connector.base.sink.writer.AsyncSinkWriterThrottlingTest.testSinkThroughputShouldThrottleToHalfBatchSize
>   Time elapsed: 0.492 s  <<< ERROR!
> java.lang.IllegalStateException: Illegal thread detected. This method must be 
> called from inside the mailbox thread!
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.checkIsMailboxThread(TaskMailboxImpl.java:262)
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:137)
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.yield(MailboxExecutorImpl.java:84)
>         at 
> org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.flush(AsyncSinkWriter.java:367)
>         at 
> org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.lambda$registerCallback$3(AsyncSinkWriter.java:315)
>         at 
> org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService$CallbackTask.onProcessingTime(TestProcessingTimeService.java:199)
>         at 
> org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService.setCurrentTime(TestProcessingTimeService.java:76)
>         at 
> org.apache.flink.connector.base.sink.writer.AsyncSinkWriterThrottlingTest.testSinkThroughputShouldThrottleToHalfBatchSize(AsyncSinkWriterThrottlingTest.java:64)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>         at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>         at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>         at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>         at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>         at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>         at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>         at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>         at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>         at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>         at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>         at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>         at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>         at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>         at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>         at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>         at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>         at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
>         at 
> org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
>         at 
> org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
>         at 
> org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
>         at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147)
>         at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127)
>         at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90)
>         at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:55)
>         at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:102)
>         at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:54)
>         at 
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
>         at 
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
>         at 
> org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
>         at 
> org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
>         at 
> org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.lambda$execute$1(JUnitPlatformProvider.java:199)
>         at java.util.Iterator.forEachRemaining(Iterator.java:116)
>         at 
> org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:193)
>         at 
> org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:154)
>         at 
> org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:120)
>         at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:428)
>         at 
> org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162)
>         at 
> org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:562)
>         at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to