[ https://issues.apache.org/jira/browse/KAFKA-10151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17138129#comment-17138129 ]
Chia-Ping Tsai commented on KAFKA-10151:
----------------------------------------

[~ableegoldman] I think you have already resolved this issue with https://github.com/apache/kafka/commit/2239004907b29e00811fee9ded5a790172701a03

The root cause is that in-flight records to the changelog have not completed when the stream tasks are closed.

{code:java}
@Override
public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
    switch (state()) {
        case RUNNING:
        case RESTORING:
        case SUSPENDED:
            maybeScheduleCheckpoint(); // checkpoint is not up-to-date if there are in-flight requests
            stateMgr.flush();
            recordCollector.flush();
{code}

see [https://github.com/apache/kafka/blob/03ed08d0d17a10ca4f96c8cc0a8694834ae01e6d/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L345]

The commit (https://github.com/apache/kafka/commit/2239004907b29e00811fee9ded5a790172701a03) updates the checkpoint after calling recordCollector.flush(), so the checkpoint is up-to-date.

Also, I looped StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi 30 times with [https://github.com/apache/kafka/commit/2239004907b29e00811fee9ded5a790172701a03] applied, and all runs passed.

> Flaky Test StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10151
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10151
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Sophie Blee-Goldman
>            Priority: Major
>              Labels: flaky-test, integration-test
>         Attachments: StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi.stdout.rtf
>
>
> I've started seeing this fail in the past week or so. Checked out the logs
> and there's nothing obviously wrong (ie no ERROR or exception) so it might
> just be flaky?
>
> java.lang.AssertionError: Condition not met within timeout 60000. Could not get expected result in time.
>     at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>     at org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
>     at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
>     at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:388)
>     at org.apache.kafka.streams.integration.StoreUpgradeIntegrationTest.verifyCountWithTimestamp(StoreUpgradeIntegrationTest.java:367)
>     at org.apache.kafka.streams.integration.StoreUpgradeIntegrationTest.shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(StoreUpgradeIntegrationTest.java:183)
>     at org.apache.kafka.streams.integration.StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi(StoreUpgradeIntegrationTest.java:109)

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
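
For illustration, here is a minimal sketch of the ordering problem described in the comment: a checkpoint written before the in-flight changelog records are flushed misses them, while a checkpoint written after the flush covers them. The class CheckpointOrderingSketch and all of its methods are hypothetical stand-ins and are not Kafka's StreamTask, ProcessorStateManager, or RecordCollector; the "acknowledgement" is modelled simply as draining a queue.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for illustration only; not Kafka's actual task or collector classes.
final class CheckpointOrderingSketch {
    private final Queue<Long> inFlight = new ArrayDeque<>(); // offsets sent but not yet acknowledged
    private long ackedOffset = -1L;      // highest changelog offset acknowledged so far
    private long checkpointOffset = -1L; // offset last written to the checkpoint

    // Send a record to the changelog; it stays in flight until flush() is called.
    void send(long offset) {
        inFlight.add(offset);
    }

    // Wait for every in-flight record to be acknowledged (modelled as draining the queue).
    void flush() {
        while (!inFlight.isEmpty()) {
            ackedOffset = Math.max(ackedOffset, inFlight.remove());
        }
    }

    // Record the highest acknowledged offset as the checkpoint.
    void checkpoint() {
        checkpointOffset = ackedOffset;
    }

    long checkpointOffset() {
        return checkpointOffset;
    }

    // Ordering identified as the root cause: checkpoint first, flush second.
    static long checkpointThenFlush(long lastOffset) {
        CheckpointOrderingSketch task = new CheckpointOrderingSketch();
        for (long offset = 0; offset <= lastOffset; offset++) {
            task.send(offset);
        }
        task.checkpoint(); // in-flight records are not visible yet
        task.flush();
        return task.checkpointOffset();
    }

    // Ordering after the fix: flush first, checkpoint second.
    static long flushThenCheckpoint(long lastOffset) {
        CheckpointOrderingSketch task = new CheckpointOrderingSketch();
        for (long offset = 0; offset <= lastOffset; offset++) {
            task.send(offset);
        }
        task.flush();
        task.checkpoint(); // every sent record has been acknowledged
        return task.checkpointOffset();
    }

    public static void main(String[] args) {
        System.out.println("checkpoint then flush: " + checkpointThenFlush(4));
        System.out.println("flush then checkpoint: " + flushThenCheckpoint(4));
    }
}
```

In this toy model the first ordering leaves the checkpoint at its initial value even though five records were sent, while the second ordering checkpoints the last sent offset. This is only meant to show why the order of the two calls matters, not how the real checkpoint file is written.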