[GitHub] [kafka] ex172000 commented on a diff in pull request #14110: MINOR: Add test for describe topic with ID
ex172000 commented on code in PR #14110: URL: https://github.com/apache/kafka/pull/14110#discussion_r1288017392 ## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ## @@ -2354,6 +2354,75 @@ public void testDeleteRecords() throws Exception { } } +@Test +public void testDescribeTopicsByIds() { +try (AdminClientUnitTestEnv env = mockClientEnv()) { +env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + +// Valid ID +Uuid topicId = Uuid.randomUuid(); +String topicName = "test-topic"; +Node leader = env.cluster().nodes().get(0); +MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata( +Errors.NONE, +new TopicPartition(topicName, 0), +Optional.of(leader.id()), +Optional.of(10), +singletonList(leader.id()), +singletonList(leader.id()), +singletonList(leader.id())); +env.kafkaClient().prepareResponse(RequestTestUtils +.metadataResponse( +env.cluster().nodes(), +env.cluster().clusterResource().clusterId(), +env.cluster().controller().id(), +singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, topicName, topicId, false, +singletonList(partitionMetadata), MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED; +TopicCollection.TopicIdCollection topicIds = TopicCollection.ofTopicIds( +singletonList(topicId)); +try { +DescribeTopicsResult result = env.adminClient().describeTopics(topicIds); +Map allTopicIds = result.allTopicIds().get(); +assertEquals(topicName, allTopicIds.get(topicId).name()); +} catch (Exception e) { +fail("describe with valid topicId should not fail", e); +} + +// ID not exist in brokers +Uuid nonExistID = Uuid.randomUuid(); +env.kafkaClient().prepareResponse(RequestTestUtils +.metadataResponse( +env.cluster().nodes(), +env.cluster().clusterResource().clusterId(), +env.cluster().controller().id(), +asList())); +try { +DescribeTopicsResult result = env.adminClient().describeTopics( +TopicCollection.ofTopicIds(singletonList(nonExistID))); 
+TestUtils.assertFutureError(result.allTopicIds(), InvalidTopicException.class); +result.allTopicIds().get(); +fail("describe with non-exist topic ID should throw exception"); +} catch (Exception e) { +assertEquals( + String.format("org.apache.kafka.common.errors.InvalidTopicException: TopicId %s not found.", nonExistID), Review Comment: Thanks for pointing out, I can make a follow up change looking into this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] fvaleri commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools
fvaleri commented on code in PR #13562: URL: https://github.com/apache/kafka/pull/13562#discussion_r1287993847 ## core/src/main/scala/kafka/tools/GetOffsetShell.scala: ## Review Comment: > We should still keep this file so it can forward all args to the new GetOffsetShell, see `FeatureCommand.scala`. Hi @dengziming, this tool was reported in the "missing wrapper script" category in [KIP-906](https://cwiki.apache.org/confluence/display/KAFKA/KIP-906%3A+Tools+migration+guidelines), but that was only because some system tests depend on the FQCN. In this PR we are also changing that, and they now use the wrapper script instead, so I think there is no need for the redirection.
[jira] [Updated] (KAFKA-15324) Do not bump the leader epoch when changing the replica set
[ https://issues.apache.org/jira/browse/KAFKA-15324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zeyuliu updated KAFKA-15324: Description: The KRaft controller increases the leader epoch when a partition replica set shrinks. This is not strictly required and should be removed. Related changes: https://github.com/apache/kafka/pull/13765/files was: The KRaft controller increases the leader epoch when a partition replica set shrinks. This is not strictly required and should be removed. > Do not bump the leader epoch when changing the replica set > -- > > Key: KAFKA-15324 > URL: https://issues.apache.org/jira/browse/KAFKA-15324 > Project: Kafka > Issue Type: Task > Reporter: zeyuliu > Priority: Major > > The KRaft controller increases the leader epoch when a partition replica set > shrinks. This is not strictly required and should be removed. > Related changes: https://github.com/apache/kafka/pull/13765/files -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15324) Do not bump the leader epoch when changing the replica set
zeyuliu created KAFKA-15324: --- Summary: Do not bump the leader epoch when changing the replica set Key: KAFKA-15324 URL: https://issues.apache.org/jira/browse/KAFKA-15324 Project: Kafka Issue Type: Task Reporter: zeyuliu The KRaft controller increases the leader epoch when a partition replica set shrinks. This is not strictly required and should be removed.
[GitHub] [kafka] cmccabe closed pull request #12662: KAFKA-14214: Convert StandardAuthorizer to copy-on-write
cmccabe closed pull request #12662: KAFKA-14214: Convert StandardAuthorizer to copy-on-write URL: https://github.com/apache/kafka/pull/12662
[GitHub] [kafka] cmccabe commented on pull request #12662: KAFKA-14214: Convert StandardAuthorizer to copy-on-write
cmccabe commented on PR #12662: URL: https://github.com/apache/kafka/pull/12662#issuecomment-1670700174 > @cmccabe I guess this can be closed now since it is superseded by https://github.com/apache/kafka/pull/13437 ? yes. thanks.
[GitHub] [kafka] cmccabe closed pull request #12531: Add new metadata loader and publisher [WIP]
cmccabe closed pull request #12531: Add new metadata loader and publisher [WIP] URL: https://github.com/apache/kafka/pull/12531
[GitHub] [kafka] cmccabe commented on pull request #12531: Add new metadata loader and publisher [WIP]
cmccabe commented on PR #12531: URL: https://github.com/apache/kafka/pull/12531#issuecomment-1670699324 This is very stale at this point. Basically it was replaced by #12983, #13337 and some other changes in that vein
[GitHub] [kafka] github-actions[bot] commented on pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904
github-actions[bot] commented on PR #13656: URL: https://github.com/apache/kafka/pull/13656#issuecomment-1670601383 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
[GitHub] [kafka] dengziming commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools
dengziming commented on code in PR #13562: URL: https://github.com/apache/kafka/pull/13562#discussion_r1287886679 ## core/src/main/scala/kafka/tools/GetOffsetShell.scala: ## Review Comment: We should still keep this file so it can forward all args to the new GetOffsetShell, see `FeatureCommand.scala`.
[GitHub] [kafka] dengziming commented on a diff in pull request #14110: MINOR: Add test for describe topic with ID
dengziming commented on code in PR #14110: URL: https://github.com/apache/kafka/pull/14110#discussion_r1287875933 ## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ## @@ -2354,6 +2354,75 @@ public void testDeleteRecords() throws Exception { } } +@Test +public void testDescribeTopicsByIds() { +try (AdminClientUnitTestEnv env = mockClientEnv()) { +env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + +// Valid ID +Uuid topicId = Uuid.randomUuid(); +String topicName = "test-topic"; +Node leader = env.cluster().nodes().get(0); +MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata( +Errors.NONE, +new TopicPartition(topicName, 0), +Optional.of(leader.id()), +Optional.of(10), +singletonList(leader.id()), +singletonList(leader.id()), +singletonList(leader.id())); +env.kafkaClient().prepareResponse(RequestTestUtils +.metadataResponse( +env.cluster().nodes(), +env.cluster().clusterResource().clusterId(), +env.cluster().controller().id(), +singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, topicName, topicId, false, +singletonList(partitionMetadata), MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED; +TopicCollection.TopicIdCollection topicIds = TopicCollection.ofTopicIds( +singletonList(topicId)); +try { +DescribeTopicsResult result = env.adminClient().describeTopics(topicIds); +Map allTopicIds = result.allTopicIds().get(); +assertEquals(topicName, allTopicIds.get(topicId).name()); +} catch (Exception e) { +fail("describe with valid topicId should not fail", e); +} + +// ID not exist in brokers +Uuid nonExistID = Uuid.randomUuid(); +env.kafkaClient().prepareResponse(RequestTestUtils +.metadataResponse( +env.cluster().nodes(), +env.cluster().clusterResource().clusterId(), +env.cluster().controller().id(), +asList())); Review Comment: It's better to use emptyList() ## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ## @@ -2354,6 +2354,75 @@ public void 
testDeleteRecords() throws Exception { } } +@Test +public void testDescribeTopicsByIds() { +try (AdminClientUnitTestEnv env = mockClientEnv()) { +env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + +// Valid ID +Uuid topicId = Uuid.randomUuid(); +String topicName = "test-topic"; +Node leader = env.cluster().nodes().get(0); +MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata( +Errors.NONE, +new TopicPartition(topicName, 0), +Optional.of(leader.id()), +Optional.of(10), +singletonList(leader.id()), +singletonList(leader.id()), +singletonList(leader.id())); +env.kafkaClient().prepareResponse(RequestTestUtils +.metadataResponse( +env.cluster().nodes(), +env.cluster().clusterResource().clusterId(), +env.cluster().controller().id(), +singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, topicName, topicId, false, +singletonList(partitionMetadata), MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED; +TopicCollection.TopicIdCollection topicIds = TopicCollection.ofTopicIds( +singletonList(topicId)); +try { +DescribeTopicsResult result = env.adminClient().describeTopics(topicIds); +Map allTopicIds = result.allTopicIds().get(); +assertEquals(topicName, allTopicIds.get(topicId).name()); +} catch (Exception e) { +fail("describe with valid topicId should not fail", e); +} + +// ID not exist in brokers +Uuid nonExistID = Uuid.randomUuid(); +env.kafkaClient().prepareResponse(RequestTestUtils +.metadataResponse( +env.cluster().nodes(), +env.cluster().clusterResource().clusterId
[jira] [Assigned] (KAFKA-14912) Introduce a configuration for remote index cache size, preferably a dynamic config.
[ https://issues.apache.org/jira/browse/KAFKA-14912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] hudeqi reassigned KAFKA-14912: -- Assignee: hudeqi > Introduce a configuration for remote index cache size, preferably a dynamic > config. > --- > > Key: KAFKA-14912 > URL: https://issues.apache.org/jira/browse/KAFKA-14912 > Project: Kafka > Issue Type: Sub-task > Components: core > Reporter: Satish Duggana > Assignee: hudeqi > Priority: Major > > Context: We need to make the 1024 value here [1] dynamically configurable. > [1] > https://github.com/apache/kafka/blob/8d24716f27b307da79a819487aefb8dec79b4ca8/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java#L119
[jira] [Commented] (KAFKA-14912) Introduce a configuration for remote index cache size, preferably a dynamic config.
[ https://issues.apache.org/jira/browse/KAFKA-14912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752229#comment-17752229 ] hudeqi commented on KAFKA-14912: OK, I'll take a look in the next few days, thx > Introduce a configuration for remote index cache size, preferably a dynamic > config. > --- > > Key: KAFKA-14912 > URL: https://issues.apache.org/jira/browse/KAFKA-14912 > Project: Kafka > Issue Type: Sub-task > Components: core > Reporter: Satish Duggana > Priority: Major > > Context: We need to make the 1024 value here [1] dynamically configurable. > [1] > https://github.com/apache/kafka/blob/8d24716f27b307da79a819487aefb8dec79b4ca8/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java#L119
[jira] [Updated] (KAFKA-15319) Upgrade rocksdb to fix CVE-2022-37434
[ https://issues.apache.org/jira/browse/KAFKA-15319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maruthi updated KAFKA-15319: Description: Rocksdbjni<7.9.2 is vulnerable to CVE-2022-37434 due to zlib 1.2.12 Upgrade to 1.2.13 to fix https://github.com/facebook/rocksdb/commit/0993c9225f8086bab6c4c0a2d7206897d1cc688c was: Rocksdbjni<7.9.2 is vulnerable to CVE-2022-37434 due to zlib 1.2.12 Upgrade to 1.2.13 to fix > Upgrade rocksdb to fix CVE-2022-37434 > - > > Key: KAFKA-15319 > URL: https://issues.apache.org/jira/browse/KAFKA-15319 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.4.1 >Reporter: Maruthi >Priority: Major > > Rocksdbjni<7.9.2 is vulnerable to CVE-2022-37434 due to zlib 1.2.12 > Upgrade to 1.2.13 to fix > https://github.com/facebook/rocksdb/commit/0993c9225f8086bab6c4c0a2d7206897d1cc688c -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jsancio commented on a diff in pull request #14141: KAFKA-15100; KRaft data race with the expiration service
jsancio commented on code in PR #14141: URL: https://github.com/apache/kafka/pull/14141#discussion_r1287811134 ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ## @@ -1039,6 +1054,14 @@ private FetchResponseData tryCompleteFetchRequest( } } +private static boolean isPartitionTruncated(FetchResponseData.PartitionData partitionResponseData) { +FetchResponseData.EpochEndOffset divergingEpoch = partitionResponseData.divergingEpoch(); +FetchResponseData.SnapshotId snapshotId = partitionResponseData.snapshotId(); + +return divergingEpoch.epoch() != -1 || divergingEpoch.endOffset() != -1 || Review Comment: Done.
[jira] [Updated] (KAFKA-15322) Possible thread leaks in AbstractCoordinatorTest
[ https://issues.apache.org/jira/browse/KAFKA-15322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15322: -- Description: Relevant stack trace for {{{}AbstractCoordinatorTest{}}}: {noformat} Relevant Logs for AbstractCoordinatorTest - Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group java.base@17.0.7/java.lang.Object.wait(Native Method) java.base@17.0.7/java.lang.Object.wait(Object.java:338) app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448) {noformat} was: Relevant Logs for AbstractCoordinatorTest - {noformat} Relevant Logs for AbstractCoordinatorTest - Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group java.base@17.0.7/java.lang.Object.wait(Native Method) java.base@17.0.7/java.lang.Object.wait(Object.java:338) app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448) {noformat} > Possible thread leaks in AbstractCoordinatorTest > > > Key: KAFKA-15322 > URL: https://issues.apache.org/jira/browse/KAFKA-15322 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > > Relevant stack trace for {{{}AbstractCoordinatorTest{}}}: > {noformat} > Relevant Logs for AbstractCoordinatorTest - > Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group > java.base@17.0.7/java.lang.Object.wait(Native Method) > java.base@17.0.7/java.lang.Object.wait(Object.java:338) > > app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448) > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15323) Possible thread leaks in ListOffsetsHandlerTest
[ https://issues.apache.org/jira/browse/KAFKA-15323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15323: -- Description: Relevant stack trace for {{{}ListOffsetsHandlerTest{}}}: {noformat} Thread: 30 - kafka-admin-client-thread | adminclient-2 app//org.apache.log4j.Category.getEffectiveLevel(Category.java:430) app//org.apache.log4j.Logger.isTraceEnabled(Logger.java:203) app//org.slf4j.impl.Reload4jLoggerAdapter.isTraceEnabled(Reload4jLoggerAdapter.java:83) app//org.apache.kafka.common.utils.LogContext$LocationAwareKafkaLogger.trace(LogContext.java:185) app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.maybeDrainPendingCalls(KafkaAdminClient.java:1233) app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1543) app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1497) java.base@17.0.7/java.lang.Thread.run(Thread.java:833) app//org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:64) {noformat} was: Relevant Logs for AbstractCoordinatorTest - {noformat} Thread: 30 - kafka-admin-client-thread | adminclient-2 app//org.apache.log4j.Category.getEffectiveLevel(Category.java:430) app//org.apache.log4j.Logger.isTraceEnabled(Logger.java:203) app//org.slf4j.impl.Reload4jLoggerAdapter.isTraceEnabled(Reload4jLoggerAdapter.java:83) app//org.apache.kafka.common.utils.LogContext$LocationAwareKafkaLogger.trace(LogContext.java:185) app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.maybeDrainPendingCalls(KafkaAdminClient.java:1233) app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1543) app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1497) java.base@17.0.7/java.lang.Thread.run(Thread.java:833) app//org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:64) 
{noformat} > Possible thread leaks in ListOffsetsHandlerTest > --- > > Key: KAFKA-15323 > URL: https://issues.apache.org/jira/browse/KAFKA-15323 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > > Relevant stack trace for {{{}ListOffsetsHandlerTest{}}}: > {noformat} > Thread: 30 - kafka-admin-client-thread | adminclient-2 > app//org.apache.log4j.Category.getEffectiveLevel(Category.java:430) > app//org.apache.log4j.Logger.isTraceEnabled(Logger.java:203) > > app//org.slf4j.impl.Reload4jLoggerAdapter.isTraceEnabled(Reload4jLoggerAdapter.java:83) > > app//org.apache.kafka.common.utils.LogContext$LocationAwareKafkaLogger.trace(LogContext.java:185) > > app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.maybeDrainPendingCalls(KafkaAdminClient.java:1233) > > app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1543) > > app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1497) > java.base@17.0.7/java.lang.Thread.run(Thread.java:833) > app//org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:64) > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15323) Possible thread leaks in ListOffsetsHandlerTest
[ https://issues.apache.org/jira/browse/KAFKA-15323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15323: -- Description: Relevant Logs for AbstractCoordinatorTest - {noformat} Thread: 30 - kafka-admin-client-thread | adminclient-2 app//org.apache.log4j.Category.getEffectiveLevel(Category.java:430) app//org.apache.log4j.Logger.isTraceEnabled(Logger.java:203) app//org.slf4j.impl.Reload4jLoggerAdapter.isTraceEnabled(Reload4jLoggerAdapter.java:83) app//org.apache.kafka.common.utils.LogContext$LocationAwareKafkaLogger.trace(LogContext.java:185) app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.maybeDrainPendingCalls(KafkaAdminClient.java:1233) app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1543) app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1497) java.base@17.0.7/java.lang.Thread.run(Thread.java:833) app//org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:64) {noformat} was: Relevant Logs for AbstractCoordinatorTest - {noformat} Relevant Logs for AbstractCoordinatorTest - Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group java.base@17.0.7/java.lang.Object.wait(Native Method) java.base@17.0.7/java.lang.Object.wait(Object.java:338) app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448) {noformat} > Possible thread leaks in ListOffsetsHandlerTest > --- > > Key: KAFKA-15323 > URL: https://issues.apache.org/jira/browse/KAFKA-15323 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > > Relevant Logs for AbstractCoordinatorTest - > {noformat} > Thread: 30 - kafka-admin-client-thread | adminclient-2 > app//org.apache.log4j.Category.getEffectiveLevel(Category.java:430) > 
app//org.apache.log4j.Logger.isTraceEnabled(Logger.java:203) > > app//org.slf4j.impl.Reload4jLoggerAdapter.isTraceEnabled(Reload4jLoggerAdapter.java:83) > > app//org.apache.kafka.common.utils.LogContext$LocationAwareKafkaLogger.trace(LogContext.java:185) > > app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.maybeDrainPendingCalls(KafkaAdminClient.java:1233) > > app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1543) > > app//org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1497) > java.base@17.0.7/java.lang.Thread.run(Thread.java:833) > app//org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:64) > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15322) Possible thread leaks in AbstractCoordinatorTest
[ https://issues.apache.org/jira/browse/KAFKA-15322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15322: -- Description: Relevant Logs for AbstractCoordinatorTest - {noformat} Relevant Logs for AbstractCoordinatorTest - Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group java.base@17.0.7/java.lang.Object.wait(Native Method) java.base@17.0.7/java.lang.Object.wait(Object.java:338) app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448) {noformat} was: Relevant Logs for AbstractCoordinatorTest - {code} Relevant Logs for AbstractCoordinatorTest - Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group java.base@17.0.7/java.lang.Object.wait(Native Method) java.base@17.0.7/java.lang.Object.wait(Object.java:338) app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448) {code} > Possible thread leaks in AbstractCoordinatorTest > > > Key: KAFKA-15322 > URL: https://issues.apache.org/jira/browse/KAFKA-15322 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > > Relevant Logs for AbstractCoordinatorTest - > {noformat} > Relevant Logs for AbstractCoordinatorTest - > Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group > java.base@17.0.7/java.lang.Object.wait(Native Method) > java.base@17.0.7/java.lang.Object.wait(Object.java:338) > > app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448) > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15323) Possible thread leaks in ListOffsetsHandlerTest
Kirk True created KAFKA-15323: - Summary: Possible thread leaks in ListOffsetsHandlerTest Key: KAFKA-15323 URL: https://issues.apache.org/jira/browse/KAFKA-15323 Project: Kafka Issue Type: Bug Components: clients, consumer, unit tests Reporter: Kirk True Assignee: Kirk True Relevant Logs for AbstractCoordinatorTest - {noformat} Relevant Logs for AbstractCoordinatorTest - Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group java.base@17.0.7/java.lang.Object.wait(Native Method) java.base@17.0.7/java.lang.Object.wait(Object.java:338) app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448) {noformat}
[jira] [Updated] (KAFKA-15322) Possible thread leaks in AbstractCoordinatorTest
[ https://issues.apache.org/jira/browse/KAFKA-15322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15322: -- Description: Relevant Logs for AbstractCoordinatorTest - {code} Relevant Logs for AbstractCoordinatorTest - Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group java.base@17.0.7/java.lang.Object.wait(Native Method) java.base@17.0.7/java.lang.Object.wait(Object.java:338) app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448) {code} was: Relevant Logs for AbstractCoordinatorTest - {code} Relevant Logs for AbstractCoordinatorTest - Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group java.base@17.0.7/java.lang.Object.wait(Native Method) java.base@17.0.7/java.lang.Object.wait(Object.java:338) app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448) {code} > Possible thread leaks in AbstractCoordinatorTest > > > Key: KAFKA-15322 > URL: https://issues.apache.org/jira/browse/KAFKA-15322 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > > Relevant Logs for AbstractCoordinatorTest - > {code} > Relevant Logs for AbstractCoordinatorTest - > Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group > java.base@17.0.7/java.lang.Object.wait(Native Method) > java.base@17.0.7/java.lang.Object.wait(Object.java:338) > > app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15322) Possible thread leaks in AbstractCoordinatorTest
[ https://issues.apache.org/jira/browse/KAFKA-15322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15322: -- Description: Relevant Logs for AbstractCoordinatorTest - {code} Relevant Logs for AbstractCoordinatorTest - Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group java.base@17.0.7/java.lang.Object.wait(Native Method) java.base@17.0.7/java.lang.Object.wait(Object.java:338) app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448) {code} was: Relevant Logs for AbstractCoordinatorTest - {code:java} Relevant Logs for AbstractCoordinatorTest - Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group java.base@17.0.7/java.lang.Object.wait(Native Method) java.base@17.0.7/java.lang.Object.wait(Object.java:338) app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448) {code} > Possible thread leaks in AbstractCoordinatorTest > > > Key: KAFKA-15322 > URL: https://issues.apache.org/jira/browse/KAFKA-15322 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > > Relevant Logs for AbstractCoordinatorTest - > > {code} > Relevant Logs for AbstractCoordinatorTest - > Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group > java.base@17.0.7/java.lang.Object.wait(Native Method) > java.base@17.0.7/java.lang.Object.wait(Object.java:338) > > app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448) > {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15322) Possible thread leaks in AbstractCoordinatorTest
Kirk True created KAFKA-15322: - Summary: Possible thread leaks in AbstractCoordinatorTest Key: KAFKA-15322 URL: https://issues.apache.org/jira/browse/KAFKA-15322 Project: Kafka Issue Type: Bug Components: clients, consumer, unit tests Reporter: Kirk True Assignee: Kirk True Relevant Logs for AbstractCoordinatorTest - {code:java} Relevant Logs for AbstractCoordinatorTest - Thread: 284 - kafka-coordinator-heartbeat-thread | dummy-group java.base@17.0.7/java.lang.Object.wait(Native Method) java.base@17.0.7/java.lang.Object.wait(Object.java:338) app//org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1448) {code}
[jira] [Commented] (KAFKA-14937) Refactoring for client code to reduce boilerplate
[ https://issues.apache.org/jira/browse/KAFKA-14937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752212#comment-17752212 ] Kirk True commented on KAFKA-14937: --- Reviewing the following PRs for flaky tests: * [https://github.com/apache/kafka/pull/13990] * [https://github.com/apache/kafka/pull/14118] * [https://github.com/apache/kafka/pull/14123] > Refactoring for client code to reduce boilerplate > - > > Key: KAFKA-14937 > URL: https://issues.apache.org/jira/browse/KAFKA-14937 > Project: Kafka > Issue Type: Improvement > Components: admin, clients, consumer, producer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.6.0 > > > There are a number of places in the client code where the same basic calls > are made by more than one client implementation. Minor refactoring will > reduce the amount of boilerplate code necessary for the client to construct > its internal state. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15312) FileRawSnapshotWriter must flush before atomic move
[ https://issues.apache.org/jira/browse/KAFKA-15312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

José Armando García Sancio resolved KAFKA-15312.
Resolution: Fixed

> FileRawSnapshotWriter must flush before atomic move
> ---
>
> Key: KAFKA-15312
> URL: https://issues.apache.org/jira/browse/KAFKA-15312
> Project: Kafka
> Issue Type: Bug
> Components: kraft
> Reporter: José Armando García Sancio
> Assignee: José Armando García Sancio
> Priority: Major
> Fix For: 3.3.3, 3.6.0, 3.4.2, 3.5.2
>
> On ext4 file systems it is possible for KRaft to create zero-length snapshot files. Not all file systems fsync to disk on close. For KRaft to guarantee that the data has made it to disk before calling rename, it needs to make sure that the file has been fsynced.
> We have seen cases where the snapshot file has zero-length data on an ext4 file system.
> {quote}
> "Delayed allocation" means that the filesystem tries to delay the allocation of physical disk blocks for written data for as long as possible. This policy brings some important performance benefits. Many files are short-lived; delayed allocation can keep the system from writing fleeting temporary files to disk at all. And, for longer-lived files, delayed allocation allows the kernel to accumulate more data and to allocate the blocks for data contiguously, speeding up both the write and any subsequent reads of that data. It's an important optimization which is found in most contemporary filesystems.
> But, if blocks have not been allocated for a file, there is no need to write them quickly as a security measure. Since the blocks do not yet exist, it is not possible to read somebody else's data from them. So ext4 will not (cannot) write out unallocated blocks as part of the next journal commit cycle. Those blocks will, instead, wait until the kernel decides to flush them out; at that point, physical blocks will be allocated on disk and the data will be made persistent. The kernel doesn't like to let file data sit unwritten for too long, but it can still take a minute or so (with the default settings) for that data to be flushed - far longer than the five seconds normally seen with ext3. And that is why a crash can cause the loss of quite a bit more data when ext4 is being used.
> {quote}
> from: [https://lwn.net/Articles/322823/]
> {quote}auto_da_alloc ( * ), noauto_da_alloc
> Many broken applications don't use fsync() when replacing existing files via patterns such as fd = open("foo.new")/write(fd,..)/close(fd)/ rename("foo.new", "foo"), or worse yet, fd = open("foo", O_TRUNC)/write(fd,..)/close(fd). If auto_da_alloc is enabled, ext4 will detect the replace-via-rename and replace-via-truncate patterns and force that any delayed allocation blocks are allocated such that at the next journal commit, in the default data=ordered mode, the data blocks of the new file are forced to disk before the rename() operation is committed. This provides roughly the same level of guarantees as ext3, and avoids the "zero-length" problem that can happen when a system crashes before the delayed allocation blocks are forced to disk.
> {quote}
> from: [https://www.kernel.org/doc/html/latest/admin-guide/ext4.html]
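The requirement in this issue, forcing the file to disk before the atomic rename, can be sketched with plain Java NIO. This is only an illustration of the write, fsync, rename ordering and not the actual FileRawSnapshotWriter change; the class `DurableReplace`, the method `writeThenMove`, and the file names are hypothetical.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

/** Hypothetical helper showing the write -> fsync -> atomic rename ordering. */
final class DurableReplace {

    /** Writes data to tmp, forces it to disk, then atomically moves tmp to target. */
    static void writeThenMove(Path tmp, Path target, byte[] data) throws IOException {
        try (FileChannel channel = FileChannel.open(tmp,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            ByteBuffer buffer = ByteBuffer.wrap(data);
            while (buffer.hasRemaining()) {
                channel.write(buffer);
            }
            // The fsync step: closing the channel alone does not guarantee that
            // the bytes have reached the storage device.
            channel.force(true);
        }
        // Only rename once the contents are durable.
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("durable-replace");
        Path tmp = dir.resolve("snapshot.part");
        Path target = dir.resolve("snapshot");
        writeThenMove(tmp, target, "snapshot contents".getBytes(StandardCharsets.UTF_8));
        System.out.println("Size after move: " + Files.size(target));
    }
}
```

This sketch does not fsync the parent directory, which some setups may also want in order to make the rename itself durable.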
[jira] [Commented] (KAFKA-14937) Refactoring for client code to reduce boilerplate
[ https://issues.apache.org/jira/browse/KAFKA-14937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752211#comment-17752211 ] Kirk True commented on KAFKA-14937: --- Having lots of issues getting a clean build. I filed INFRA-24874 to see if they could help. > Refactoring for client code to reduce boilerplate > - > > Key: KAFKA-14937 > URL: https://issues.apache.org/jira/browse/KAFKA-14937 > Project: Kafka > Issue Type: Improvement > Components: admin, clients, consumer, producer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.6.0 > > > There are a number of places in the client code where the same basic calls > are made by more than one client implementation. Minor refactoring will > reduce the amount of boilerplate code necessary for the client to construct > its internal state. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jsancio commented on a diff in pull request #14141: KAFKA-15100; KRaft data race with the expiration service
jsancio commented on code in PR #14141:
URL: https://github.com/apache/kafka/pull/14141#discussion_r1287805598

## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ##
@@ -984,11 +991,16 @@ private CompletableFuture handleFetchRequest(
 Throwable cause = exception instanceof ExecutionException ? exception.getCause() : exception;
-// If the fetch timed out in purgatory, it means no new data is available,
-// and we will complete the fetch successfully. Otherwise, if there was
-// any other error, we need to return it.
 Errors error = Errors.forException(cause);
-if (error != Errors.REQUEST_TIMED_OUT) {
+if (error == Errors.REQUEST_TIMED_OUT) {
+// Note that for this case the calling thread is the expiration service thread and not the
+// polling thread.
+//
+// If the fetch request timed out in purgatory, it means no new data is available,
+// just return the original fetch response.
+return response;

Review Comment:
Yes. That is what I would like to do in the long term. I plan to have a more generic event executor that would allow us to keep the current concurrency model, remove the expiration service thread, and keep all our deterministic unit tests and simulation tests. It is a bigger change and I didn't want to block this fix on that.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15321) Document consumer group member state machine
Kirk True created KAFKA-15321: - Summary: Document consumer group member state machine Key: KAFKA-15321 URL: https://issues.apache.org/jira/browse/KAFKA-15321 Project: Kafka Issue Type: Sub-task Reporter: Kirk True Assignee: Kirk True We need to first document the new consumer group member state machine. What are the different states and what are the transitions? See [~pnee]'s notes: [https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design] *_Don’t forget to include diagrams for clarity!_* This should be documented on the AK wiki. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15319) Upgrade rocksdb to fix CVE-2022-37434
[ https://issues.apache.org/jira/browse/KAFKA-15319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maruthi updated KAFKA-15319: Component/s: (was: streams) > Upgrade rocksdb to fix CVE-2022-37434 > - > > Key: KAFKA-15319 > URL: https://issues.apache.org/jira/browse/KAFKA-15319 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.4.1 >Reporter: Maruthi >Priority: Major > > Rocksdbjni<7.9.2 is vulnerable to CVE-2022-37434 due to zlib 1.2.12 > Upgrade to 1.2.13 to fix -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15320) Document event queueing patterns
Kirk True created KAFKA-15320: - Summary: Document event queueing patterns Key: KAFKA-15320 URL: https://issues.apache.org/jira/browse/KAFKA-15320 Project: Kafka Issue Type: Sub-task Reporter: Kirk True Assignee: Kirk True We need to first document the event enqueuing patterns in the PrototypeAsyncConsumer. As part of this task, determine if it’s necessary/beneficial to _conditionally_ add events and/or coalesce any duplicate events in the queue. _Don’t forget to include diagrams for clarity!_ This should be documented on the AK wiki. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15312) FileRawSnapshotWriter must flush before atomic move
[ https://issues.apache.org/jira/browse/KAFKA-15312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-15312: --- Fix Version/s: 3.4.2 > FileRawSnapshotWriter must flush before atomic move > --- > > Key: KAFKA-15312 > URL: https://issues.apache.org/jira/browse/KAFKA-15312 > Project: Kafka > Issue Type: Bug > Components: kraft >Reporter: José Armando García Sancio >Assignee: José Armando García Sancio >Priority: Major > Fix For: 3.6.0, 3.4.2, 3.5.2
[jira] [Updated] (KAFKA-15312) FileRawSnapshotWriter must flush before atomic move
[ https://issues.apache.org/jira/browse/KAFKA-15312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-15312: --- Fix Version/s: 3.3.3 > FileRawSnapshotWriter must flush before atomic move > --- > > Key: KAFKA-15312 > URL: https://issues.apache.org/jira/browse/KAFKA-15312 > Project: Kafka > Issue Type: Bug > Components: kraft >Reporter: José Armando García Sancio >Assignee: José Armando García Sancio >Priority: Major > Fix For: 3.3.3, 3.6.0, 3.4.2, 3.5.2
[jira] [Updated] (KAFKA-15319) Upgrade rocksdb to fix CVE-2022-37434
[ https://issues.apache.org/jira/browse/KAFKA-15319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maruthi updated KAFKA-15319: Component/s: streams > Upgrade rocksdb to fix CVE-2022-37434 > - > > Key: KAFKA-15319 > URL: https://issues.apache.org/jira/browse/KAFKA-15319 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.4.1 >Reporter: Maruthi >Priority: Major > > Rocksdbjni<7.9.2 is vulnerable to CVE-2022-37434 due to zlib 1.2.12 > Upgrade to 1.2.13 to fix -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15319) Upgrade rocksdb to fix CVE-2022-37434
Maruthi created KAFKA-15319:
---

Summary: Upgrade rocksdb to fix CVE-2022-37434
Key: KAFKA-15319
URL: https://issues.apache.org/jira/browse/KAFKA-15319
Project: Kafka
Issue Type: Bug
Affects Versions: 3.4.1
Reporter: Maruthi

Rocksdbjni versions below 7.9.2 are vulnerable to CVE-2022-37434 due to zlib 1.2.12. Upgrade to a Rocksdbjni version that uses zlib 1.2.13 to fix this.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] hachikuji commented on a diff in pull request #14141: KAFKA-15100; KRaft data race with the expiration service
hachikuji commented on code in PR #14141:
URL: https://github.com/apache/kafka/pull/14141#discussion_r1287800658

## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ##
@@ -984,11 +991,16 @@ private CompletableFuture handleFetchRequest(
 Throwable cause = exception instanceof ExecutionException ? exception.getCause() : exception;
-// If the fetch timed out in purgatory, it means no new data is available,
-// and we will complete the fetch successfully. Otherwise, if there was
-// any other error, we need to return it.
 Errors error = Errors.forException(cause);
-if (error != Errors.REQUEST_TIMED_OUT) {
+if (error == Errors.REQUEST_TIMED_OUT) {
+// Note that for this case the calling thread is the expiration service thread and not the
+// polling thread.
+//
+// If the fetch request timed out in purgatory, it means no new data is available,
+// just return the original fetch response.
+return response;

Review Comment:
This is probably the simplest fix for now. An alternative might be to use the callback to put an event on the queue that could be handled in the event loop.

## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ##
@@ -1039,6 +1054,14 @@ private FetchResponseData tryCompleteFetchRequest(
 }
 }
+private static boolean isPartitionTruncated(FetchResponseData.PartitionData partitionResponseData) {
+FetchResponseData.EpochEndOffset divergingEpoch = partitionResponseData.divergingEpoch();
+FetchResponseData.SnapshotId snapshotId = partitionResponseData.snapshotId();
+
+return divergingEpoch.epoch() != -1 || divergingEpoch.endOffset() != -1 ||

Review Comment:
nit: maybe it's just me, but it seems more intuitive to separate the diverging epoch and snapshot id checks.
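The separation suggested in the nit above could look roughly like the sketch below. It is self-contained, so it uses hypothetical stand-in classes instead of the real `FetchResponseData` types. The quoted diff is cut off before the snapshot id part of the condition, so the snapshot id check here is an assumption that mirrors the diverging epoch check.

```java
/** Hypothetical sketch of splitting the truncation check into two named predicates. */
final class TruncationCheck {

    /** Stand-in for FetchResponseData.EpochEndOffset (assumption, not the real class). */
    static final class EpochEndOffset {
        final int epoch;
        final long endOffset;

        EpochEndOffset(int epoch, long endOffset) {
            this.epoch = epoch;
            this.endOffset = endOffset;
        }
    }

    /** Stand-in for FetchResponseData.SnapshotId (assumption, not the real class). */
    static final class SnapshotId {
        final int epoch;
        final long endOffset;

        SnapshotId(int epoch, long endOffset) {
            this.epoch = epoch;
            this.endOffset = endOffset;
        }
    }

    /** True when either diverging epoch field differs from the -1 sentinel. */
    static boolean hasDivergingEpoch(EpochEndOffset divergingEpoch) {
        return divergingEpoch.epoch != -1 || divergingEpoch.endOffset != -1;
    }

    /** Assumed to mirror the diverging epoch check; the original condition is truncated. */
    static boolean hasSnapshotId(SnapshotId snapshotId) {
        return snapshotId.epoch != -1 || snapshotId.endOffset != -1;
    }

    /** Combines the two named checks instead of one long boolean expression. */
    static boolean isPartitionTruncated(EpochEndOffset divergingEpoch, SnapshotId snapshotId) {
        return hasDivergingEpoch(divergingEpoch) || hasSnapshotId(snapshotId);
    }

    public static void main(String[] args) {
        EpochEndOffset noDivergence = new EpochEndOffset(-1, -1L);
        SnapshotId noSnapshot = new SnapshotId(-1, -1L);
        System.out.println(isPartitionTruncated(noDivergence, noSnapshot));
        System.out.println(isPartitionTruncated(new EpochEndOffset(3, 42L), noSnapshot));
    }
}
```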
[GitHub] [kafka] junrao commented on a diff in pull request #13503: MINOR: Refactor TierStateMachine related tests into a separate test file
junrao commented on code in PR #13503: URL: https://github.com/apache/kafka/pull/13503#discussion_r1287779224 ## core/src/test/scala/unit/kafka/server/FetcherThreadTestUtils.scala: ## @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package unit.kafka.server Review Comment: unit.kafka.server => kafka.server ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15224) Automate version change to snapshot
[ https://issues.apache.org/jira/browse/KAFKA-15224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752203#comment-17752203 ]

Tanay Karmarkar commented on KAFKA-15224:
-
[~divijvaidya] Can we include new Python packages? Do you know where the Python package requirements are currently managed? I can't seem to find a requirements.txt.

> Automate version change to snapshot
> 
>
> Key: KAFKA-15224
> URL: https://issues.apache.org/jira/browse/KAFKA-15224
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Divij Vaidya
> Assignee: Tanay Karmarkar
> Priority: Minor
>
> We need to change to a SNAPSHOT version as part of the release process [1].
> The specific manual steps are:
> Update version on the branch to 0.10.0.1-SNAPSHOT in the following places:
> * docs/js/templateData.js
> * gradle.properties
> * kafka-merge-pr.py
> * streams/quickstart/java/pom.xml
> * streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
> * streams/quickstart/pom.xml
> * tests/kafkatest/__init__.py (note: this version name can't follow the -SNAPSHOT convention due to Python version naming restrictions; instead update it to 0.10.0.1.dev0)
> * tests/kafkatest/version.py
> The diff of the changes looks like
> [https://github.com/apache/kafka/commit/484a86feb562f645bdbec74b18f8a28395a686f7#diff-21a0ab11b8bbdab9930ad18d4bca2d943bbdf40d29d68ab8a96f765bd1f9]
>
> It would be nice if we could run a script to do this automatically. Note that release.py (line 550) already does something similar where it replaces SNAPSHOT with the actual version. We need to do the opposite here. We can repurpose that code in release.py and extract it into a new script to perform this operation.
> [1] [https://cwiki.apache.org/confluence/display/KAFKA/Release+Process]
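The two naming conventions in the issue description (`-SNAPSHOT` for most files, `.dev0` for the Python package) can be captured in a small helper. The sketch below is in Java purely as an illustration; the issue itself proposes reusing code from release.py. The class and method names are hypothetical, and the file rewriting is left out.

```java
/** Hypothetical helper that derives the development version strings from a release version. */
final class SnapshotVersions {

    private static final String SNAPSHOT_SUFFIX = "-SNAPSHOT";
    private static final String PYTHON_DEV_SUFFIX = ".dev0";

    /** Appends -SNAPSHOT unless the version already carries it. */
    static String toSnapshot(String version) {
        return version.endsWith(SNAPSHOT_SUFFIX) ? version : version + SNAPSHOT_SUFFIX;
    }

    /** Python packaging cannot use -SNAPSHOT, so the issue uses a .dev0 suffix instead. */
    static String toPythonDev(String version) {
        String base = version.endsWith(SNAPSHOT_SUFFIX)
                ? version.substring(0, version.length() - SNAPSHOT_SUFFIX.length())
                : version;
        return base.endsWith(PYTHON_DEV_SUFFIX) ? base : base + PYTHON_DEV_SUFFIX;
    }

    public static void main(String[] args) {
        System.out.println(toSnapshot("0.10.0.1"));   // prints 0.10.0.1-SNAPSHOT
        System.out.println(toPythonDev("0.10.0.1"));  // prints 0.10.0.1.dev0
    }
}
```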
[GitHub] [kafka] kirktrue commented on pull request #13990: KAFKA-14937: Refactoring for client code to reduce boilerplate
kirktrue commented on PR #13990:
URL: https://github.com/apache/kafka/pull/13990#issuecomment-1670405790

@junrao I've rebased against `trunk` over the last week to trigger Jenkins runs. Each build fails with a different set of tests. Most of the failing tests already have Jiras filed.

I do want to point out another pull request I made (#14095) in which I literally only changed `README.md`. The first change was to add a newline, and that build included several test failures. I then removed that newline, and the resulting build included a different set of test failures.

According to the graphs on [this Gradle Enterprise page](https://ge.apache.org/scans/tests?search.names=Git%20branch&search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=America/Los_Angeles&search.values=trunk&tests.help=advancedQuery), over the last 28 days, 28% of the Jenkins builds on `trunk` failed and 90% of the builds included flaky tests.
[GitHub] [kafka] lihaosky commented on a diff in pull request #14164: KAFKA-15022: [8/N] more tests for HAAssignor
lihaosky commented on code in PR #14164: URL: https://github.com/apache/kafka/pull/14164#discussion_r1287753972 ## streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java: ## @@ -91,6 +91,7 @@ private Properties effectiveConfigFrom(final Properties initialConfig) { effectiveConfig.put(KafkaConfig.MessageMaxBytesProp(), 100); effectiveConfig.put(KafkaConfig.ControlledShutdownEnableProp(), true); effectiveConfig.put(KafkaConfig.ZkSessionTimeoutMsProp(), 1); +effectiveConfig.put(KafkaConfig.RackProp(), "rack0"); Review Comment: Good catch. Might be some left over change. Let me delete it ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java: ## @@ -272,27 +300,50 @@ private void createMockTaskManager(final Set activeTasks, // If mockCreateInternalTopics is true the internal topic manager will report that it had to create all internal // topics and we will skip the listOffsets request for these changelogs private MockInternalTopicManager overwriteInternalTopicManagerWithMock(final boolean mockCreateInternalTopics) { -final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager( +return overwriteInternalTopicManagerWithMock(mockCreateInternalTopics, null); +} + +private MockInternalTopicManager overwriteInternalTopicManagerWithMock(final boolean mockCreateInternalTopics, final List>> topicPartitionInfo) { +final MockInternalTopicManager mockInternalTopicManager = spy(new MockInternalTopicManager( time, new StreamsConfig(configProps()), mockClientSupplier.restoreConsumer, mockCreateInternalTopics -); +)); + +if (topicPartitionInfo != null) { + lenient().when(mockInternalTopicManager.getTopicPartitionInfo(anySet())).thenAnswer( +i -> { +final Set topics = i.getArgument(0); +for (final Map> tp : topicPartitionInfo) { +if (topics.equals(tp.keySet())) { +return tp; +} +} +return null; Review Comment: We can return empty Map. 
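The suggestion to return an empty Map rather than `null` from the stubbed lookup can be illustrated without Mockito. In the sketch below the class name, method name, and the `List<Integer>` value type are hypothetical stand-ins, since the generic types in the quoted diff were lost.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the stubbed getTopicPartitionInfo lookup. */
final class TopicInfoLookup {

    /**
     * Returns the first candidate whose key set equals the requested topics,
     * or an empty map (never null) when nothing matches.
     */
    static Map<String, List<Integer>> lookup(Set<String> topics,
                                             List<Map<String, List<Integer>>> candidates) {
        for (Map<String, List<Integer>> candidate : candidates) {
            if (topics.equals(candidate.keySet())) {
                return candidate;
            }
        }
        // Callers can iterate or call get() on the result without a null check.
        return Collections.emptyMap();
    }

    public static void main(String[] args) {
        List<Map<String, List<Integer>>> candidates = Collections.singletonList(
                Collections.singletonMap("topic-a", Arrays.asList(0, 1, 2)));
        System.out.println(lookup(Collections.singleton("topic-a"), candidates));
        System.out.println(lookup(Collections.singleton("topic-b"), candidates).isEmpty());
    }
}
```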
[jira] [Assigned] (KAFKA-15224) Automate version change to snapshot
[ https://issues.apache.org/jira/browse/KAFKA-15224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tanay Karmarkar reassigned KAFKA-15224:
---
Assignee: Tanay Karmarkar

> Automate version change to snapshot
> 
>
> Key: KAFKA-15224
> URL: https://issues.apache.org/jira/browse/KAFKA-15224
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Divij Vaidya
> Assignee: Tanay Karmarkar
> Priority: Minor
>
> We need to change to a SNAPSHOT version as part of the release process [1].
> The specific manual steps are:
> Update version on the branch to 0.10.0.1-SNAPSHOT in the following places:
> * docs/js/templateData.js
> * gradle.properties
> * kafka-merge-pr.py
> * streams/quickstart/java/pom.xml
> * streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
> * streams/quickstart/pom.xml
> * tests/kafkatest/__init__.py (note: this version name can't follow the -SNAPSHOT convention due to Python version naming restrictions; instead update it to 0.10.0.1.dev0)
> * tests/kafkatest/version.py
> The diff of the changes looks like
> [https://github.com/apache/kafka/commit/484a86feb562f645bdbec74b18f8a28395a686f7#diff-21a0ab11b8bbdab9930ad18d4bca2d943bbdf40d29d68ab8a96f765bd1f9]
>
> It would be nice if we could run a script to do this automatically. Note that release.py (line 550) already does something similar where it replaces SNAPSHOT with the actual version. We need to do the opposite here. We can repurpose that code in release.py and extract it into a new script to perform this operation.
> [1] [https://cwiki.apache.org/confluence/display/KAFKA/Release+Process]
[GitHub] [kafka] kirktrue commented on pull request #13990: KAFKA-14937: Refactoring for client code to reduce boilerplate
kirktrue commented on PR #13990: URL: https://github.com/apache/kafka/pull/13990#issuecomment-1670400792 > @kirktrue : For consistent infra issues, we could file an infra jira for the ASF infra team to help. For transient test failures, if we could identify when the test failure first occurred and correlate it with the PR that might have introduced it, we could file a jira and ping the PR author to take a look. @jun done: https://issues.apache.org/jira/browse/INFRA-24874 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on pull request #13990: KAFKA-14937: Refactoring for client code to reduce boilerplate
kirktrue commented on PR #13990: URL: https://github.com/apache/kafka/pull/13990#issuecomment-1670369317 @junrao I created this INFRA Jira just now to see if they can help resolve some of the build issues: https://issues.apache.org/jira/browse/INFRA-24874 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15312) FileRawSnapshotWriter must flush before atomic move
[ https://issues.apache.org/jira/browse/KAFKA-15312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-15312: --- Description: On ext4 file systems it is possible for KRaft to create zero-length snapshot files. Not all file system fsync to disk on close. For KRaft to guarantee that the data has made it to disk before calling rename, it needs to make sure that the file has been fsync. We have seen cases were the snapshot file has zero-length data on ext4 file system. {quote} "Delayed allocation" means that the filesystem tries to delay the allocation of physical disk blocks for written data for as long as possible. This policy brings some important performance benefits. Many files are short-lived; delayed allocation can keep the system from writing fleeting temporary files to disk at all. And, for longer-lived files, delayed allocation allows the kernel to accumulate more data and to allocate the blocks for data contiguously, speeding up both the write and any subsequent reads of that data. It's an important optimization which is found in most contemporary filesystems. But, if blocks have not been allocated for a file, there is no need to write them quickly as a security measure. Since the blocks do not yet exist, it is not possible to read somebody else's data from them. So ext4 will not (cannot) write out unallocated blocks as part of the next journal commit cycle. Those blocks will, instead, wait until the kernel decides to flush them out; at that point, physical blocks will be allocated on disk and the data will be made persistent. The kernel doesn't like to let file data sit unwritten for too long, but it can still take a minute or so (with the default settings) for that data to be flushed - far longer than the five seconds normally seen with ext3. And that is why a crash can cause the loss of quite a bit more data when ext4 is being used. 
{quote} from: [https://lwn.net/Articles/322823/] {quote}auto_da_alloc ( * ), noauto_da_alloc Many broken applications don't use fsync() when replacing existing files via patterns such as fd = open("foo.new")/write(fd,..)/close(fd)/ rename("foo.new", "foo"), or worse yet, fd = open("foo", O_TRUNC)/write(fd,..)/close(fd). If auto_da_alloc is enabled, ext4 will detect the replace-via-rename and replace-via-truncate patterns and force that any delayed allocation blocks are allocated such that at the next journal commit, in the default data=ordered mode, the data blocks of the new file are forced to disk before the rename() operation is committed. This provides roughly the same level of guarantees as ext3, and avoids the "zero-length" problem that can happen when a system crashes before the delayed allocation blocks are forced to disk. {quote} from: [https://www.kernel.org/doc/html/latest/admin-guide/ext4.html] was: Not all file system fsync to disk on close. For KRaft to guarantee that the data has made it to disk before calling rename it needs to make sure that the file has been fsync. We have seen cases were the snapshot file has zero-length data on ext4 file system. {quote} "Delayed allocation" means that the filesystem tries to delay the allocation of physical disk blocks for written data for as long as possible. This policy brings some important performance benefits. Many files are short-lived; delayed allocation can keep the system from writing fleeting temporary files to disk at all. And, for longer-lived files, delayed allocation allows the kernel to accumulate more data and to allocate the blocks for data contiguously, speeding up both the write and any subsequent reads of that data. It's an important optimization which is found in most contemporary filesystems. But, if blocks have not been allocated for a file, there is no need to write them quickly as a security measure. 
Since the blocks do not yet exist, it is not possible to read somebody else's data from them. So ext4 will not (cannot) write out unallocated blocks as part of the next journal commit cycle. Those blocks will, instead, wait until the kernel decides to flush them out; at that point, physical blocks will be allocated on disk and the data will be made persistent. The kernel doesn't like to let file data sit unwritten for too long, but it can still take a minute or so (with the default settings) for that data to be flushed - far longer than the five seconds normally seen with ext3. And that is why a crash can cause the loss of quite a bit more data when ext4 is being used. {quote} from: [https://lwn.net/Articles/322823/] {quote}auto_da_alloc ( * ), noauto_da_alloc Many broken applications don't use fsync() when replacing existing files via patterns such as fd = open("foo.new")/write(fd,..)/close(fd)/ rename("foo.new", "foo"), or worse yet, fd = open("foo", O_TRUNC)/write(fd,..)/close(fd). If auto_da_alloc is enabled,
[jira] [Updated] (KAFKA-15312) FileRawSnapshotWriter must flush before atomic move
[ https://issues.apache.org/jira/browse/KAFKA-15312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-15312: --- Fix Version/s: 3.5.2 > FileRawSnapshotWriter must flush before atomic move > --- > > Key: KAFKA-15312 > URL: https://issues.apache.org/jira/browse/KAFKA-15312 > Project: Kafka > Issue Type: Bug > Components: kraft >Reporter: José Armando García Sancio >Assignee: José Armando García Sancio >Priority: Major > Fix For: 3.6.0, 3.5.2 > > > Not all file system fsync to disk on close. For KRaft to guarantee that the > data has made it to disk before calling rename it needs to make sure that the > file has been fsync. > We have seen cases were the snapshot file has zero-length data on ext4 file > system. > {quote} "Delayed allocation" means that the filesystem tries to delay the > allocation of physical disk blocks for written data for as long as possible. > This policy brings some important performance benefits. Many files are > short-lived; delayed allocation can keep the system from writing fleeting > temporary files to disk at all. And, for longer-lived files, delayed > allocation allows the kernel to accumulate more data and to allocate the > blocks for data contiguously, speeding up both the write and any subsequent > reads of that data. It's an important optimization which is found in most > contemporary filesystems. > But, if blocks have not been allocated for a file, there is no need to write > them quickly as a security measure. Since the blocks do not yet exist, it is > not possible to read somebody else's data from them. So ext4 will not > (cannot) write out unallocated blocks as part of the next journal commit > cycle. Those blocks will, instead, wait until the kernel decides to flush > them out; at that point, physical blocks will be allocated on disk and the > data will be made persistent. 
The kernel doesn't like to let file data sit > unwritten for too long, but it can still take a minute or so (with the > default settings) for that data to be flushed - far longer than the five > seconds normally seen with ext3. And that is why a crash can cause the loss > of quite a bit more data when ext4 is being used. > {quote} > from: [https://lwn.net/Articles/322823/] > {quote}auto_da_alloc ( * ), noauto_da_alloc > Many broken applications don't use fsync() when replacing existing files via > patterns such as fd = open("foo.new")/write(fd,..)/close(fd)/ > rename("foo.new", "foo"), or worse yet, fd = open("foo", > O_TRUNC)/write(fd,..)/close(fd). If auto_da_alloc is enabled, ext4 will > detect the replace-via-rename and replace-via-truncate patterns and force > that any delayed allocation blocks are allocated such that at the next > journal commit, in the default data=ordered mode, the data blocks of the new > file are forced to disk before the rename() operation is committed. This > provides roughly the same level of guarantees as ext3, and avoids the > "zero-length" problem that can happen when a system crashes before the > delayed allocation blocks are forced to disk. > {quote} > from: [https://www.kernel.org/doc/html/latest/admin-guide/ext4.html] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
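The write-then-rename pattern described in this issue can be sketched in plain Java as follows. This is a minimal illustration, not the code from the Kafka fix: the class name `DurableReplace`, the method `writeThenMove`, and the `.tmp` suffix are hypothetical. It only shows the ordering the issue asks for, forcing the channel before the atomic move.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Hypothetical helper, not part of Kafka.
final class DurableReplace {
    /** Writes data to a temporary sibling file, forces it to disk, then renames it into place. */
    static void writeThenMove(Path target, byte[] data) throws IOException {
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        try (FileChannel channel = FileChannel.open(tmp,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            ByteBuffer buffer = ByteBuffer.wrap(data);
            while (buffer.hasRemaining()) {
                channel.write(buffer);
            }
            // Ask the OS to flush file content and metadata before the rename,
            // so the rename cannot become visible ahead of the data.
            channel.force(true);
        }
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    }
}
```

Calling `DurableReplace.writeThenMove(path, bytes)` leaves either the old file or the complete new one at `path`. The sketch does not fsync the parent directory, which a stricter durability guarantee for the rename itself would also need.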
[GitHub] [kafka] jsancio merged pull request #14162: KAFKA-15312; Force channel before atomic file move
jsancio merged PR #14162: URL: https://github.com/apache/kafka/pull/14162 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #14164: KAFKA-15022: [8/N] more tests for HAAssignor
mjsax commented on code in PR #14164: URL: https://github.com/apache/kafka/pull/14164#discussion_r1287701495 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java: ## @@ -227,15 +350,38 @@ private void formatClientStates(final boolean printUnassigned) { } } +@Parameter +public boolean enableRackAwareTaskAssignor; + +private String rackAwareStrategy = StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE; + +@Before +public void setUp() { +if (enableRackAwareTaskAssignor) { +rackAwareStrategy = StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC; +} +} + +@Parameterized.Parameters(name = "enableRackAwareTaskAssignor={0}") +public static Collection getParamStoreType() { +return asList(new Object[][] { +{true}, +{false} +}); +} + @Test public void staticAssignmentShouldConvergeWithTheFirstAssignment() { final AssignmentConfigs configs = new AssignmentConfigs(100L, 2, 0, 60_000L, - EMPTY_RACK_AWARE_ASSIGNMENT_TAGS); + EMPTY_RACK_AWARE_ASSIGNMENT_TAGS, +null, +null, Review Comment: fix indentation ## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java: ## @@ -248,21 +394,29 @@ public void assignmentShouldConvergeAfterAddingNode() { final int numStatefulTasks = 11; final int maxWarmupReplicas = 2; final int numStandbyReplicas = 0; +final int numNodes = 10; final AssignmentConfigs configs = new AssignmentConfigs(100L, maxWarmupReplicas, numStandbyReplicas, 60_000L, - EMPTY_RACK_AWARE_ASSIGNMENT_TAGS); + EMPTY_RACK_AWARE_ASSIGNMENT_TAGS, +null, +null, Review Comment: fix indentation
[GitHub] [kafka] jsancio commented on a diff in pull request #14168: MINOR; Fix nanosecond elapsed time
jsancio commented on code in PR #14168: URL: https://github.com/apache/kafka/pull/14168#discussion_r1287699245 ## metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java: ## @@ -445,28 +444,24 @@ public void testBalancePartitionLeaders() throws Throwable { active.alterPartition(ANONYMOUS_CONTEXT, new AlterPartitionRequest .Builder(alterPartitionRequest, false).build((short) 0).data()).get(); -AtomicLong lastHeartbeatMs = new AtomicLong(getMonotonicMs(active.time())); +AtomicLong lastHeartbeatNs = new AtomicLong(active.time().nanoseconds()); sendBrokerHeartbeat(active, allBrokers, brokerEpochs); // Check that partitions are balanced TestUtils.waitForCondition( () -> { -long currentMonotonicMs = getMonotonicMs(active.time()); -if (currentMonotonicMs > lastHeartbeatMs.get() + (sessionTimeoutMillis / 2)) { -lastHeartbeatMs.set(currentMonotonicMs); Review Comment: If `active.time().nanoseconds()` overflows (wraps), `currentMonotonicMs > lastHeartbeatMs.get() + (sessionTimeoutMillis / 2)` will be false for roughly 292 years (2^63 nanoseconds). The Javadoc recommends not comparing `nanoTime()` values directly and instead comparing elapsed time, `nanoTime() - nanoTime()`. See the Javadoc for details: https://docs.oracle.com/javase/8/docs/api/java/lang/System.html#nanoTime--
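To illustrate the point of this review comment, here is a small sketch that contrasts the two comparison styles. The class and method names (`ElapsedNanos`, `hasElapsed`, `hasElapsedNaive`) are hypothetical and are not taken from the PR; the readings are passed in as plain `long` values so the wrap-around case can be exercised directly.

```java
// Hypothetical helper, not part of Kafka or the PR under review.
final class ElapsedNanos {
    /** Overflow-safe check: compares the elapsed difference, as the System.nanoTime() Javadoc advises. */
    static boolean hasElapsed(long startNs, long nowNs, long timeoutNs) {
        // Two's-complement subtraction yields the true elapsed time even if nowNs has wrapped.
        return nowNs - startNs >= timeoutNs;
    }

    /** The style the review warns against: comparing absolute readings. */
    static boolean hasElapsedNaive(long startNs, long nowNs, long timeoutNs) {
        return nowNs >= startNs + timeoutNs;
    }
}
```

With `startNs = Long.MAX_VALUE - 100` and `nowNs = startNs + 200` (which wraps to a negative value), `hasElapsed(startNs, nowNs, 50)` is true while `hasElapsedNaive(startNs, nowNs, 50)` is false, even though 200 ns have passed.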
[GitHub] [kafka] mjsax commented on a diff in pull request #14164: KAFKA-15022: [8/N] more tests for HAAssignor
mjsax commented on code in PR #14164: URL: https://github.com/apache/kafka/pull/14164#discussion_r1287697676 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java: ## @@ -45,75 +81,162 @@ private static final class Harness { private final Map droppedClientStates; private final StringBuilder history = new StringBuilder(); +public final Map> partitionsForTask; +public final Map> changelogPartitionsForTask; +public final Map> tasksForTopicGroup; +public final Cluster fullMetadata; +public final Map>> racksForProcessConsumer; +public final InternalTopicManager internalTopicManager; + private static Harness initializeCluster(final int numStatelessTasks, final int numStatefulTasks, - final int numNodes, - final Supplier partitionCountSupplier) { + final int numClients, + final Supplier partitionCountSupplier, + final int numNodes) { int subtopology = 0; final Set statelessTasks = new TreeSet<>(); int remainingStatelessTasks = numStatelessTasks; +final List nodes = getRandomNodes(numNodes); +int nodeIndex = 0; +final Set partitionInfoSet = new HashSet<>(); +final Map> partitionsForTask = new HashMap<>(); +final Map> changelogPartitionsForTask = new HashMap<>(); +final Map> tasksForTopicGroup = new HashMap<>(); + while (remainingStatelessTasks > 0) { final int partitions = Math.min(remainingStatelessTasks, partitionCountSupplier.get()); for (int i = 0; i < partitions; i++) { -statelessTasks.add(new TaskId(subtopology, i)); +final TaskId taskId = new TaskId(subtopology, i); +statelessTasks.add(taskId); remainingStatelessTasks--; + +final Node[] replica = getRandomReplica(nodes, nodeIndex); +partitionInfoSet.add(new PartitionInfo(TOPIC_PREFIX + "_" + subtopology, i, replica[0], replica, replica)); +nodeIndex++; + +partitionsForTask.put(taskId, mkSet(new TopicPartition(TOPIC_PREFIX + "_" + subtopology, i))); +tasksForTopicGroup.computeIfAbsent(new Subtopology(subtopology, null), k -> new HashSet<>()).add(taskId); } 
subtopology++; } final Map statefulTaskEndOffsetSums = new TreeMap<>(); +final Map> topicPartitionInfo = new HashMap<>(); +final Set changelogNames = new HashSet<>(); int remainingStatefulTasks = numStatefulTasks; while (remainingStatefulTasks > 0) { +final String changelogTopicName = CHANGELOG_TOPIC_PREFIX + "_" + subtopology; +changelogNames.add(changelogTopicName); final int partitions = Math.min(remainingStatefulTasks, partitionCountSupplier.get()); for (int i = 0; i < partitions; i++) { -statefulTaskEndOffsetSums.put(new TaskId(subtopology, i), 15L); +final TaskId taskId = new TaskId(subtopology, i); +statefulTaskEndOffsetSums.put(taskId, 15L); remainingStatefulTasks--; + +Node[] replica = getRandomReplica(nodes, nodeIndex); +partitionInfoSet.add(new PartitionInfo(TOPIC_PREFIX + "_" + subtopology, i, replica[0], replica, replica)); +nodeIndex++; + +partitionsForTask.put(taskId, mkSet(new TopicPartition(TOPIC_PREFIX + "_" + subtopology, i))); +changelogPartitionsForTask.put(taskId, mkSet(new TopicPartition(changelogTopicName, i))); +tasksForTopicGroup.computeIfAbsent(new Subtopology(subtopology, null), k -> new HashSet<>()).add(taskId); + +final Random random = new Random(); +final int changelogNodeIndex = random.nextInt(nodes.size()); +replica = getRandomReplica(nodes, changelogNodeIndex); +final TopicPartitionInfo info = new TopicPartitionInfo(i, replica[0], Arrays.asList(replica[0], replica[1]), Collections.emptyList()); +topicPartitionInfo.computeIfAbsent(changelogTopicName, tp -> new ArrayList<>()).add(info); } subtopology++; } +final MockTime time = new MockTime(); +final StreamsConfig streamsConfig = new StreamsConfig(configProps(true)); +f
[GitHub] [kafka] mjsax commented on a diff in pull request #14164: KAFKA-15022: [8/N] more tests for HAAssignor
mjsax commented on code in PR #14164: URL: https://github.com/apache/kafka/pull/14164#discussion_r1287695148 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java: ## @@ -45,75 +81,162 @@ private static final class Harness { private final Map droppedClientStates; private final StringBuilder history = new StringBuilder(); +public final Map> partitionsForTask; +public final Map> changelogPartitionsForTask; +public final Map> tasksForTopicGroup; +public final Cluster fullMetadata; +public final Map>> racksForProcessConsumer; +public final InternalTopicManager internalTopicManager; + private static Harness initializeCluster(final int numStatelessTasks, final int numStatefulTasks, - final int numNodes, - final Supplier partitionCountSupplier) { + final int numClients, + final Supplier partitionCountSupplier, + final int numNodes) { int subtopology = 0; final Set statelessTasks = new TreeSet<>(); int remainingStatelessTasks = numStatelessTasks; +final List nodes = getRandomNodes(numNodes); +int nodeIndex = 0; +final Set partitionInfoSet = new HashSet<>(); +final Map> partitionsForTask = new HashMap<>(); +final Map> changelogPartitionsForTask = new HashMap<>(); +final Map> tasksForTopicGroup = new HashMap<>(); + while (remainingStatelessTasks > 0) { final int partitions = Math.min(remainingStatelessTasks, partitionCountSupplier.get()); for (int i = 0; i < partitions; i++) { -statelessTasks.add(new TaskId(subtopology, i)); +final TaskId taskId = new TaskId(subtopology, i); +statelessTasks.add(taskId); remainingStatelessTasks--; + +final Node[] replica = getRandomReplica(nodes, nodeIndex); +partitionInfoSet.add(new PartitionInfo(TOPIC_PREFIX + "_" + subtopology, i, replica[0], replica, replica)); +nodeIndex++; + +partitionsForTask.put(taskId, mkSet(new TopicPartition(TOPIC_PREFIX + "_" + subtopology, i))); +tasksForTopicGroup.computeIfAbsent(new Subtopology(subtopology, null), k -> new HashSet<>()).add(taskId); } 
subtopology++; } final Map statefulTaskEndOffsetSums = new TreeMap<>(); +final Map> topicPartitionInfo = new HashMap<>(); +final Set changelogNames = new HashSet<>(); int remainingStatefulTasks = numStatefulTasks; while (remainingStatefulTasks > 0) { +final String changelogTopicName = CHANGELOG_TOPIC_PREFIX + "_" + subtopology; +changelogNames.add(changelogTopicName); final int partitions = Math.min(remainingStatefulTasks, partitionCountSupplier.get()); for (int i = 0; i < partitions; i++) { -statefulTaskEndOffsetSums.put(new TaskId(subtopology, i), 15L); +final TaskId taskId = new TaskId(subtopology, i); +statefulTaskEndOffsetSums.put(taskId, 15L); remainingStatefulTasks--; + +Node[] replica = getRandomReplica(nodes, nodeIndex); +partitionInfoSet.add(new PartitionInfo(TOPIC_PREFIX + "_" + subtopology, i, replica[0], replica, replica)); +nodeIndex++; + +partitionsForTask.put(taskId, mkSet(new TopicPartition(TOPIC_PREFIX + "_" + subtopology, i))); +changelogPartitionsForTask.put(taskId, mkSet(new TopicPartition(changelogTopicName, i))); +tasksForTopicGroup.computeIfAbsent(new Subtopology(subtopology, null), k -> new HashSet<>()).add(taskId); + +final Random random = new Random(); Review Comment: We should initialize `Random` with a seed that we log (so that we can reproduce the same test run):
```
final long seed = System.currentTimeMillis();
log.info("seed: {}", seed); // or just use println
final Random random = new Random(seed);
```
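The reviewer's suggestion relies on `java.util.Random` producing the same sequence for the same seed. A minimal sketch of that property follows; the names `SeededRandomExample`, `newLoggedRandom`, and `pickNodeIndexes` are hypothetical and do not appear in the PR, and `System.out.println` stands in for whatever logger the test uses.

```java
import java.util.Random;

// Hypothetical illustration, not code from the PR.
final class SeededRandomExample {
    /** Creates a Random from the given seed and prints the seed so a failing run can be replayed. */
    static Random newLoggedRandom(long seed) {
        System.out.println("seed=" + seed);
        return new Random(seed);
    }

    /** Draws count node indexes in [0, numNodes) from a Random created with the given seed. */
    static int[] pickNodeIndexes(long seed, int numNodes, int count) {
        Random random = newLoggedRandom(seed);
        int[] picks = new int[count];
        for (int i = 0; i < count; i++) {
            picks[i] = random.nextInt(numNodes);
        }
        return picks;
    }
}
```

A test would normally pass `System.currentTimeMillis()` as the seed; to reproduce a failure, the logged value is passed back in and the same indexes are drawn again.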
[GitHub] [kafka] mjsax commented on a diff in pull request #14164: KAFKA-15022: [8/N] more tests for HAAssignor
mjsax commented on code in PR #14164: URL: https://github.com/apache/kafka/pull/14164#discussion_r1287692186 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java: ## @@ -587,14 +587,18 @@ static List getRandomNodes(final int nodeSize) { return nodeList; } +static Node[] getRandomReplica(final List nodeList, final int index) { Review Comment: Does not seem to be random actually?
[GitHub] [kafka] mjsax commented on a diff in pull request #14164: KAFKA-15022: [8/N] more tests for HAAssignor
mjsax commented on code in PR #14164: URL: https://github.com/apache/kafka/pull/14164#discussion_r1287685324 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java: ## @@ -1768,16 +1993,22 @@ public void shouldReturnInterleavedAssignmentWithUnrevokedPartitionsRemovedWhenN final Set allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2); subscriptions.put(CONSUMER_1, Review Comment: nit: move `CONSUMER_1` to next line, too (same below)
[GitHub] [kafka] mjsax commented on a diff in pull request #14164: KAFKA-15022: [8/N] more tests for HAAssignor
mjsax commented on code in PR #14164: URL: https://github.com/apache/kafka/pull/14164#discussion_r1287683870 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java: ## @@ -1241,12 +1410,18 @@ public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() { final List topics = asList("topic1", APPLICATION_ID + "-topicX", APPLICATION_ID + "-topicZ"); final Set allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2); -final MockInternalTopicManager internalTopicManager = configureDefault(); +createDefaultMockTaskManager(); +final List>> changelogTopicPartitionInfo = getTopicPartitionInfo(4, Review Comment: as above
[GitHub] [kafka] mjsax commented on a diff in pull request #14164: KAFKA-15022: [8/N] more tests for HAAssignor
mjsax commented on code in PR #14164: URL: https://github.com/apache/kafka/pull/14164#discussion_r1287683621 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java: ## @@ -1213,12 +1376,18 @@ public void testAssignWithInternalTopics() { final List topics = asList("topic1", APPLICATION_ID + "-topicX"); final Set allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2); -final MockInternalTopicManager internalTopicManager = configureDefault(); +createDefaultMockTaskManager(); +final List>> changelogTopicPartitionInfo = getTopicPartitionInfo(4, Review Comment: nit: move first parameter to next line
[GitHub] [kafka] mjsax commented on a diff in pull request #14164: KAFKA-15022: [8/N] more tests for HAAssignor
mjsax commented on code in PR #14164: URL: https://github.com/apache/kafka/pull/14164#discussion_r1287682340 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java: ## @@ -809,14 +888,37 @@ public void testAssignWithStates() { APPLICATION_ID + "-store3-changelog"), asList(3, 3, 3)) ); -configureDefault(); + +createDefaultMockTaskManager(); +final List>> changelogTopicPartitionInfo = getTopicPartitionInfo(3, Review Comment: nit: move first parameter `3,` onto its own line below (hard to read the code otherwise)
[GitHub] [kafka] mjsax commented on a diff in pull request #14164: KAFKA-15022: [8/N] more tests for HAAssignor
mjsax commented on code in PR #14164: URL: https://github.com/apache/kafka/pull/14164#discussion_r1287681335 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java: ## @@ -272,27 +300,50 @@ private void createMockTaskManager(final Set activeTasks, // If mockCreateInternalTopics is true the internal topic manager will report that it had to create all internal // topics and we will skip the listOffsets request for these changelogs private MockInternalTopicManager overwriteInternalTopicManagerWithMock(final boolean mockCreateInternalTopics) { -final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager( +return overwriteInternalTopicManagerWithMock(mockCreateInternalTopics, null); +} + +private MockInternalTopicManager overwriteInternalTopicManagerWithMock(final boolean mockCreateInternalTopics, final List>> topicPartitionInfo) { +final MockInternalTopicManager mockInternalTopicManager = spy(new MockInternalTopicManager( time, new StreamsConfig(configProps()), mockClientSupplier.restoreConsumer, mockCreateInternalTopics -); +)); + +if (topicPartitionInfo != null) { + lenient().when(mockInternalTopicManager.getTopicPartitionInfo(anySet())).thenAnswer( +i -> { +final Set topics = i.getArgument(0); +for (final Map> tp : topicPartitionInfo) { +if (topics.equals(tp.keySet())) { +return tp; +} +} +return null; Review Comment: Is `null` right here? Or should it be an empty `Map` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
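The `null` versus empty `Map` question in the review comment above can be illustrated with a minimal, self-contained sketch. `TopicInfoLookup` and `find` are hypothetical names that are not part of the PR, and whether the production caller actually expects `null` is left open here. The sketch only shows the empty-map variant of the same key-set matching loop:

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper (not from the PR): mirrors the matching loop in the mocked answer.
final class TopicInfoLookup {
    private TopicInfoLookup() {}

    // Returns the first candidate whose key set equals the requested topics.
    // When nothing matches, returns an immutable empty map instead of null,
    // so callers can iterate or call size() without a null check.
    static <V> Map<String, V> find(Set<String> topics, List<Map<String, V>> candidates) {
        for (Map<String, V> candidate : candidates) {
            if (topics.equals(candidate.keySet())) {
                return candidate;
            }
        }
        return Collections.emptyMap();
    }
}
```

With this shape, a lookup for an unknown topic set yields a map on which `isEmpty()` is true, whereas the `return null` variant would make the first dereference in the caller throw a `NullPointerException`.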
[GitHub] [kafka] mjsax commented on a diff in pull request #14164: KAFKA-15022: [8/N] more tests for HAAssignor
mjsax commented on code in PR #14164: URL: https://github.com/apache/kafka/pull/14164#discussion_r1287680080 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java: ## @@ -272,27 +300,50 @@ private void createMockTaskManager(final Set activeTasks, // If mockCreateInternalTopics is true the internal topic manager will report that it had to create all internal // topics and we will skip the listOffsets request for these changelogs private MockInternalTopicManager overwriteInternalTopicManagerWithMock(final boolean mockCreateInternalTopics) { -final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager( +return overwriteInternalTopicManagerWithMock(mockCreateInternalTopics, null); +} + +private MockInternalTopicManager overwriteInternalTopicManagerWithMock(final boolean mockCreateInternalTopics, final List>> topicPartitionInfo) { +final MockInternalTopicManager mockInternalTopicManager = spy(new MockInternalTopicManager( time, new StreamsConfig(configProps()), mockClientSupplier.restoreConsumer, mockCreateInternalTopics -); +)); + +if (topicPartitionInfo != null) { + lenient().when(mockInternalTopicManager.getTopicPartitionInfo(anySet())).thenAnswer( +i -> { Review Comment: So `i` is the input parameter passed into `getTopicPartitionInfo` ? -- Can we find a better name? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #14164: KAFKA-15022: [8/N] more tests for HAAssignor
mjsax commented on code in PR #14164: URL: https://github.com/apache/kafka/pull/14164#discussion_r1287676811 ## streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java: ## @@ -91,6 +91,7 @@ private Properties effectiveConfigFrom(final Properties initialConfig) { effectiveConfig.put(KafkaConfig.MessageMaxBytesProp(), 100); effectiveConfig.put(KafkaConfig.ControlledShutdownEnableProp(), true); effectiveConfig.put(KafkaConfig.ZkSessionTimeoutMsProp(), 1); +effectiveConfig.put(KafkaConfig.RackProp(), "rack0"); Review Comment: why do we need this overwrite? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #14164: KAFKA-15022: [8/N] more tests for HAAssignor
mjsax commented on code in PR #14164: URL: https://github.com/apache/kafka/pull/14164#discussion_r1287675546 ## streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java: ## @@ -300,7 +329,9 @@ private static Properties streamsProperties(final String appId, // Increasing the number of threads to ensure that a rebalance happens each time a consumer sends a rejoin (KAFKA-10455) mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 40), mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()), -mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()) +mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()), +mkEntry(CommonClientConfigs.CLIENT_RACK_CONFIG, AssignmentTestUtils.RACK_0), Review Comment: Why is this hard-coded? Should we use different racks for different KS instances? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-15318) Update the Authorizer using AclPublisher
[ https://issues.apache.org/jira/browse/KAFKA-15318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe reassigned KAFKA-15318: Assignee: Colin McCabe > Update the Authorizer using AclPublisher > > > Key: KAFKA-15318 > URL: https://issues.apache.org/jira/browse/KAFKA-15318 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > > On the controller, move publishing acls to the authorizer into a dedicated > MetadataPublisher, AclPublisher. This publisher listens for notifications > from MetadataLoader, and receives only committed data. This brings the > controller side in line with how the broker has always worked. It also avoids > some ugly code related to publishing directly from the QuorumController. Most > important of all, it clears the way to implement metadata transactions > without worrying about Authorizer state (since it will be handled by the > MetadataLoader, along with other metadata image state). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] cmccabe opened a new pull request, #14169: KAFKA-15318: Update the Authorizer using AclPublisher
cmccabe opened a new pull request, #14169: URL: https://github.com/apache/kafka/pull/14169 On the controller, move publishing acls to the Authorizer into a dedicated MetadataPublisher, AclPublisher. This publisher listens for notifications from MetadataLoader, and receives only committed data. This brings the controller side in line with how the broker has always worked. It also avoids some ugly code related to publishing directly from the QuorumController. Most important of all, it clears the way to implement metadata transactions without worrying about Authorizer state (since it will be handled by the MetadataLoader, along with other metadata image state). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15318) Update the Authorizer using AclPublisher
[ https://issues.apache.org/jira/browse/KAFKA-15318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-15318: - Description: On the controller, move publishing acls to the authorizer into a dedicated MetadataPublisher, AclPublisher. This publisher listens for notifications from MetadataLoader, and receives only committed data. This brings the controller side in line with how the broker has always worked. It also avoids some ugly code related to publishing directly from the QuorumController. Most important of all, it clears the way to implement metadata transactions without worrying about Authorizer state (since it will be handled by the MetadataLoader, along with other metadata image state). (was: On the controller, move Acl publishing into a dedicated MetadataPublisher, AclPublisher. This publisher listens for notifications from MetadataLoader, and receives only committed data. This brings the controller side in line with how the broker has always worked. It also avoids some ugly code related to publishing directly from the QuorumController. Most important of all, it clears the way to implement metadata transactions without worrying about Authorizer state (since it will be handled by the MetadataLoader, along with other metadata image state).) > Update the Authorizer using AclPublisher > > > Key: KAFKA-15318 > URL: https://issues.apache.org/jira/browse/KAFKA-15318 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Priority: Major > > On the controller, move publishing acls to the authorizer into a dedicated > MetadataPublisher, AclPublisher. This publisher listens for notifications > from MetadataLoader, and receives only committed data. This brings the > controller side in line with how the broker has always worked. It also avoids > some ugly code related to publishing directly from the QuorumController. 
Most > important of all, it clears the way to implement metadata transactions > without worrying about Authorizer state (since it will be handled by the > MetadataLoader, along with other metadata image state). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15318) Update the Authorizer using AclPublisher
[ https://issues.apache.org/jira/browse/KAFKA-15318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-15318: - Summary: Update the Authorizer using AclPublisher (was: Move Acl publishing outside the QuorumController) > Update the Authorizer using AclPublisher > > > Key: KAFKA-15318 > URL: https://issues.apache.org/jira/browse/KAFKA-15318 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Priority: Major > > On the controller, move Acl publishing into a dedicated MetadataPublisher, > AclPublisher. This publisher listens for notifications from MetadataLoader, > and receives only committed data. This brings the controller side in line > with how the broker has always worked. It also avoids some ugly code related > to publishing directly from the QuorumController. Most important of all, it > clears the way to implement metadata transactions without worrying about > Authorizer state (since it will be handled by the MetadataLoader, along with > other metadata image state). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15318) Move Acl publishing outside the QuorumController
Colin McCabe created KAFKA-15318: Summary: Move Acl publishing outside the QuorumController Key: KAFKA-15318 URL: https://issues.apache.org/jira/browse/KAFKA-15318 Project: Kafka Issue Type: Bug Reporter: Colin McCabe On the controller, move Acl publishing into a dedicated MetadataPublisher, AclPublisher. This publisher listens for notifications from MetadataLoader, and receives only committed data. This brings the controller side in line with how the broker has always worked. It also avoids some ugly code related to publishing directly from the QuorumController. Most important of all, it clears the way to implement metadata transactions without worrying about Authorizer state (since it will be handled by the MetadataLoader, along with other metadata image state). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15317) Fix for async consumer access to committed offsets with multiple consumers
Lianet Magrans created KAFKA-15317: -- Summary: Fix for async consumer access to committed offsets with multiple consumers Key: KAFKA-15317 URL: https://issues.apache.org/jira/browse/KAFKA-15317 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Lianet Magrans Access to the committed offsets via a call to the _committed_ API function works as expected for a single async consumer, but it sometimes fails with a timeout when trying to retrieve the committed offsets with another consumer in the same group. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15126) Change range queries to accept null lower and upper bounds
[ https://issues.apache.org/jira/browse/KAFKA-15126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck resolved KAFKA-15126. - Resolution: Fixed [Merged to trunk|https://github.com/apache/kafka/pull/14137] > Change range queries to accept null lower and upper bounds > -- > > Key: KAFKA-15126 > URL: https://issues.apache.org/jira/browse/KAFKA-15126 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Lucia Cerchie >Assignee: Lucia Cerchie >Priority: Minor > Fix For: 3.6.0 > > Original Estimate: 672h > Remaining Estimate: 672h > > When web client requests come in with query params, it's > common for those params to be null. We want developers to just be able to > pass in the upper/lower bounds if they want instead of implementing their own > logic to avoid getting the whole range (which will happen if they leave the > params null). > An example of the logic they can avoid using after this KIP is > implemented is below: > {code:java} > private RangeQuery> > createRangeQuery(String lower, String upper) { > if (isBlank(lower) && isBlank(upper)) { > return RangeQuery.withNoBounds(); > } else if (!isBlank(lower) && isBlank(upper)) { > return RangeQuery.withLowerBound(lower); > } else if (isBlank(lower) && !isBlank(upper)) { > return RangeQuery.withUpperBound(upper); > } else { > return RangeQuery.withRange(lower, upper); > } > } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15126) Change range queries to accept null lower and upper bounds
[ https://issues.apache.org/jira/browse/KAFKA-15126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-15126: Fix Version/s: 3.6.0 > Change range queries to accept null lower and upper bounds > -- > > Key: KAFKA-15126 > URL: https://issues.apache.org/jira/browse/KAFKA-15126 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Lucia Cerchie >Assignee: Lucia Cerchie >Priority: Minor > Fix For: 3.6.0 > > Original Estimate: 672h > Remaining Estimate: 672h > > When web client requests come in with query params, it's > common for those params to be null. We want developers to just be able to > pass in the upper/lower bounds if they want instead of implementing their own > logic to avoid getting the whole range (which will happen if they leave the > params null). > An example of the logic they can avoid using after this KIP is > implemented is below: > {code:java} > private RangeQuery> > createRangeQuery(String lower, String upper) { > if (isBlank(lower) && isBlank(upper)) { > return RangeQuery.withNoBounds(); > } else if (!isBlank(lower) && isBlank(upper)) { > return RangeQuery.withLowerBound(lower); > } else if (isBlank(lower) && !isBlank(upper)) { > return RangeQuery.withUpperBound(upper); > } else { > return RangeQuery.withRange(lower, upper); > } > } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
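To make the intended semantics concrete, here is a minimal sketch of a range lookup in which a `null` bound means "unbounded on that side". It is a stand-in built on `java.util.NavigableMap`, not Kafka Streams' `RangeQuery`. The class name `NullableRange` is hypothetical, and treating both bounds as inclusive is an assumption made for the illustration:

```java
import java.util.NavigableMap;
import java.util.SortedMap;

// Hypothetical stand-in (not Kafka's RangeQuery): null bounds mean "no bound".
final class NullableRange {
    private NullableRange() {}

    // Returns a view of the entries between lower and upper, both inclusive
    // (an assumption of this sketch). A null bound leaves that side open.
    // Throws IllegalArgumentException if both bounds are set and lower > upper.
    static <V> SortedMap<String, V> range(NavigableMap<String, V> store, String lower, String upper) {
        if (lower == null && upper == null) {
            return store;                          // whole range
        }
        if (lower == null) {
            return store.headMap(upper, true);     // only an upper bound
        }
        if (upper == null) {
            return store.tailMap(lower, true);     // only a lower bound
        }
        return store.subMap(lower, true, upper, true);
    }
}
```

A caller can then forward possibly-null request parameters directly, which is the branching that the `createRangeQuery` helper quoted in the ticket currently has to spell out by hand.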
[jira] [Created] (KAFKA-15316) CommitRequestManager not calling RequestState callbacks
Lianet Magrans created KAFKA-15316: -- Summary: CommitRequestManager not calling RequestState callbacks Key: KAFKA-15316 URL: https://issues.apache.org/jira/browse/KAFKA-15316 Project: Kafka Issue Type: Bug Components: clients, consumer Reporter: Lianet Magrans CommitRequestManager is not triggering the RequestState callbacks that update _lastReceivedMs_, affecting the _canSendRequest_ verification of the RequestState. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15297) Cache flush order might not be topological order
[ https://issues.apache.org/jira/browse/KAFKA-15297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752150#comment-17752150 ] Matthias J. Sax commented on KAFKA-15297: - Seems we are on the same page :) > Cache flush order might not be topological order > - > > Key: KAFKA-15297 > URL: https://issues.apache.org/jira/browse/KAFKA-15297 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.4.0 >Reporter: Bruno Cadonna >Priority: Major > Attachments: minimal_example.png > > > The flush order of the state store caches in Kafka Streams might not > correspond to the topological order of the state stores in the topology. The > order depends on how the processors and state stores are added to the > topology. > In some cases downstream state stores might be flushed before upstream state > stores. That means, that during a commit records in upstream caches might end > up in downstream caches that have already been flushed during the same > commit. If a crash happens at that point, those records in the downstream > caches are lost. Those records are lost for two reasons: > 1. Records in caches are only changelogged after they are flushed from the > cache. However, the downstream caches have already been flushed and they will > not be flushed again during the same commit. > 2. The offsets of the input records that caused the records that now are > blocked in the downstream caches are committed during the same commit and so > they will not be re-processed after the crash. 
> An example for a topology where the flush order of the caches is wrong is the > following: > {code:java} > final String inputTopic1 = "inputTopic1"; > final String inputTopic2 = "inputTopic2"; > final String outputTopic1 = "outputTopic1"; > final String processorName = "processor1"; > final String stateStoreA = "stateStoreA"; > final String stateStoreB = "stateStoreB"; > final String stateStoreC = "stateStoreC"; > streamsBuilder.stream(inputTopic2, Consumed.with(Serdes.String(), > Serdes.String())) > .process( > () -> new Processor() { > private ProcessorContext context; > @Override > public void init(ProcessorContext context) { > this.context = context; > } > @Override > public void process(Record record) { > context.forward(record); > } > @Override > public void close() {} > }, > Named.as("processor1") > ) > .to(outputTopic1, Produced.with(Serdes.String(), Serdes.String())); > streamsBuilder.stream(inputTopic1, Consumed.with(Serdes.String(), > Serdes.String())) > .toTable(Materialized. byte[]>>as(stateStoreA).withKeySerde(Serdes.String()).withValueSerde(Serdes.String())) > .mapValues(value -> value, Materialized. KeyValueStore byte[]>>as(stateStoreB).withKeySerde(Serdes.String()).withValueSerde(Serdes.String())) > .mapValues(value -> value, Materialized. KeyValueStore byte[]>>as(stateStoreC).withKeySerde(Serdes.String()).withValueSerde(Serdes.String())) > .toStream() > .to(outputTopic1, Produced.with(Serdes.String(), Serdes.String())); > final Topology topology = streamsBuilder.build(streamsConfiguration); > topology.connectProcessorAndStateStores(processorName, stateStoreC); > {code} > This code results in the attached topology. > In the topology {{processor1}} is connected to {{stateStoreC}}. 
If > {{processor1}} is added to the topology before the other processors, i.e., if > the right branch of the topology is added before the left branch as in the > code above, the cache of {{stateStoreC}} is flushed before the caches of > {{stateStoreA}} and {{stateStoreB}}. > You can observe the flush order by feeding some records into the input topics > of the topology, waiting for a commit, and looking for the following log > message: > https://github.com/apache/kafka/blob/2e1947d240607d53f071f61c875cfffc3fec47fe/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L513 > > I changed the log message from trace to debug to avoid too much noise. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using exactly_once
[ https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752149#comment-17752149 ] Matthias J. Sax commented on KAFKA-15259: - The only thing I am wondering is whether we should try to fix it right away in KS by adding an internal producer config for now, until K15309 gets done? – In the end it seems that a KS app could get stuck without the ability to skip over a "poison pill write", so it might be worth fixing right away (even if the fix is more of a hack for now)? > Kafka Streams does not continue processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE if using exactly_once > > > Key: KAFKA-15259 > URL: https://issues.apache.org/jira/browse/KAFKA-15259 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.5.1 >Reporter: Tomonari Yamashita >Priority: Major > Attachments: Reproducer.java, app_at_least_once.log, > app_exactly_once.log > > > [Problem] > - Kafka Streams does not continue processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE if using exactly_once. > -- "CONTINUE will signal that Streams should ignore the issue and continue > processing"(1), so Kafka Streams should continue processing even if using > exactly_once when ProductionExceptionHandlerResponse.CONTINUE is used. > -- However, if using exactly_once, Kafka Streams does not continue > processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE. 
And the client will be shut down > as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) > [Environment] > - Kafka Streams 3.5.1 > [Reproduction procedure] > # Create "input-topic" topic and "output-topic" > # Put several messages on "input-topic" > # Execute a simple Kafka streams program that transfers too large messages > from "input-topic" to "output-topic" with exactly_once and returns > ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the > producer. Please refer to the reproducer program (attached file: > Reproducer.java). > # ==> However, Kafka Streams does not continue processing due to rollback > despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread > shuts down as the default > behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to > the debug log (attached file: app_exactly_once.log). > ## My expected behavior is that Kafka Streams should continue processing > even if using exactly_once when ProductionExceptionHandlerResponse.CONTINUE > is used. > [As far as my investigation] > - FYI, if using at_least_once instead of exactly_once, Kafka Streams > continues processing without rollback when > ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the > debug log (attached file: app_at_least_once.log). > - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka > Streams 3.2.0, as rollback occurs. 
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler > - > [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler] > (2) Transaction abort and shutdown occur > {code:java} > 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer > clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, > transactionalId=java-kafka-streams-0_0] Exception occurred during message > send: > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. > 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread > [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] > stream-task [0_0] Error encountered sending record to topic output-topic for > task 0_0 due to: > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. > Exception handler choose to CONTINUE processing in spite of this error but > written offsets would not be recorded. > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. > 2023-07-26 21:27:19 INFO TransactionManager:393 - [Producer > clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, > transactionalId=java-kafka-streams-0_0] Transiting to abortable error state > due to org.apache.kafka
[GitHub] [kafka] AndrewJSchofield commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580
AndrewJSchofield commented on code in PR #14111: URL: https://github.com/apache/kafka/pull/14111#discussion_r1287441674 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ## @@ -613,7 +623,7 @@ private long batchReady(long nowMs, boolean exhausted, TopicPartition part, Node long waitedTimeMs, boolean backingOff, boolean full, long nextReadyCheckDelayMs, Set readyNodes) { if (!readyNodes.contains(leader) && !isMuted(part)) { -long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs; +long timeToWaitMs = backingOff ? retryBackoff.backoff(0) : lingerMs; Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a diff in pull request #13947: KAFKA-15130: Delete remote segments when delete a topic
satishd commented on code in PR #13947: URL: https://github.com/apache/kafka/pull/13947#discussion_r1287011673 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -343,21 +345,78 @@ public void onLeadershipChange(Set partitionsBecomeLeader, /** * Deletes the internal topic partition info if delete flag is set as true. * - * @param topicPartition topic partition to be stopped. + * @param topicPartitions topic partitions that needs to be stopped. * @param delete flag to indicate whether the given topic partitions to be deleted or not. */ -public void stopPartitions(TopicPartition topicPartition, boolean delete) { +public void stopPartitions(Set topicPartitions, + boolean delete, + BiConsumer errorHandler) { +LOGGER.debug("Stopping {} partitions, delete: {}", topicPartitions.size(), delete); +Set topicIdPartitions = topicPartitions.stream() +.filter(topicIdByPartitionMap::containsKey) +.map(tp -> new TopicIdPartition(topicIdByPartitionMap.get(tp), tp)) +.collect(Collectors.toSet()); + +topicIdPartitions.forEach(tpId -> { +try { +RLMTaskWithFuture task = leaderOrFollowerTasks.remove(tpId); +if (task != null) { +LOGGER.info("Cancelling the RLM task for tpId: {}", tpId); +task.cancel(); +} +if (delete) { +LOGGER.info("Deleting the remote log segments task for partition: {}", tpId); +deleteRemoteLogPartition(tpId); Review Comment: [KIP-405](https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-topic-deletionTopicdeletionlifecycle) already talks about the approach of deleting remote log segments in asynchronous manner using controller and RLMM. This is not a blocking change for 3.6.0, filed [JIRA](https://issues.apache.org/jira/browse/KAFKA-15313) for the same. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a diff in pull request #13947: KAFKA-15130: Delete remote segments when delete a topic
satishd commented on code in PR #13947: URL: https://github.com/apache/kafka/pull/13947#discussion_r1287425728 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -343,21 +345,78 @@ public void onLeadershipChange(Set partitionsBecomeLeader, /** * Deletes the internal topic partition info if delete flag is set as true. * - * @param topicPartition topic partition to be stopped. + * @param topicPartitions topic partitions that needs to be stopped. * @param delete flag to indicate whether the given topic partitions to be deleted or not. */ -public void stopPartitions(TopicPartition topicPartition, boolean delete) { +public void stopPartitions(Set topicPartitions, + boolean delete, + BiConsumer errorHandler) { +LOGGER.debug("Stopping {} partitions, delete: {}", topicPartitions.size(), delete); +Set topicIdPartitions = topicPartitions.stream() +.filter(topicIdByPartitionMap::containsKey) +.map(tp -> new TopicIdPartition(topicIdByPartitionMap.get(tp), tp)) +.collect(Collectors.toSet()); + +topicIdPartitions.forEach(tpId -> { +try { +RLMTaskWithFuture task = leaderOrFollowerTasks.remove(tpId); +if (task != null) { +LOGGER.info("Cancelling the RLM task for tpId: {}", tpId); +task.cancel(); +} +if (delete) { +LOGGER.info("Deleting the remote log segments task for partition: {}", tpId); +deleteRemoteLogPartition(tpId); Review Comment: [KIP-405](https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-topic-deletionTopicdeletionlifecycle) already talks about the approach of deleting remote log segments in asynchronous manner using controller and RLMM. This is not a blocking change for 3.6.0, filed [JIRA](https://issues.apache.org/jira/browse/KAFKA-15313) for the same. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 merged pull request #14055: KAFKA-15031: Add plugin.discovery to Connect worker configuration (KIP-898)
gharris1727 merged PR #14055: URL: https://github.com/apache/kafka/pull/14055 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on pull request #14055: KAFKA-15031: Add plugin.discovery to Connect worker configuration (KIP-898)
gharris1727 commented on PR #14055: URL: https://github.com/apache/kafka/pull/14055#issuecomment-1669994084 Flaky test failures appear unrelated, and the tests pass locally. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio opened a new pull request, #14168: MINOR; Fix nanosecond elapsed time
jsancio opened a new pull request, #14168: URL: https://github.com/apache/kafka/pull/14168

Time.nanoseconds() (System.nanoTime()) is not monotonically increasing, and its value is allowed to overflow (wrap). It can only be used to measure elapsed time. Elapsed time can be computed by subtracting the results of two nanoseconds() calls.

I ran `testBalancePartitionLeaders` 1000 times using jqwik without any failures.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
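The wrap-around point can be illustrated with a small sketch. The class and method names below are hypothetical and are not taken from the Kafka code base; the sketch only shows why two `System.nanoTime()` readings should be compared by subtraction rather than with `<` or `>=`.

```java
class ElapsedNanos {
    // Elapsed time between two nanoTime-style readings. The subtraction stays
    // correct across wrap-around as long as the real interval is below 2^63 ns.
    static long elapsedNanos(long startNs, long nowNs) {
        return nowNs - startNs;
    }

    // Deadline check by subtraction. A direct comparison `nowNs >= deadlineNs`
    // gives the wrong answer if the counter wraps between the two readings.
    static boolean hasExpired(long nowNs, long deadlineNs) {
        return nowNs - deadlineNs >= 0;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        long elapsed = elapsedNanos(start, System.nanoTime());
        System.out.println("elapsed ns: " + elapsed);
    }
}
```

The same subtraction idiom applies to any deadline stored as a nanosecond reading: store `start + timeout` and test `now - deadline >= 0`.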
[GitHub] [kafka] junrao commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580
junrao commented on code in PR #14111: URL: https://github.com/apache/kafka/pull/14111#discussion_r1286480033 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ## @@ -613,7 +623,7 @@ private long batchReady(long nowMs, boolean exhausted, TopicPartition part, Node long waitedTimeMs, boolean backingOff, boolean full, long nextReadyCheckDelayMs, Set readyNodes) { if (!readyNodes.contains(leader) && !isMuted(part)) { -long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs; +long timeToWaitMs = backingOff ? retryBackoff.backoff(0) : lingerMs; Review Comment: Hmm, we should use the number of retries for the batch instead of 0 for calculating timeToWaitMs, right? Also, this is an existing issue. It seems that nowMs is never used. ## clients/src/main/java/org/apache/kafka/clients/Metadata.java: ## @@ -144,13 +159,25 @@ public long metadataExpireMs() { */ public synchronized int requestUpdate() { this.needFullUpdate = true; +this.backoffUpdateRequests = 0L; Review Comment: Hmm, this probably needs some more thought. For example, if a produce request fails, we request a metadata update and set backoffUpdateRequests to 0. However, if we exponentially back off the produce request because of stale metadata, it seems that we should do the same for refreshing the metadata. Otherwise, we likely will still have the stale metadata when retrying the produce request. Ditto for fetch requests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
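To make the first review comment concrete, here is a minimal sketch of an exponential backoff calculation. It is not Kafka's implementation: the class name, constructor parameters, and the omission of jitter are assumptions made for illustration. It shows that passing `0` as the attempt count always yields the initial interval, so the delay only grows when the caller passes the batch's actual retry count.

```java
class BackoffSketch {
    private final long initialMs;
    private final long maxMs;
    private final int multiplier;

    BackoffSketch(long initialMs, long maxMs, int multiplier) {
        this.initialMs = initialMs;
        this.maxMs = maxMs;
        this.multiplier = multiplier;
    }

    // initialMs * multiplier^attempts, capped at maxMs. Jitter is left out
    // on purpose so that the result is deterministic.
    long backoff(long attempts) {
        double term = initialMs * Math.pow(multiplier, attempts);
        return (long) Math.min(term, (double) maxMs);
    }

    public static void main(String[] args) {
        BackoffSketch retryBackoff = new BackoffSketch(100L, 1000L, 2);
        for (int attempts = 0; attempts <= 4; attempts++) {
            System.out.println(attempts + " -> " + retryBackoff.backoff(attempts) + " ms");
        }
    }
}
```

With these assumed settings, a call site that hard-codes `backoff(0)` waits 100 ms on every retry, while one that passes the retry count waits 100, 200, 400, 800 and then 1000 ms.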
[GitHub] [kafka] lucasbru commented on a diff in pull request #14001: Kafka Streams Threading: Punctuation (5/N)
lucasbru commented on code in PR #14001: URL: https://github.com/apache/kafka/pull/14001#discussion_r1287335415 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java: ## @@ -86,12 +87,29 @@ private void runOnce(final long nowMs) { if (currentTask == null) { currentTask = taskManager.assignNextTask(DefaultTaskExecutor.this); -} else { -// if a task is no longer processable, ask task-manager to give it another -// task in the next iteration -if (currentTask.isProcessable(nowMs)) { +} + +if (currentTask != null) { Review Comment: Yes, I think this is good for a separate PR. Also not really related to punctuation anymore.
[GitHub] [kafka] cadonna commented on a diff in pull request #14001: Kafka Streams Threading: Punctuation (5/N)
cadonna commented on code in PR #14001: URL: https://github.com/apache/kafka/pull/14001#discussion_r1287332777 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java: ## @@ -86,12 +87,29 @@ private void runOnce(final long nowMs) { if (currentTask == null) { currentTask = taskManager.assignNextTask(DefaultTaskExecutor.this); -} else { -// if a task is no longer processable, ask task-manager to give it another -// task in the next iteration -if (currentTask.isProcessable(nowMs)) { +} + +if (currentTask != null) { Review Comment: Let's discuss that and put it into a separate PR. WDYT?
[GitHub] [kafka] fvaleri commented on a diff in pull request #14092: KAFKA-15239: Fix system tests using producer performance service
fvaleri commented on code in PR #14092: URL: https://github.com/apache/kafka/pull/14092#discussion_r1287331776 ## tests/kafkatest/services/performance/producer_performance.py: ## @@ -24,7 +24,6 @@ from kafkatest.services.security.security_config import SecurityConfig from kafkatest.version import DEV_BRANCH - Review Comment: Done.
[GitHub] [kafka] kamalcph commented on a diff in pull request #14161: KAFKA-15267: Do not allow Tiered Storage to be disabled while topics have remote.storage.enable property
kamalcph commented on code in PR #14161: URL: https://github.com/apache/kafka/pull/14161#discussion_r1287319821 ## core/src/main/scala/kafka/server/ConfigHandler.scala: ## @@ -62,6 +62,12 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC topicConfig.asScala.forKeyValue { (key, value) => if (!configNamesToExclude.contains(key)) props.put(key, value) } + +if (!kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem() Review Comment: For my understanding, will this approach work if the user wants to downgrade from 3.6 to 2.8?
[GitHub] [kafka] gharris1727 commented on a diff in pull request #14092: KAFKA-15239: Fix system tests using producer performance service
gharris1727 commented on code in PR #14092: URL: https://github.com/apache/kafka/pull/14092#discussion_r1287310500 ## tests/kafkatest/services/performance/producer_performance.py: ## @@ -24,7 +24,6 @@ from kafkatest.services.security.security_config import SecurityConfig from kafkatest.version import DEV_BRANCH - Review Comment: Could you revert this whitespace change too?
[jira] [Commented] (KAFKA-15297) Cache flush order might not be topological order
[ https://issues.apache.org/jira/browse/KAFKA-15297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752108#comment-17752108 ]

Guozhang Wang commented on KAFKA-15297:
---
{{We are not sure if we should not instead decouple caching from forwarding.}}

I'd assume the double negation here means "We think we should just try to decouple caching from forwarding as the right solution" :) And yes, I'd love to see that happen, as I've advocated for it for many years. I was thinking about just "suppressing" the records in the last sink processor of the sub-topology to achieve the same effect of sending less over the network. It may be similar to what you meant by "caching on the last state store", or it may differ in some corner cases.

Either way, like you said, it will lose some of the benefit of processing fewer records at the later stages of a sub-topology, but I think in most cases, given a sub-topology's size, this seems a good trade for simplicity. It also has many other benefits, just to name a few: 1) we have much simpler timestamp tracking within a task (today it's as fine-grained as per-processor), as every record will always go through the whole sub-topology; 2) we have simpler version tracking within sub-topologies for IQ, since now all state stores have the same version.

> Cache flush order might not be topological order
> ---
>
> Key: KAFKA-15297
> URL: https://issues.apache.org/jira/browse/KAFKA-15297
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.4.0
> Reporter: Bruno Cadonna
> Priority: Major
> Attachments: minimal_example.png
>
> The flush order of the state store caches in Kafka Streams might not correspond to the topological order of the state stores in the topology. The order depends on how the processors and state stores are added to the topology.
> In some cases downstream state stores might be flushed before upstream state stores. That means that during a commit, records in upstream caches might end up in downstream caches that have already been flushed during the same commit. If a crash happens at that point, those records in the downstream caches are lost. Those records are lost for two reasons:
> 1. Records in caches are only changelogged after they are flushed from the cache. However, the downstream caches have already been flushed and they will not be flushed again during the same commit.
> 2. The offsets of the input records that caused the records that now are blocked in the downstream caches are committed during the same commit and so they will not be re-processed after the crash.
> An example for a topology where the flush order of the caches is wrong is the following:
> {code:java}
> final String inputTopic1 = "inputTopic1";
> final String inputTopic2 = "inputTopic2";
> final String outputTopic1 = "outputTopic1";
> final String processorName = "processor1";
> final String stateStoreA = "stateStoreA";
> final String stateStoreB = "stateStoreB";
> final String stateStoreC = "stateStoreC";
> streamsBuilder.stream(inputTopic2, Consumed.with(Serdes.String(), Serdes.String()))
>     .process(
>         () -> new Processor() {
>             private ProcessorContext context;
>             @Override
>             public void init(ProcessorContext context) {
>                 this.context = context;
>             }
>             @Override
>             public void process(Record record) {
>                 context.forward(record);
>             }
>             @Override
>             public void close() {}
>         },
>         Named.as("processor1")
>     )
>     .to(outputTopic1, Produced.with(Serdes.String(), Serdes.String()));
> streamsBuilder.stream(inputTopic1, Consumed.with(Serdes.String(), Serdes.String()))
>     .toTable(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreA).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
>     .mapValues(value -> value, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreB).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
>     .mapValues(value -> value, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreC).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
>     .toStream()
>     .to(outputTopic1, Produced.with(Serdes.String(), Serdes.String()));
> final Topology topology = streamsBuilder.build(streamsConfiguration);
> topology.connectProcessorAndStateStores(processorName, stateStoreC);
> {code}
> This code results in the attached topology.
> In the topology {{processor1}} is connected to {{stateStoreC}}. If {{processor1}} is added to the topology before the other processors, i.e., if the right branch of the topology is added before the left branch as in the code above, the cache
[GitHub] [kafka] bachmanity1 commented on pull request #14153: KAFKA-14132: Replace Easymock & Powermock with Mockito in KafkaBasedLogTest
bachmanity1 commented on PR #14153: URL: https://github.com/apache/kafka/pull/14153#issuecomment-1669806857 @yashmayya I've applied your suggestions.
[GitHub] [kafka] mjsax merged pull request #14139: KAFKA-15022: [7/N] use RackAwareTaskAssignor in HAAssignor
mjsax merged PR #14139: URL: https://github.com/apache/kafka/pull/14139
[GitHub] [kafka] mjsax merged pull request #14163: MINOR: Improve JavaDocs of KafkaStreams `context.commit()`
mjsax merged PR #14163: URL: https://github.com/apache/kafka/pull/14163
[GitHub] [kafka] bachmanity1 commented on a diff in pull request #14153: KAFKA-14132: Replace Easymock & Powermock with Mockito in KafkaBasedLogTest
bachmanity1 commented on code in PR #14153: URL: https://github.com/apache/kafka/pull/14153#discussion_r1287236551 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ## @@ -114,29 +113,49 @@ public class KafkaBasedLogTest { private static final String TP0_VALUE_NEW = "VAL0_NEW"; private static final String TP1_VALUE_NEW = "VAL1_NEW"; -private Time time = new MockTime(); -private KafkaBasedLog store; +private final Time time = new MockTime(); +private MockedKafkaBasedLog store; @Mock -private Runnable initializer; +private Consumer initializer; @Mock private KafkaProducer producer; -private MockConsumer consumer; -@Mock Review Comment: The `admin` field needs to be mocked only in the methods where `setupWithAdmin` was previously used. If we mock it everywhere, some tests fail because they expect `admin` to be `null`.
[GitHub] [kafka] mimaison commented on pull request #13860: KAFKA-15093: Add 3.4 and 3.5 Streams upgrade system tests
mimaison commented on PR #13860: URL: https://github.com/apache/kafka/pull/13860#issuecomment-1669737299 This depends on https://github.com/apache/kafka/pull/14103 so we need to get that merged first.
[jira] [Updated] (KAFKA-15100) Unsafe to call tryCompleteFetchResponse on request timeout
[ https://issues.apache.org/jira/browse/KAFKA-15100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

José Armando García Sancio updated KAFKA-15100:
---
Fix Version/s: 3.6.0

> Unsafe to call tryCompleteFetchResponse on request timeout
> ---
>
> Key: KAFKA-15100
> URL: https://issues.apache.org/jira/browse/KAFKA-15100
> Project: Kafka
> Issue Type: Bug
> Components: kraft
> Reporter: José Armando García Sancio
> Assignee: José Armando García Sancio
> Priority: Major
> Fix For: 3.6.0
>
> When the fetch request times out the future is completed from the "raft-expiration-executor" SystemTimer thread. KafkaRaftClient assumes that tryCompleteFetchResponse is always called from the same thread. This invariant is violated in this case.
> {code:java}
> return future.handle((completionTimeMs, exception) -> {
>     if (exception != null) {
>         Throwable cause = exception instanceof ExecutionException ?
>             exception.getCause() : exception;
>         // If the fetch timed out in purgatory, it means no new data is available,
>         // and we will complete the fetch successfully. Otherwise, if there was
>         // any other error, we need to return it.
>         Errors error = Errors.forException(cause);
>         if (error != Errors.REQUEST_TIMED_OUT) {
>             logger.info("Failed to handle fetch from {} at {} due to {}",
>                 replicaId, fetchPartition.fetchOffset(), error);
>             return buildEmptyFetchResponse(error, Optional.empty());
>         }
>     }
>     // FIXME: `completionTimeMs`, which can be null
>     logger.trace("Completing delayed fetch from {} starting at offset {} at {}",
>         replicaId, fetchPartition.fetchOffset(), completionTimeMs);
>     return tryCompleteFetchRequest(replicaId, fetchPartition, time.milliseconds());
> });
> {code}
> One solution is to always build an empty response if the future was completed exceptionally. This works because the ExpirationService completes the future with a `TimeoutException`.
> A longer-term solution is to use a more flexible event executor service. This would be a service that allows more kinds of event to get scheduled/submitted to the KRaft thread.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
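The threading issue described in KAFKA-15100 can be reproduced in isolation. The sketch below is not KafkaRaftClient code; the class, method, and thread names are hypothetical. It shows that a handler registered with `CompletableFuture.handle` runs on whichever thread completes the future, and that `handleAsync` with a single-threaded executor is one possible way to hand the work back to a designated thread, in the spirit of the longer-term solution mentioned in the issue.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;

class CompletionThreadSketch {
    // Returns the name of the thread that ran the completion handler.
    static String handlerThread(boolean redispatch) {
        // Stand-in for the single thread that is supposed to own the state.
        ExecutorService owner =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "owner-thread"));
        try {
            CompletableFuture<Long> future = new CompletableFuture<>();
            CompletableFuture<String> handled;
            if (redispatch) {
                // The handler is submitted to the owner executor.
                handled = future.handleAsync(
                    (timeMs, error) -> Thread.currentThread().getName(), owner);
            } else {
                // The handler runs on the thread that completes the future.
                handled = future.handle(
                    (timeMs, error) -> Thread.currentThread().getName());
            }
            // Stand-in for an expiration timer completing the future on timeout.
            Thread timer = new Thread(
                () -> future.completeExceptionally(new TimeoutException("expired")),
                "timer-thread");
            timer.start();
            timer.join();
            return handled.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            owner.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("handle:      " + handlerThread(false));
        System.out.println("handleAsync: " + handlerThread(true));
    }
}
```

In the first case the handler runs on the timer thread, which is the situation the issue describes; in the second it runs on the owner thread.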
[GitHub] [kafka] clolov commented on pull request #14161: KAFKA-15267: Do not allow Tiered Storage to be disabled while topics have remote.storage.enable property
clolov commented on PR #14161: URL: https://github.com/apache/kafka/pull/14161#issuecomment-1669704586 Politely requesting a review @showuon @divijvaidya @satishd!
[GitHub] [kafka] clolov commented on a diff in pull request #14161: KAFKA-15267: Do not allow Tiered Storage to be disabled while topics have remote.storage.enable property
clolov commented on code in PR #14161: URL: https://github.com/apache/kafka/pull/14161#discussion_r1287188953 ## core/src/test/scala/unit/kafka/server/KafkaServerTest.scala: ## @@ -154,6 +155,92 @@ class KafkaServerTest extends QuorumTestHarness { server.shutdown() } + @Test + def testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(): Unit = { +val tsEnabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head + tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString) + tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, + "org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager") + tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, + "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager") + +val server = TestUtils.createServer(KafkaConfig.fromProps(tsEnabledProps)) +server.remoteLogManagerOpt match { + case Some(_) => + case None => fail("RemoteLogManager should be initialized") +} + +val topicProps = new Properties() +topicProps.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, true.toString) Review Comment: I do not know why there isn't a TestUtils.createTopicConfigs ¯\\\_(ツ)\_/¯ Happy to add it if others think it might be useful! 
[jira] [Created] (KAFKA-15315) Use getOrDefault rather than get
roon created KAFKA-15315: Summary: Use getOrDefault rather than get Key: KAFKA-15315 URL: https://issues.apache.org/jira/browse/KAFKA-15315 Project: Kafka Issue Type: Improvement Components: clients Reporter: roon -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] clolov commented on pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions
clolov commented on PR #14127: URL: https://github.com/apache/kafka/pull/14127#issuecomment-1669607117 Apologies for joining the review late (I had to spend a few hours getting acquainted with the ConsumerTask code). I carried out a first pass on the changes to the main files and they make sense to me. I will circle back tomorrow morning and have another go through the tests.
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1287096186 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,15 +290,52 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } -return primaryStore.set(values, (primaryWriteError, ignored) -> { +List partitionsWithTombstoneOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() == null) +.map(Map.Entry::getKey).collect(Collectors.toList()); + +Map tombstoneOffsets = new HashMap<>(); +for (ByteBuffer partition : partitionsWithTombstoneOffsets) { +tombstoneOffsets.put(partition, null); +} +Map regularOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() != null) +.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + +// If the supplied offsets contain tombstone values, then tombstone offsets are extracted first, +// written to the secondary store in a synchronous manner and then to the primary store. +// This is because, if a tombstone offset is successfully written to the per-connector offsets topic, +// but cannot be written to the global offsets topic, then the global offsets topic will still contain that +// source offset, but the per-connector topic will not. Due to the fallback-on-global logic used by the worker, +// if a task requests offsets for one of the tombstoned partitions, the worker will provide it with the +// offsets present in the global offsets topic, instead of indicating to the task that no offsets can be found. 
+CompletableFuture offsetWriteFuture = CompletableFuture.completedFuture(null); +if (secondaryStore != null && !tombstoneOffsets.isEmpty()) { +offsetWriteFuture.thenApply(v -> { +Future secondaryWriteFuture = secondaryStore.set(tombstoneOffsets, new FutureCallback<>()); +try { +if (exactlyOnce) { +secondaryWriteFuture.get(); +} else { +secondaryWriteFuture.get(offsetFlushTimeoutMs, TimeUnit.MILLISECONDS); +} +} catch (ExecutionException e) { +log.error("{} Flush of tombstone(s) offsets to secondary store threw an unexpected exception: ", this, e.getCause()); +} catch (Exception e) { +log.error("{} Got Exception when trying to flush tombstone(s) offsets to secondary store", this, e); +} Review Comment: That's a great catch. I need to catch it to keep the compiler happy(get() throws a couple of exceptions). Actually I realised my tests were doing the asserts incorrectly. I have made further changes of the form => 1. Replace `thenApply` with `thenAccept` when chaining CompletableFutures. This makes sense since we don't really need to do any transformations but just perform an action (setting to primary/secondary store). 2. I now return the CompletableFuture object directly and invoke the passed callback's onComplete at appropriate places. Also note that for error cases, it doesn't seem to be possible to throw checked exceptions from any of these chaining methods like `thenAccept` etc. We need to wrap them in a RuntimeException and handle them via `exceptionally` or `whenComplete`. I haven't chosen either of the 2 approaches since eventually the errors need to reflect on the callback. 3. Made changes to the tests where now I assert errors and results directly within the callback. I believe this is closer to the actual changes. Let me know what you think. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
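The chaining approach described in the comment above can be sketched as follows. This is not the code from the PR: `CheckedWrite`, `writeBoth`, and the callback shape are hypothetical stand-ins. The sketch shows `thenAccept` used for side-effecting steps, a checked exception wrapped in an unchecked one so that it can propagate through the chain, and `whenComplete` reporting the unwrapped error to a callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiConsumer;

class ChainedWritesSketch {
    // Hypothetical stand-in for a store write that may throw a checked exception.
    interface CheckedWrite {
        void run() throws Exception;
    }

    // Lambdas passed to thenAccept cannot throw checked exceptions, so wrap them.
    static void runWrapped(CheckedWrite write) {
        try {
            write.run();
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new CompletionException(e);
        }
    }

    // Recover the original failure from the wrapper added by the chain.
    static Throwable unwrap(Throwable error) {
        if (error instanceof CompletionException && error.getCause() != null) {
            return error.getCause();
        }
        return error;
    }

    // Writes to the secondary store first, then to the primary store, and reports
    // the outcome to the callback. The primary write is skipped if the first fails.
    static CompletableFuture<Void> writeBoth(CheckedWrite secondary,
                                             CheckedWrite primary,
                                             BiConsumer<Throwable, Void> callback) {
        CompletableFuture<Void> start = CompletableFuture.completedFuture(null);
        return start
            .thenAccept(ignored -> runWrapped(secondary))
            .thenAccept(ignored -> runWrapped(primary))
            .whenComplete((ignored, error) -> callback.accept(unwrap(error), null));
    }

    public static void main(String[] args) {
        writeBoth(
            () -> { throw new java.io.IOException("secondary store unavailable"); },
            () -> System.out.println("primary write"),
            (error, result) -> System.out.println("callback error: " + error));
    }
}
```

Whether the callback should be invoked via `whenComplete` or `exceptionally` is a design choice; `whenComplete` is used here because it covers both the success and the failure path in one place.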
[GitHub] [kafka] yashmayya commented on a diff in pull request #14153: KAFKA-14132: Replace Easymock & Powermock with Mockito in KafkaBasedLogTest
yashmayya commented on code in PR #14153: URL: https://github.com/apache/kafka/pull/14153#discussion_r1287038386 ## connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java: ## @@ -103,7 +103,7 @@ public class KafkaBasedLog { private Optional> producer; private TopicAdmin admin; -private Thread thread; +Thread thread; Review Comment: ```suggestion // Visible for testing Thread thread; ``` nit ## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ## @@ -114,29 +113,49 @@ public class KafkaBasedLogTest { private static final String TP0_VALUE_NEW = "VAL0_NEW"; private static final String TP1_VALUE_NEW = "VAL1_NEW"; -private Time time = new MockTime(); -private KafkaBasedLog store; +private final Time time = new MockTime(); +private MockedKafkaBasedLog store; @Mock -private Runnable initializer; +private Consumer initializer; @Mock private KafkaProducer producer; -private MockConsumer consumer; -@Mock Review Comment: I'm curious, why was this change required - https://github.com/apache/kafka/pull/14153/commits/2c7d2e380d02d0f7dc77944ca5775ddaa3540457? 
## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ## @@ -114,29 +113,49 @@ public class KafkaBasedLogTest { private static final String TP0_VALUE_NEW = "VAL0_NEW"; private static final String TP1_VALUE_NEW = "VAL1_NEW"; -private Time time = new MockTime(); -private KafkaBasedLog store; +private final Time time = new MockTime(); +private MockedKafkaBasedLog store; @Mock -private Runnable initializer; +private Consumer initializer; @Mock private KafkaProducer producer; -private MockConsumer consumer; -@Mock private TopicAdmin admin; +private final Supplier topicAdminSupplier = () -> admin; +private MockConsumer consumer; -private Map>> consumedRecords = new HashMap<>(); -private Callback> consumedCallback = (error, record) -> { +private final Map>> consumedRecords = new HashMap<>(); +private final Callback> consumedCallback = (error, record) -> { TopicPartition partition = new TopicPartition(record.topic(), record.partition()); List> records = consumedRecords.computeIfAbsent(partition, k -> new ArrayList<>()); records.add(record); }; -@SuppressWarnings("unchecked") +private class MockedKafkaBasedLog extends KafkaBasedLog { +public MockedKafkaBasedLog(String topic, + Map producerConfigs, + Map consumerConfigs, + Supplier topicAdminSupplier, + Callback> consumedCallback, + Time time, + Consumer initializer) { +super(topic, producerConfigs, consumerConfigs, topicAdminSupplier, consumedCallback, time, initializer); +} + +@Override +protected KafkaProducer createProducer() { +return producer; +} + +@Override +protected MockConsumer createConsumer() { +return consumer; +} +} Review Comment: Can we use an anonymous inner class in the `setUp` method instead? I think that'll look a lot cleaner. 
## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ## @@ -376,22 +358,11 @@ public void testPollConsumerError() throws Exception { store.stop(); -assertFalse(Whitebox.getInternalState(store, "thread").isAlive()); -assertTrue(consumer.closed()); -PowerMock.verifyAll(); +verifyStartAndStop(); } @Test public void testGetOffsetsConsumerErrorOnReadToEnd() throws Exception { -expectStart(); - -// Producer flushes when read to log end is called Review Comment: Same comment as above (https://github.com/apache/kafka/pull/14153/files#r1286903819) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15314) No Quota applied if client-id is null or empty
Jorge Esteban Quilcate Otoya created KAFKA-15314:
---
Summary: No Quota applied if client-id is null or empty
Key: KAFKA-15314
URL: https://issues.apache.org/jira/browse/KAFKA-15314
Project: Kafka
Issue Type: Bug
Components: core
Reporter: Jorge Esteban Quilcate Otoya

When Quotas were proposed, KIP-13[1] stated:
> In addition, there will be a quota reserved for clients not presenting a client id (for e.g. simple consumers not setting the id). This will default to an empty client id ("") and all such clients will share the quota for that empty id (which should be the default quota).

However, it seems that when client-id is null or empty and a default quota for client-id is present, no quota is applied. Even though Java clients set a default value [2][3], the protocol accepts a null client-id[4], and other client implementations could send a null value to bypass a quota.

The related code[5][6] shows the metric pair for quotas being prepared with the client-id (potentially null), and the quota being set to null when both client-id and (sanitized) user are null.

Adding some tests to showcase this: [https://github.com/apache/kafka/pull/14165]

Is it expected for client-id=null to bypass quotas? If it is, then the KIP or the documentation should clarify this; otherwise we should fix this behavior as a bug, e.g. we could "sanitize" client-id, similar to user name, to be an empty string when the input is null or empty.

As a side note, similar behavior could happen with user, I guess. Even though the value defaults to ANONYMOUS, if a client implementation sends an empty value, it may as well bypass the default quota, though I need to test this further once this is considered a bug.
[1]: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas]
[2]: [https://github.com/apache/kafka/blob/e98508747acc8972ac5ceb921e0fd3a7d7bd5e9c/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java#L498-L508]
[3]: [https://github.com/apache/kafka/blob/ab71c56973518bac8e1868eccdc40b17d7da35c1/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java#L616-L628]
[4]: [https://github.com/apache/kafka/blob/9f26906fcc2fd095b7d27c504e342b9a8d619b4b/clients/src/main/resources/common/message/RequestHeader.json#L34-L40]
[5]: [https://github.com/apache/kafka/blob/322ac86ba282f35373382854cc9e790e4b7fb5fc/core/src/main/scala/kafka/server/ClientQuotaManager.scala#L588-L628]
[6]: [https://github.com/apache/kafka/blob/322ac86ba282f35373382854cc9e790e4b7fb5fc/core/src/main/scala/kafka/server/ClientQuotaManager.scala#L651-L652]

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] yashmayya commented on a diff in pull request #14142: KAFKA-7438: Replace EasyMock with Mockito & remove misleading tests in SessionSto…
yashmayya commented on code in PR #14142:
URL: https://github.com/apache/kafka/pull/14142#discussion_r1287021377

## streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java: ##
@@ -118,48 +114,28 @@ public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
}

@Test
-public void shouldThrowNullPointerIfInnerIsNull() {
+public void shouldThrowNullPointerIfSupplierIsNull() {
final Exception e = assertThrows(NullPointerException.class, () -> new SessionStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime()));
assertThat(e.getMessage(), equalTo("storeSupplier cannot be null"));
}

@Test
-public void shouldThrowNullPointerIfKeySerdeIsNull() {
-final Exception e = assertThrows(NullPointerException.class, () -> new SessionStoreBuilder<>(supplier, null, Serdes.String(), new MockTime()));
-assertThat(e.getMessage(), equalTo("name cannot be null"));
-}
-
-@Test
-public void shouldThrowNullPointerIfValueSerdeIsNull() {
-final Exception e = assertThrows(NullPointerException.class, () -> new SessionStoreBuilder<>(supplier, Serdes.String(), null, new MockTime()));

Review Comment:
Ah, this looks like the same issue as https://github.com/apache/kafka/pull/14152#discussion_r1284245654

## streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java: ##
@@ -118,48 +114,28 @@ public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
}

@Test
-public void shouldThrowNullPointerIfInnerIsNull() {
+public void shouldThrowNullPointerIfSupplierIsNull() {

Review Comment:
```suggestion
public void shouldThrowNullPointerIfStoreSupplierIsNull() {
```
nit