[ https://issues.apache.org/jira/browse/KAFKA-9176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17052586#comment-17052586 ]
ASF GitHub Bot commented on KAFKA-9176: --------------------------------------- guozhangwang commented on pull request #8235: KAFKA-9176: Do not update limit offset if we are in RESTORE_ACTIVE mode URL: https://github.com/apache/kafka/pull/8235 Previously we may be updating the standby's limit offset as committed offsets to those source changelogs, and then inside the inner method we check if the state is in RESTORE_ACTIVE or not, which is a bug. We should, instead, just check on the caller that we can skip restoring if: 1) we are in RESTORE_ACTIVE mode. 2) there're no source changelog partitions. 3) those partitions do not have any buffered records. Also updated the unit test for this coverage. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Flaky test failure: > OptimizedKTableIntegrationTest.shouldApplyUpdatesToStandbyStore > ------------------------------------------------------------------------------------ > > Key: KAFKA-9176 > URL: https://issues.apache.org/jira/browse/KAFKA-9176 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests > Affects Versions: 2.4.0 > Reporter: Manikumar > Assignee: Guozhang Wang > Priority: Major > Labels: flaky-test > > h4. > [https://builds.apache.org/blue/organizations/jenkins/kafka-2.4-jdk8/detail/kafka-2.4-jdk8/65/tests] > h4. Error > org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state > store source-table because the stream thread is PARTITIONS_ASSIGNED, not > RUNNING > h4. Stacktrace > org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state > store source-table because the stream thread is PARTITIONS_ASSIGNED, not > RUNNING > at > org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:51) > at > org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:59) > at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1129) > at > org.apache.kafka.streams.integration.OptimizedKTableIntegrationTest.shouldApplyUpdatesToStandbyStore(OptimizedKTableIntegrationTest.java:157) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) > at org.junit.runners.ParentRunner.run(ParentRunner.java:412) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) > at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118) > at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:182) > at > org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:164) > at > org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:412) > at > org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64) > at > org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at > org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:56) > at java.lang.Thread.run(Thread.java:748) -- This message was sent by Atlassian Jira (v8.3.4#803005)