[ https://issues.apache.org/jira/browse/KAFKA-10151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17138129#comment-17138129 ]
Chia-Ping Tsai commented on KAFKA-10151:
----------------------------------------
[~ableegoldman] I think you have already resolved this issue with
https://github.com/apache/kafka/commit/2239004907b29e00811fee9ded5a790172701a03.
The root cause is that in-flight records to the changelog are not completed
when closing StreamTasks.
{code:java}
@Override
public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
    switch (state()) {
        case RUNNING:
        case RESTORING:
        case SUSPENDED:
            maybeScheduleCheckpoint(); // checkpoint is not up-to-date if there are in-flight requests
            stateMgr.flush();
            recordCollector.flush();
            // ... (rest of the method omitted)
{code}
See
[https://github.com/apache/kafka/blob/03ed08d0d17a10ca4f96c8cc0a8694834ae01e6d/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L345].
The commit
(https://github.com/apache/kafka/commit/2239004907b29e00811fee9ded5a790172701a03)
updates the checkpoint after calling recordCollector.flush(), so the checkpoint is
up to date.
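A toy model may make the ordering problem concrete. The sketch below assumes a simplified collector in which a send stays in flight until flush() acknowledges it; SketchCollector, checkpointThenFlush and flushThenCheckpoint are hypothetical names, not Kafka Streams APIs, and in Kafka Streams the checkpoint is persisted to a local checkpoint file rather than returned from a method. It only shows why a checkpoint taken before the flush misses every record still in flight.

```java
import java.util.ArrayList;
import java.util.List;

public class CheckpointOrderingSketch {

    /** Hypothetical stand-in for a record collector whose sends complete asynchronously. */
    static final class SketchCollector {
        private final List<Long> inFlight = new ArrayList<>();
        private long nextOffset = 0L;
        // Highest changelog offset the (simulated) broker has acknowledged; -1 means none yet.
        private long ackedOffset = -1L;

        /** Hands a record to the producer; it stays in flight until flush() is called. */
        void send() {
            inFlight.add(nextOffset++);
        }

        /** Simulates blocking until every in-flight send is acknowledged. */
        void flush() {
            for (long offset : inFlight) {
                ackedOffset = Math.max(ackedOffset, offset);
            }
            inFlight.clear();
        }

        long ackedOffset() {
            return ackedOffset;
        }
    }

    /** Problematic ordering: the checkpoint is taken while sends are still in flight. */
    static long checkpointThenFlush(int records) {
        SketchCollector collector = new SketchCollector();
        for (int i = 0; i < records; i++) {
            collector.send();
        }
        long checkpoint = collector.ackedOffset(); // stale: nothing acknowledged yet
        collector.flush();
        return checkpoint;
    }

    /** Corrected ordering: flush first, then checkpoint the acknowledged offset. */
    static long flushThenCheckpoint(int records) {
        SketchCollector collector = new SketchCollector();
        for (int i = 0; i < records; i++) {
            collector.send();
        }
        collector.flush();
        return collector.ackedOffset();
    }

    public static void main(String[] args) {
        System.out.println("checkpoint before flush: " + checkpointThenFlush(5)); // prints -1
        System.out.println("checkpoint after flush:  " + flushThenCheckpoint(5)); // prints 4
    }
}
```

With five records sent, checkpointing first records no acknowledged offset at all, while flushing first records offset 4, the last one written.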
Also, I ran
StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi
in a loop 30 times on top of
[https://github.com/apache/kafka/commit/2239004907b29e00811fee9ded5a790172701a03],
and all 30 runs passed.
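Re-running a suspected flaky test many times is a simple way to gain confidence in a fix. The following is a minimal, self-contained sketch of such a loop; runRepeatedly is a hypothetical helper (not a Kafka or JUnit API), and the placeholder Runnable stands in for the real integration test, which needs an embedded Kafka cluster to run.

```java
import java.util.ArrayList;
import java.util.List;

public class RepeatRunSketch {

    /** Runs {@code test} {@code times} times and collects a message for each failed iteration. */
    static List<String> runRepeatedly(int times, Runnable test) {
        List<String> failures = new ArrayList<>();
        for (int i = 1; i <= times; i++) {
            try {
                test.run();
            } catch (AssertionError | RuntimeException e) {
                // Record the failure and keep going so every iteration is counted.
                failures.add("iteration " + i + ": " + e);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        // Hypothetical placeholder for the real test body; it always passes here.
        Runnable placeholderTest = () -> { };
        List<String> failures = runRepeatedly(30, placeholderTest);
        System.out.println((30 - failures.size()) + "/30 passed");
        failures.forEach(System.out::println);
    }
}
```

Collecting failures instead of stopping at the first one shows how often a flaky test fails, which is more informative than a single pass/fail result.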
> Flaky Test
> StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi
> --------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-10151
> URL: https://issues.apache.org/jira/browse/KAFKA-10151
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Sophie Blee-Goldman
> Priority: Major
> Labels: flaky-test, integration-test
> Attachments:
> StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi.stdout.rtf
>
>
> I've started seeing this fail in the past week or so. I checked the logs
> and there's nothing obviously wrong (i.e. no ERROR or exception), so it might
> just be flaky?
>
> java.lang.AssertionError: Condition not met within timeout 60000. Could not get expected result in time.
>     at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>     at org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
>     at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
>     at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:388)
>     at org.apache.kafka.streams.integration.StoreUpgradeIntegrationTest.verifyCountWithTimestamp(StoreUpgradeIntegrationTest.java:367)
>     at org.apache.kafka.streams.integration.StoreUpgradeIntegrationTest.shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(StoreUpgradeIntegrationTest.java:183)
>     at org.apache.kafka.streams.integration.StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi(StoreUpgradeIntegrationTest.java:109)
--
This message was sent by Atlassian Jira
(v8.3.4#803005)