[jira] [Resolved] (KAFKA-15881) Make changes in Release Process Wiki and Release Process
[ https://issues.apache.org/jira/browse/KAFKA-15881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vedarth Sharma resolved KAFKA-15881. Fix Version/s: 3.7.0 Reviewer: Manikumar Resolution: Fixed Release Process is updated > Make changes in Release Process Wiki and Release Process > > > Key: KAFKA-15881 > URL: https://issues.apache.org/jira/browse/KAFKA-15881 > Project: Kafka > Issue Type: Sub-task >Reporter: Vedarth Sharma >Assignee: Vedarth Sharma >Priority: Major > Fix For: 3.7.0 > > > Make changes to Release Process Wiki and docker README.md for detailed > release process instructions -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15775: New consumer listTopics and partitionsFor [kafka]
lucasbru commented on PR #14962: URL: https://github.com/apache/kafka/pull/14962#issuecomment-1853418387 @AndrewJSchofield Please have a look at my comments above, and if you think it's worth fixing, please open a follow up PR. I merged the change since I don't think the feature needs to be blocked on this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15775: New consumer listTopics and partitionsFor [kafka]
lucasbru merged PR #14962: URL: https://github.com/apache/kafka/pull/14962
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on code in PR #14981: URL: https://github.com/apache/kafka/pull/14981#discussion_r1424943646

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ##

@@ -950,6 +965,43 @@ public void testScheduleWriteOpWhenWriteFails() {
         assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records());
     }
 
+    @Test
+    public void testScheduleWriteOpWhenWriteTimesOut() throws InterruptedException {
+        MockTimer timer = new MockTimer();
+        // The partition writer only accepts one write.
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime runtime =
+            new CoordinatorRuntime.Builder()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .build();
+
+        // Loads the coordinator.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+        // Write #1. We should get a TimeoutException because the HWM will not advance.
+        CompletableFuture timedOutWrite = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(3),
+            state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1"));
+
+        timer.advanceClock(DEFAULT_WRITE_TIMEOUT.toMillis() + 1);

Review Comment: We should use `3` here to be consistent with `Duration.ofMillis(3)`.
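The review comment above is about keeping the scheduled timeout and the clock advance in agreement. A minimal sketch of that idea follows, assuming a hand-rolled `ManualClock` and a `timedOut` helper that are hypothetical stand-ins (they are not Kafka's `MockTimer` or `CoordinatorRuntime`), and assuming a write expires only once strictly more than the timeout has elapsed:

```java
import java.time.Duration;

final class WriteTimeoutSketch {
    // Hypothetical manually advanced clock; a stand-in for a mock timer, not Kafka's MockTimer.
    static final class ManualClock {
        private long nowMs;
        long nowMs() { return nowMs; }
        void advance(long ms) { nowMs += ms; }
    }

    // Assumption for this sketch: a write times out once strictly more
    // than `timeout` has elapsed since `startMs`.
    static boolean timedOut(ManualClock clock, long startMs, Duration timeout) {
        return clock.nowMs() - startMs > timeout.toMillis();
    }

    public static void main(String[] args) {
        // One value drives both the scheduled timeout and the clock advance,
        // so the two cannot drift apart.
        Duration writeTimeout = Duration.ofMillis(3);
        ManualClock clock = new ManualClock();
        long start = clock.nowMs();

        clock.advance(writeTimeout.toMillis());
        System.out.println(timedOut(clock, start, writeTimeout)); // false

        clock.advance(1);
        System.out.println(timedOut(clock, start, writeTimeout)); // true
    }
}
```

Deriving the advance from the same `Duration` that was passed to the write keeps the test correct even if a separate default timeout constant later changes.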
Re: [PR] KAFKA-15988: Reuse embedded clusters across test cases in Connect OffsetsApiIntegrationTest suite [kafka]
yashmayya commented on code in PR #14966: URL: https://github.com/apache/kafka/pull/14966#discussion_r1424910127 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java: ## @@ -70,74 +80,115 @@ public class OffsetsApiIntegrationTest { private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1); private static final long OFFSET_READ_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(30); private static final int NUM_WORKERS = 3; -private static final String CONNECTOR_NAME = "test-connector"; -private static final String TOPIC = "test-topic"; private static final int NUM_TASKS = 2; private static final int NUM_RECORDS_PER_PARTITION = 10; -private Map workerProps; -private EmbeddedConnectCluster.Builder connectBuilder; +private static final Map, EmbeddedConnectCluster> CONNECT_CLUSTERS = new ConcurrentHashMap<>(); +@Rule +public TestName currentTest = new TestName(); private EmbeddedConnectCluster connect; +private String connectorName; +private String topic; @Before public void setup() { -Properties brokerProps = new Properties(); -brokerProps.put("transaction.state.log.replication.factor", "1"); -brokerProps.put("transaction.state.log.min.isr", "1"); - -// setup Connect worker properties -workerProps = new HashMap<>(); -workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS)); - -// build a Connect cluster backed by Kafka and Zk -connectBuilder = new EmbeddedConnectCluster.Builder() -.name("connect-cluster") -.numWorkers(NUM_WORKERS) -.brokerProps(brokerProps) -.workerProps(workerProps); +connectorName = currentTest.getMethodName(); +topic = currentTest.getMethodName(); +connect = defaultConnectCluster(); } @After public void tearDown() { -connect.stop(); +Set remainingConnectors = new HashSet<>(connect.connectors()); +if (remainingConnectors.remove(connectorName)) { +connect.deleteConnector(connectorName); +} +try { +assertEquals( +"Some connectors were not properly cleaned 
up after this test", +Collections.emptySet(), +remainingConnectors +); +} finally { +// Make a last-ditch effort to clean up the leaked connectors +// so as not to interfere with other test cases +remainingConnectors.forEach(connect::deleteConnector); +} +} + +@AfterClass +public static void close() { +// stop all Connect, Kafka and Zk threads. +CONNECT_CLUSTERS.values().forEach(EmbeddedConnectCluster::stop); +} + +private static EmbeddedConnectCluster createOrReuseConnectWithWorkerProps(Map workerProps) { +return CONNECT_CLUSTERS.computeIfAbsent(workerProps, props -> { +Properties brokerProps = new Properties(); +brokerProps.put("transaction.state.log.replication.factor", "1"); +brokerProps.put("transaction.state.log.min.isr", "1"); + +// Have to declare a new map since the passed-in one may be immutable +Map workerPropsWithDefaults = new HashMap<>(workerProps); +// Enable fast offset commits by default + workerPropsWithDefaults.putIfAbsent(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS)); + +EmbeddedConnectCluster result = new EmbeddedConnectCluster.Builder() +.name("connect-cluster") +.numWorkers(NUM_WORKERS) +.brokerProps(brokerProps) +.workerProps(workerPropsWithDefaults) +.build(); + +result.start(); + +return result; +}); +} + +private static EmbeddedConnectCluster defaultConnectCluster() { +return createOrReuseConnectWithWorkerProps(Collections.emptyMap()); +} + +private static EmbeddedConnectCluster exactlyOnceSourceConnectCluster() { +Map workerProps = Collections.singletonMap( +DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, +"enabled" +); +return createOrReuseConnectWithWorkerProps(workerProps); } @Test public void testGetNonExistentConnectorOffsets() { -connect = connectBuilder.build(); -connect.start(); ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.connectorOffsets("non-existent-connector")); assertEquals(404, e.errorCode()); } @Test public void testGetSinkConnectorOffsets() throws 
Exception { -connect = connectBuilder.build(); -connect.start();
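The diff above reuses embedded clusters by keying them on their worker properties and creating them lazily with `computeIfAbsent`. The pattern can be sketched in isolation as below. `FakeCluster`, `ClusterCache`, and the `example.commit.interval.ms` key are hypothetical names invented for this sketch, not Kafka classes or configs, and the property string in `main` is only sample data:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ClusterCache {
    // Hypothetical stand-in for an expensive-to-start cluster; not a Kafka class.
    static final class FakeCluster {
        final Map<String, String> props;
        FakeCluster(Map<String, String> props) { this.props = props; }
    }

    // One cluster per distinct set of worker properties (Map equality is by content).
    private static final Map<Map<String, String>, FakeCluster> CLUSTERS = new ConcurrentHashMap<>();

    static FakeCluster createOrReuse(Map<String, String> workerProps) {
        return CLUSTERS.computeIfAbsent(workerProps, props -> {
            // Copy first: the caller's map may be immutable.
            Map<String, String> withDefaults = new HashMap<>(props);
            // Placeholder default; the key name is made up for this sketch.
            withDefaults.putIfAbsent("example.commit.interval.ms", "1000");
            return new FakeCluster(withDefaults);
        });
    }

    public static void main(String[] args) {
        FakeCluster a = createOrReuse(Collections.emptyMap());
        FakeCluster b = createOrReuse(new HashMap<>());
        FakeCluster c = createOrReuse(
            Collections.singletonMap("exactly.once.source.support", "enabled"));
        System.out.println(a == b); // true: equal maps share one cluster
        System.out.println(a == c); // false: different properties, different cluster
    }
}
```

Because the property map is the cache key, it should not be mutated after the first lookup; a changed hash code would make the cached cluster unreachable.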
Re: [PR] MINOR: Rename and update test files for docker image [kafka]
omkreddy merged PR #14991: URL: https://github.com/apache/kafka/pull/14991
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
vamossagar12 commented on PR #14981: URL: https://github.com/apache/kafka/pull/14981#issuecomment-1853271332 Thanks for the review @dajac . I have addressed the comments.
[jira] [Assigned] (KAFKA-15558) Determine if Timer should be used elsewhere in PrototypeAsyncConsumer.updateFetchPositions()
[ https://issues.apache.org/jira/browse/KAFKA-15558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Phuc Hong Tran reassigned KAFKA-15558: Assignee: Phuc Hong Tran > Determine if Timer should be used elsewhere in PrototypeAsyncConsumer.updateFetchPositions() > Key: KAFKA-15558 > URL: https://issues.apache.org/jira/browse/KAFKA-15558 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer > Reporter: Kirk True > Assignee: Phuc Hong Tran > Priority: Major > Labels: consumer-threading-refactor, kip-848-preview > This is a followup ticket based on a question from [~junrao] when reviewing the [fetch request manager pull request|https://github.com/apache/kafka/pull/14406]: {quote}It still seems weird that we only use the timer for {{refreshCommittedOffsetsIfNeeded}}, but not for other cases where we don't have valid fetch positions. For example, if all partitions are in {{AWAIT_VALIDATION}} state, it seems that {{PrototypeAsyncConsumer.poll()}} will just go in a busy loop, which is not efficient.{quote} The goal here is to determine if we should also be propagating the Timer to the validate positions and reset positions operations. Note: we should also investigate if the existing {{KafkaConsumer}} implementation should be fixed, too.
Re: [PR] KAFKA-15364: Replay BrokerRegistrationChangeRecord.logDirs [kafka]
soarez commented on PR #14998: URL: https://github.com/apache/kafka/pull/14998#issuecomment-1853247402 > 36 tests have failed There are 0 new tests failing, 36 existing failing and 271 skipped. @rondagostino PTAL
Re: [PR] KAFKA-15883: Implement RemoteCopyLagBytes [kafka]
showuon commented on PR #14832: URL: https://github.com/apache/kafka/pull/14832#issuecomment-1853214842 @clolov, I took the liberty of fixing the checkstyle error. Let's see if the CI build passes. Thanks.
Re: [PR] MINOR: Upgrade jqwik to version 1.8.0 [kafka]
github-actions[bot] commented on PR #14365: URL: https://github.com/apache/kafka/pull/14365#issuecomment-1853213017 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] Minor : Increase Config change throwable log info to error [kafka]
github-actions[bot] commented on PR #14380: URL: https://github.com/apache/kafka/pull/14380#issuecomment-1853212990 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-15883: Implement RemoteCopyLagBytes [kafka]
showuon commented on PR #14832: URL: https://github.com/apache/kafka/pull/14832#issuecomment-1853210629 @clolov, there are checkstyle errors; please correct them: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14832/6/pipeline Thanks.
Re: [PR] KAFKA-15981: update Group size only when groups size changes [kafka]
jeffkbkim commented on PR #14988: URL: https://github.com/apache/kafka/pull/14988#issuecomment-1853155239 The previous build had the error from test `testNoGroupEpochBumpWhenStaticMemberTemporarilyLeaves() – org.apache.kafka.coordinator.group.GroupMetadataManagerTest` from jdk 8 ``` expected: <[Record(key=ApiMessageAndVersion(ConsumerGroupCurrentMemberAssignmentKey(groupId='fooup', memberId='_hAb4Z1NSHG2Xp42s7KO9Q') at version 8), value=ApiMessageAndVersion(ConsumerGroupCurrentMemberAssignmentValue(memberEpoch=-2, previousMemberEpoch=9, targetMemberEpoch=10, assignedPartitions=[TopicPartitions(topicId=YX9w07JjSp2Yw1ra6Bfkaw, partitions=[2]), TopicPartitions(topicId=brQVJtH8RsyF4BayziKeBQ, partitions=[3, 4, 5])], partitionsPendingRevocation=[], partitionsPendingAssignment=[], error=0, metadataVersion=0, metadataBytes=[]) at version 0))]> but was: <[Record(key=ApiMessageAndVersion(ConsumerGroupCurrentMemberAssignmentKey(groupId='fooup', memberId='_hAb4Z1NSHG2Xp42s7KO9Q') at version 8), value=ApiMessageAndVersion(ConsumerGroupCurrentMemberAssignmentValue(memberEpoch=-2, previousMemberEpoch=9, targetMemberEpoch=10, assignedPartitions=[TopicPartitions(topicId=brQVJtH8RsyF4BayziKeBQ, partitions=[3, 4, 5]), TopicPartitions(topicId=YX9w07JjSp2Yw1ra6Bfkaw, partitions= [2])], partitionsPendingRevocation=[], partitionsPendingAssignment=[], error=0, metadataVersion=0, metadataBytes=[]) at version 0))]> ``` Where the diff is the ordering between the two `TopicPartitions` in `assignedPartitions` field. I don't think my code touched this. Was this already flaky? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
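The expected and actual records above differ only in the order of the two `TopicPartitions` entries. If that order is not part of the contract (an assumption here, not something the thread confirms), one common way to make such a check stable is to compare the entries as sets. This is a minimal sketch with a hypothetical helper and placeholder strings instead of real `TopicPartitions` objects; it is not the fix applied in Kafka:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

final class UnorderedCompare {
    // True when both lists have the same size and the same set of distinct elements.
    // Sufficient when elements are unique within a list, which this sketch assumes.
    static <T> boolean sameElements(List<T> expected, List<T> actual) {
        return expected.size() == actual.size()
            && new HashSet<>(expected).equals(new HashSet<>(actual));
    }

    public static void main(String[] args) {
        // Placeholder strings standing in for per-topic assignments.
        List<String> expected = Arrays.asList("topicA:[2]", "topicB:[3, 4, 5]");
        List<String> actual = Arrays.asList("topicB:[3, 4, 5]", "topicA:[2]");

        System.out.println(expected.equals(actual));        // false: List equality is order-sensitive
        System.out.println(sameElements(expected, actual)); // true: same entries, different order
    }
}
```

An order difference like this often points to iteration over a hash-based collection, whose order can vary between JDK versions, but that is a guess about the cause rather than a finding from the build.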
[PR] MINOR: docs for rack aware assignment [kafka]
lihaosky opened a new pull request, #14999: URL: https://github.com/apache/kafka/pull/14999 Docs for the new `balance_subtopology` config. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-15366: Modify LogDirFailureTest for KRaft [kafka]
soarez commented on code in PR #14977: URL: https://github.com/apache/kafka/pull/14977#discussion_r1424698314 ## core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala: ## @@ -191,12 +204,27 @@ class LogDirFailureTest extends IntegrationTestHarness { TestUtils.pollUntilAtLeastNumRecords(consumer, 1) // There should be no remaining LogDirEventNotification znode -assertTrue(zkClient.getAllLogDirEventNotifications.isEmpty) +if (quorum == "zk") { + assertTrue(zkClient.getAllLogDirEventNotifications.isEmpty) +} -// The controller should have marked the replica on the original leader as offline -val controllerServer = servers.find(_.kafkaController.isActive).get -val offlineReplicas = controllerServer.kafkaController.controllerContext.replicasInState(topic, OfflineReplica) -assertTrue(offlineReplicas.contains(PartitionAndReplica(new TopicPartition(topic, 0), leaderServerId))) +if (quorum == "kraft") { + waitUntilTrue(() => { +brokers.exists(broker => { + val hasOfflineDir = broker.asInstanceOf[BrokerServer].logDirFailureChannel.hasOfflineLogDir(failedLogDir.toPath.toString) + hasOfflineDir && broker.asInstanceOf[BrokerServer] +.replicaManager +.metadataCache +.getClusterMetadata(broker.clusterId, broker.config.interBrokerListenerName) +.partition(new TopicPartition(topic, 0)).offlineReplicas().map(_.id()).contains(originalLeaderServerId) Review Comment: Hey @viktorsomogyi, at least one of the issues here was this -> #14998 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-15364: Replay BrokerRegistrationChangeRecord.logDirs [kafka]
soarez opened a new pull request, #14998: URL: https://github.com/apache/kafka/pull/14998 Any directory changes must be considered when replaying BrokerRegistrationChangeRecord. This is necessary to persist directory failures in the cluster metadata, which #14902 missed. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Resolved] (KAFKA-15111) Correction kafka examples
[ https://issues.apache.org/jira/browse/KAFKA-15111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris resolved KAFKA-15111. Resolution: Duplicate > Correction kafka examples > Key: KAFKA-15111 > URL: https://issues.apache.org/jira/browse/KAFKA-15111 > Project: Kafka > Issue Type: Task > Reporter: Dmitry > Priority: Minor > Fix For: 3.6.0 > Need to set TOPIC_NAME = topic1 in the KafkaConsumerProducerDemo class and remove the unused TOPIC field from KafkaProperties.
[jira] [Updated] (KAFKA-15111) Correction kafka examples
[ https://issues.apache.org/jira/browse/KAFKA-15111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-15111: Fix Version/s: 3.6.0 (was: 3.7.0)
[jira] [Commented] (KAFKA-15372) MM2 rolling restart can drop configuration changes silently
[ https://issues.apache.org/jira/browse/KAFKA-15372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795958#comment-17795958 ] Greg Harris commented on KAFKA-15372: - This is now set to release for 3.7 and 3.6, but I had some issues with the 3.5 backport that I had to revert. In particular, the DedicatedMirrorTest has this persistent failure: {noformat} org.apache.kafka.test.NoRetryException at app//org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.lambda$awaitTaskConfigurations$8(DedicatedMirrorIntegrationTest.java:363) at app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337) at app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385) at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334) at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318) at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:308) at app//org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.awaitTaskConfigurations(DedicatedMirrorIntegrationTest.java:353) at app//org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.testMultiNodeCluster(DedicatedMirrorIntegrationTest.java:301) Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.distributed.RebalanceNeededException: Request cannot be completed because a rebalance is expected at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:123) at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:115) at org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.lambda$awaitTaskConfigurations$8(DedicatedMirrorIntegrationTest.java:357) ... 
7 more Caused by: org.apache.kafka.connect.runtime.distributed.RebalanceNeededException: Request cannot be completed because a rebalance is expected{noformat} > MM2 rolling restart can drop configuration changes silently > --- > > Key: KAFKA-15372 > URL: https://issues.apache.org/jira/browse/KAFKA-15372 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Reporter: Daniel Urban >Assignee: Greg Harris >Priority: Major > Fix For: 3.7.0, 3.6.2 > > > When MM2 is restarted, it tries to update the Connector configuration in all > flows. This is a one-time trial, and fails if the Connect worker is not the > leader of the group. > In a distributed setup and with a rolling restart, it is possible that for a > specific flow, the Connect worker of the just restarted MM2 instance is not > the leader, meaning that Connector configurations can get dropped. > For example, assuming 2 MM2 instances, and one flow A->B: > # MM2 instance 1 is restarted, the worker inside MM2 instance 2 becomes the > leader of A->B Connect group. > # MM2 instance 1 tries to update the Connector configurations, but fails > (instance 2 has the leader, not instance 1) > # MM2 instance 2 is restarted, leadership moves to worker in MM2 instance 1 > # MM2 instance 2 tries to update the Connector configurations, but fails > At this point, the configuration changes before the restart are never > applied. Many times, this can also happen silently, without any indication. -- This message was sent by Atlassian Jira (v8.20.10#820010)
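The two-instance example in the ticket description above can be replayed with a toy model. This is only a sketch under stated assumptions: `RollingRestartModel` and all of its members are hypothetical and are not MM2 or Connect APIs, the model always takes the worst case in which a restarting leader hands leadership to its peer, and the `retryOnLeadershipChange` variant illustrates one possible mitigation rather than the actual change made in Kafka.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the scenario in the ticket; none of these names are MM2 or Connect APIs.
final class RollingRestartModel {
    private final boolean retryOnLeadershipChange;
    private final Map<Integer, String> pending = new HashMap<>();
    private int leader = 1;               // instance whose worker leads the A->B group
    private String appliedConfig = "old"; // connector config currently in effect

    RollingRestartModel(boolean retryOnLeadershipChange) {
        this.retryOnLeadershipChange = retryOnLeadershipChange;
    }

    // Restart one of the two instances (ids 1 and 2) with a new configuration.
    void restart(int instance, String newConfig) {
        if (leader == instance) {
            // Worst case assumed here: the restarting leader loses leadership to its peer.
            leader = (instance == 1) ? 2 : 1;
            applyPendingFor(leader);
        }
        // The restarted instance makes one attempt, which only the leader can complete.
        if (leader == instance) {
            appliedConfig = newConfig;
        } else if (retryOnLeadershipChange) {
            // Mitigation variant: remember the config until this instance leads.
            pending.put(instance, newConfig);
        }
    }

    private void applyPendingFor(int newLeader) {
        String config = pending.remove(newLeader);
        if (config != null) {
            appliedConfig = config;
        }
    }

    String appliedConfig() { return appliedConfig; }

    public static void main(String[] args) {
        RollingRestartModel oneShot = new RollingRestartModel(false);
        oneShot.restart(1, "new");
        oneShot.restart(2, "new");
        System.out.println(oneShot.appliedConfig()); // old: both one-time attempts failed

        RollingRestartModel retrying = new RollingRestartModel(true);
        retrying.restart(1, "new");
        retrying.restart(2, "new");
        System.out.println(retrying.appliedConfig()); // new: applied when instance 1 became leader
    }
}
```

In the one-shot run no attempt ever coincides with leadership, so the change is dropped without any error, which matches the silent failure the ticket describes.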
[jira] [Updated] (KAFKA-15372) MM2 rolling restart can drop configuration changes silently
[ https://issues.apache.org/jira/browse/KAFKA-15372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-15372: Fix Version/s: 3.6.2
[PR] MINOR: Give FETCH request own MetadataVersion [kafka]
jolshan opened a new pull request, #14997: URL: https://github.com/apache/kafka/pull/14997 https://github.com/apache/kafka/commit/c8f687ac1505456cb568de2b60df235eb1ceb5f0 was incorrect in reusing the same metadata version to enable the new fetch request. This fixes the issue by giving the version bump its own metadata version. Note -- this is a no-op in running code. The fetch request to replica fetchers will be the same semantically. We just wanted to fix upgrades with MV 3_7_IV0 that could encounter unknown fetch versions when upgrading from a version that didn't contain this change to one that did. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-15372: Reconfigure dedicated MM2 connectors after leadership change [kafka]
gharris1727 merged PR #14293: URL: https://github.com/apache/kafka/pull/14293
Re: [PR] KAFKA-15372: Reconfigure dedicated MM2 connectors after leadership change [kafka]
gharris1727 commented on PR #14293: URL: https://github.com/apache/kafka/pull/14293#issuecomment-1852872688 Test failures appear unrelated, and the mirror and runtime tests pass locally.
Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]
hachikuji commented on code in PR #14774: URL: https://github.com/apache/kafka/pull/14774#discussion_r1424609415 ## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ## @@ -1352,6 +1360,28 @@ object GroupMetadataManager { "%X".format(BigInt(1, bytes)) } + def maybeConvertError(error: Errors) : Errors = { Review Comment: nit: can we choose a more descriptive name? Perhaps include `OffsetCommit` somewhere?
Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]
jolshan commented on code in PR #14985: URL: https://github.com/apache/kafka/pull/14985#discussion_r1424609554 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java: ## @@ -102,6 +103,14 @@ public long append( } } +@Override +public long completeTransaction( +TopicPartition tp, +WriteTxnMarkersRequest.TxnMarkerEntry marker +) throws KafkaException { +throw new IllegalStateException("Not implemented"); Review Comment: What is this class used for again? And is this planned in a future PR?
Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]
jolshan commented on code in PR #14985: URL: https://github.com/apache/kafka/pull/14985#discussion_r1424597195 ## core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala: ## @@ -135,7 +136,22 @@ class CoordinatorLoaderImpl[T]( memoryRecords.batches.forEach { batch => if (batch.isControlBatch) { -throw new IllegalStateException("Control batches are not supported yet.") +batch.asScala.foreach { record => Review Comment: Is it always the case that we need to complete the transaction on loading? Is this because loading rebuilds memory from the log so we play through the whole transaction again? Maybe I'm just getting confused by the "completeTransaction" method names as well.
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
philipnee commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1424589872 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -285,44 +286,6 @@ void testEnsureEventsAreCompleted() { assertTrue(applicationEventsQueue.isEmpty()); } -@Test Review Comment: removed because they are irrelevant now
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
philipnee commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1424585777 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -810,89 +854,87 @@ public void testGroupMetadataAfterCreationWithGroupIdIsNotNullAndGroupInstanceId final Properties props = requiredConsumerPropertiesAndGroupId(groupId); props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId); final ConsumerConfig config = new ConsumerConfig(props); -try (final AsyncKafkaConsumer consumer = -new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) { - -final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata(); - -assertEquals(groupId, groupMetadata.groupId()); -assertEquals(Optional.of(groupInstanceId), groupMetadata.groupInstanceId()); -assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, groupMetadata.generationId()); -assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, groupMetadata.memberId()); -} +final AsyncKafkaConsumer consumer = +new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer()); +final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata(); +assertEquals(groupId, groupMetadata.groupId()); +assertEquals(Optional.of(groupInstanceId), groupMetadata.groupInstanceId()); +assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, groupMetadata.generationId()); +assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, groupMetadata.memberId()); +consumer.close(Duration.ZERO); } @Test public void testGroupMetadataUpdateSingleCall() { final String groupId = "consumerGroupA"; final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId)); final LinkedBlockingQueue backgroundEventQueue = new LinkedBlockingQueue<>(); -try (final AsyncKafkaConsumer consumer = Review Comment: ok makes sense, I can revert these changes. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15471 [MINOR]: Fix backward-compatibility bug [kafka]
hni61223 commented on code in PR #14996: URL: https://github.com/apache/kafka/pull/14996#discussion_r1424548302 ## bin/kafka-server-stop.sh: ## @@ -36,7 +36,7 @@ else declare -a AbsolutePathToConfigArray for ((i = 0; i < ${#RelativePathArray[@]}; i++)); do AbsolutePathToConfig=$(readlink -f "${RelativePathArray[i]}") -if [ -z "$AbsolutePathToConfig" ]; then +if [ -z "$AbsolutePathToConfig" ] && [ -n "$INPUT_PROCESS_ROLE" ] || [ -n "$INPUT_NID" ]; then Review Comment: yeah that will work. Let me change that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15471 [MINOR]: Fix backward-compatibility bug [kafka]
rondagostino commented on code in PR #14996: URL: https://github.com/apache/kafka/pull/14996#discussion_r1424547476 ## bin/kafka-server-stop.sh: ## @@ -36,7 +36,7 @@ else declare -a AbsolutePathToConfigArray for ((i = 0; i < ${#RelativePathArray[@]}; i++)); do AbsolutePathToConfig=$(readlink -f "${RelativePathArray[i]}") -if [ -z "$AbsolutePathToConfig" ]; then +if [ -z "$AbsolutePathToConfig" ] && [ -n "$INPUT_PROCESS_ROLE" ] || [ -n "$INPUT_NID" ]; then Review Comment: It might actually be best to rework things a bit to move the whole "AbsolutePathToConfigArray " section down until after line 53. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15471 [MINOR]: Fix backward-compatibility bug [kafka]
hni61223 commented on code in PR #14996: URL: https://github.com/apache/kafka/pull/14996#discussion_r1424547291 ## bin/kafka-server-stop.sh: ## @@ -36,7 +36,7 @@ else declare -a AbsolutePathToConfigArray for ((i = 0; i < ${#RelativePathArray[@]}; i++)); do AbsolutePathToConfig=$(readlink -f "${RelativePathArray[i]}") -if [ -z "$AbsolutePathToConfig" ]; then +if [ -z "$AbsolutePathToConfig" ] && [ -n "$INPUT_PROCESS_ROLE" ] || [ -n "$INPUT_NID" ]; then Review Comment: You are right. Let me fix this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15912) Parallelize conversion and transformation steps in Connect
[ https://issues.apache.org/jira/browse/KAFKA-15912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795926#comment-17795926 ] Vojtech Juranek commented on KAFKA-15912: - I'd be careful about parallelizing per SMT/converter, as moving data between threads may end up being more expensive, as you already mentioned. Also, if there is a bottleneck, e.g. the value converter, running it in a single thread won't give any significant speed-up. And this is actually what we (Debezium project) observe, either in our perf. tests or in user reports (e.g. KAFKA-15996, resp. [DBZ-7240|https://issues.redhat.com/browse/DBZ-7240]). It would IMHO be more useful, if possible, to form record pipelines and run records through these pipelines in parallel. With this approach, thread safety can be solved e.g. by creating copies of the SMTs/converters for each processing pipeline. The issue with stateful transformations however remains. There is also an issue with record ordering, though it is quite easily solvable when processing is done in batches. I'm currently doing some experiments with Debezium Server to see whether such pipelines are possible there and whether they give any significant performance boost (still WIP, no results yet). So I'm wondering whether doing something similar for Kafka Connect makes sense to you, or whether it seems too complicated to be worth the effort/possible backward compatibility issues/etc.? > Parallelize conversion and transformation steps in Connect > -- > > Key: KAFKA-15912 > URL: https://issues.apache.org/jira/browse/KAFKA-15912 > Project: Kafka > Issue Type: Improvement > Components: connect >Reporter: Mickael Maison >Priority: Major > > In busy Connect pipelines, the conversion and transformation steps can > sometimes have a very significant impact on performance. This is especially > true with large records with complex schemas, for example with CDC connectors > like Debezium. 
> Today in order to always preserve ordering, converters and transformations > are called on one record at a time in a single thread in the Connect worker. > As Connect usually handles records in batches (up to max.poll.records in sink > pipelines, for source pipelines while it really depends on the connector, > most connectors I've seen still tend to return multiple records each loop), > it could be highly beneficial to attempt running the converters and > transformation chain in parallel by a pool of processing threads. > It should be possible to do some of these steps in parallel and still keep > exact ordering. I'm even considering whether an option to lose ordering but > allow even faster processing would make sense. -- This message was sent by Atlassian Jira (v8.20.10#820010)
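The idea of running a batch through a thread pool while keeping exact ordering can be sketched as follows. This is only an illustration under stated assumptions, not Connect code: the class name `OrderedParallelTransform`, the helper `transformBatch`, and the use of a plain `Function` in place of a converter or SMT chain are all hypothetical. It also assumes the transformation is stateless and thread-safe, which, as noted in the comment above, is not true for every SMT.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical sketch: not part of Kafka Connect.
class OrderedParallelTransform {

    // Applies `transform` to every element of a batch on a thread pool and
    // returns the results in the original batch order.
    // Assumption: `transform` is stateless and safe to call concurrently.
    static <I, O> List<O> transformBatch(List<I> batch,
                                         Function<I, O> transform,
                                         ExecutorService pool)
            throws InterruptedException, ExecutionException {
        List<Callable<O>> tasks = new ArrayList<>();
        for (I item : batch) {
            tasks.add(() -> transform.apply(item));
        }
        // invokeAll returns the futures in the same sequential order as the
        // task list, so reading them back in order preserves record ordering
        // even though the tasks may finish in any order.
        List<Future<O>> futures = pool.invokeAll(tasks);
        List<O> results = new ArrayList<>(batch.size());
        for (Future<O> future : futures) {
            results.add(future.get());
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<String> out = transformBatch(
                Arrays.asList("a", "b", "c", "d"), s -> s.toUpperCase(), pool);
            System.out.println(out); // prints [A, B, C, D]
        } finally {
            pool.shutdown();
        }
    }
}
```

Ordering is preserved per batch only because the results are collected in submission order. The cost of handing records to other threads, which the comment above warns about, is not measured by this sketch.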
Re: [PR] KAFKA-15471 [MINOR]: Fix backward-compatibility bug [kafka]
rondagostino commented on code in PR #14996: URL: https://github.com/apache/kafka/pull/14996#discussion_r1424545171 ## bin/kafka-server-stop.sh: ## @@ -36,7 +36,7 @@ else declare -a AbsolutePathToConfigArray for ((i = 0; i < ${#RelativePathArray[@]}; i++)); do AbsolutePathToConfig=$(readlink -f "${RelativePathArray[i]}") -if [ -z "$AbsolutePathToConfig" ]; then +if [ -z "$AbsolutePathToConfig" ] && [ -n "$INPUT_PROCESS_ROLE" ] || [ -n "$INPUT_NID" ]; then Review Comment: Is there a precedence issue here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-15471 [MINOR]: Fix backward-compatibility bug [kafka]
hni61223 opened a new pull request, #14996: URL: https://github.com/apache/kafka/pull/14996 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]
jolshan commented on code in PR #14985: URL: https://github.com/apache/kafka/pull/14985#discussion_r1424518210 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -892,6 +895,43 @@ public void replay( } } +/** + * Applies the given transaction marker. + * + * @param producerIdThe producer id. + * @param resultThe result of the transaction. + * @throws RuntimeException if the transaction can not be completed. + */ +public void completeTransaction( +long producerId, +TransactionResult result +) throws RuntimeException { +Offsets pendingOffsets = pendingTransactionalOffsets.remove(producerId); + +if (result == TransactionResult.COMMIT) { +log.debug("Committed transactional offset commits for producer id {}.", producerId); +if (pendingOffsets == null) return; + +pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> { Review Comment: is there a reason why we need to store all the offsets in these nested maps? If we are just going to commit all the data anyway? I was wondering if the same would be accomplished if we had a list of all the info for each producer id. I guess it is the same complexity and we can reuse the object. I was just curious if we ever needed access for a random specific partition for a given producer ID. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
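For readers following the question above, here is a minimal sketch of the nested-map shape under discussion. It is an assumption-laden stand-in, not the PR's code: the class `PendingOffsetsSketch`, its fields, the use of a `"topic-partition"` string key, and the boolean `commit` flag (in place of `TransactionResult`) are all hypothetical simplifications.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: not the OffsetMetadataManager implementation.
class PendingOffsetsSketch {
    // producerId -> groupId -> "topic-partition" -> pending offset.
    private final Map<Long, Map<String, Map<String, Long>>> pending = new HashMap<>();
    // groupId -> "topic-partition" -> committed offset.
    private final Map<String, Map<String, Long>> committed = new HashMap<>();

    // Records an offset written inside a still-open transaction.
    void addPending(long producerId, String groupId, String topicPartition, long offset) {
        pending.computeIfAbsent(producerId, k -> new HashMap<>())
               .computeIfAbsent(groupId, k -> new HashMap<>())
               .put(topicPartition, offset);
    }

    // Always removes the producer's pending offsets. On commit they are
    // copied into the committed view; on abort they are simply dropped.
    void completeTransaction(long producerId, boolean commit) {
        Map<String, Map<String, Long>> offsets = pending.remove(producerId);
        if (offsets == null || !commit) return;
        offsets.forEach((groupId, byPartition) ->
            committed.computeIfAbsent(groupId, k -> new HashMap<>()).putAll(byPartition));
    }

    // Returns the committed offset, or null if none is known.
    Long committedOffset(String groupId, String topicPartition) {
        Map<String, Long> byPartition = committed.get(groupId);
        return byPartition == null ? null : byPartition.get(topicPartition);
    }
}
```

In this sketch the commit path only iterates over everything stored for the producer id, so a flat list per producer id would serve it equally well. The nesting would only pay off if some other code path needed keyed access to a specific group or partition, which is the open question in the review comment.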
[jira] [Created] (KAFKA-16001) Migrate ConsumerNetworkThreadTestBuilder away from ConsumerTestBuilder
Lucas Brutschy created KAFKA-16001: -- Summary: Migrate ConsumerNetworkThreadTestBuilder away from ConsumerTestBuilder Key: KAFKA-16001 URL: https://issues.apache.org/jira/browse/KAFKA-16001 Project: Kafka Issue Type: Sub-task Reporter: Lucas Brutschy -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16000) Migrate MembershipManagerImpl away from ConsumerTestBuilder
Lucas Brutschy created KAFKA-16000: -- Summary: Migrate MembershipManagerImpl away from ConsumerTestBuilder Key: KAFKA-16000 URL: https://issues.apache.org/jira/browse/KAFKA-16000 Project: Kafka Issue Type: Sub-task Reporter: Lucas Brutschy -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15999) Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder
Lucas Brutschy created KAFKA-15999: -- Summary: Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder Key: KAFKA-15999 URL: https://issues.apache.org/jira/browse/KAFKA-15999 Project: Kafka Issue Type: Sub-task Reporter: Lucas Brutschy -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15913) Migrate AsyncKafkaConsumerTest away from ConsumerTestBuilder
[ https://issues.apache.org/jira/browse/KAFKA-15913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795918#comment-17795918 ] Lucas Brutschy commented on KAFKA-15913: Splitting off the other tests > Migrate AsyncKafkaConsumerTest away from ConsumerTestBuilder > > > Key: KAFKA-15913 > URL: https://issues.apache.org/jira/browse/KAFKA-15913 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Philip Nee >Assignee: Lucas Brutschy >Priority: Major > Labels: consumer-threading-refactor > > ConsumerTestBuilder is meant to be a unit testing utility; however, we seem > to use Mockito#spy quite liberally. This is not the right testing strategy > because we basically turn unit testing into integration testing. > > While the current unit tests run fine, we should probably make the mocking > using Mockito#mock by default and test each dependency independently. > > The ask here is > # Make mock(class) by default > # Provide more flexible interface for the testBuilder to allow user to > configure spy or mock. Or, let user pass in their own mock. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15913) Migrate AsyncKafkaConsumerTest away from ConsumerTestBuilder
[ https://issues.apache.org/jira/browse/KAFKA-15913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-15913: -- Assignee: Lucas Brutschy Summary: Migrate AsyncKafkaConsumerTest away from ConsumerTestBuilder (was: Remove excessive use of spy in ConsumerTestBuilder) > Migrate AsyncKafkaConsumerTest away from ConsumerTestBuilder > > > Key: KAFKA-15913 > URL: https://issues.apache.org/jira/browse/KAFKA-15913 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Philip Nee >Assignee: Lucas Brutschy >Priority: Major > Labels: consumer-threading-refactor > > ConsumerTestBuilder is meant to be a unit testing utility; however, we seem > to use Mockito#spy quite liberally. This is not the right testing strategy > because we basically turn unit testing into integration testing. > > While the current unit tests run fine, we should probably make the mocking > using Mockito#mock by default and test each dependency independently. > > The ask here is > # Make mock(class) by default > # Provide more flexible interface for the testBuilder to allow user to > configure spy or mock. Or, let user pass in their own mock. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]
jolshan commented on code in PR #14985: URL: https://github.com/apache/kafka/pull/14985#discussion_r1424512647 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -862,6 +863,7 @@ public void replay( } if (producerId == RecordBatch.NO_PRODUCER_ID) { +log.debug("Replaying offset commit with producer id {}, key {}, value {}", producerId, key, value); Review Comment: Do we care to print out the producer ID if it is "no producer id"? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15998) EAGER rebalance onPartitionsAssigned() called with no previous onPartitionsLost() nor onPartitionsRevoked()
[ https://issues.apache.org/jira/browse/KAFKA-15998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jonathan Haapala updated KAFKA-15998: - Description: I ran into a case where {{onPartitionsAssigned()}} was called without first calling {{onPartitionsRevoked()}} and there is no indication that {{onPartitionsLost()}} was called or had any reason to be called. We are using the *EAGER* rebalance protocol and the *StickyAssignor* on kafka 3.4.0. Our services rely on the API contract that {{{}onPartitionsRevoked(){}}}: {quote}In eager rebalancing, it will always be called at the start of a rebalance and after the consumer stops fetching data. {quote} We internally keep track of partition states with a state machine, and rely on these APIs to assert what expected states we are in. So when a partition is Revoked and then re-Assigned, we know that we kept ownership. Moreover, if we are assigned partitions in EAGER rebalancing, we expect that entire assignment is passed to {{{}onPartitionsAssigned(){}}}, because if {{onPartitionsRevoked()}} is always called at the start of a rebalance and EAGER protocol always revokes the entire assignment, then by the time we hit {{onPartitionsAssigned()}} there should be nothing assigned from the consumer's point of view, and therefore the entire assignment is newly added. 
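The expectation described above can be made concrete with a small sketch. It is not the reporter's state machine and does not use the Kafka client API: `EagerRebalanceTracker` is a hypothetical class whose method names merely mirror the rebalance listener callbacks, and partitions are represented as plain strings to keep the example self-contained.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: a simplified tracker for the EAGER expectation that
// nothing is owned by the time an assignment arrives.
class EagerRebalanceTracker {
    private final Set<String> owned = new HashSet<>();

    // Under EAGER the whole current assignment is expected to be revoked here.
    void onPartitionsRevoked(Collection<String> partitions) {
        owned.removeAll(partitions);
    }

    void onPartitionsLost(Collection<String> partitions) {
        owned.removeAll(partitions);
    }

    // Returns true if the EAGER expectation held, i.e. no partition was still
    // owned when the new assignment arrived. The new assignment replaces the
    // tracked state either way.
    boolean onPartitionsAssigned(Collection<String> partitions) {
        boolean invariantHeld = owned.isEmpty();
        owned.clear();
        owned.addAll(partitions);
        return invariantHeld;
    }

    // Defensive copy of the currently tracked assignment.
    Set<String> owned() {
        return new HashSet<>(owned);
    }
}
```

With such a tracker, a second `onPartitionsAssigned` call that arrives without an intervening `onPartitionsRevoked` or `onPartitionsLost` returns `false`, which is the kind of violation this report describes.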
However, we recently ran into a situation where we received an assignment while the consumer's existing assignment was non-empty: | *Pod*| *Message*| |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:25,715\{UTC} [KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer.metric-data-points.metric-aggregator, groupId=metric-aggregator] Notifying assignor about the {*}new Assignment{*}(partitions=[topic-26, topic-44, topic-60, topic-71, topic-78, topic-82, topic-88, topic-101, topic-105, topic-109, topic-113, topic-117, topic-123, topic-130, topic-137, topic-141])| |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:25,715\{UTC} [KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer.metric-data-points.metric-aggregator, groupId=metric-aggregator] Adding {*}newly assigned partitions{*}: topic-26, topic-44, topic-60, topic-71, topic-78, topic-82, topic-88, topic-101, topic-105, topic-109, topic-113, topic-117, topic-123, topic-130, topic-137, topic-141| |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:31,923\{UTC} [kafka-coordinator-heartbeat-thread \\| metric-aggregator] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer.metric-data-points.metric-aggregator, groupId=metric-aggregator] *Request joining group* due to: group is already rebalancing| |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,132\{UTC} [KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer.metric-data-points.metric-aggregator, groupId=metric-aggregator] Successfully joined group with generation Generation\{generationId=12417, memberId='consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223', protocol='sticky'}| |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,134\{UTC} [KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer.metric-data-points.metric-aggregator, groupId=metric-aggregator] Successfully synced group in generation 
Generation{generationId={*}12417{*}, memberId='consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223', protocol='sticky'}| |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,135\{UTC} [KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer.metric-data-points.metric-aggregator, groupId=metric-aggregator] Notifying assignor about the {*}new Assignment{*}(partitions=[topic-26, topic-44, topic-60, topic-71, topic-78, topic-82, topic-88, topic-101, topic-105, topic-109, topic-113, topic-117])| |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,135\{UTC} [KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer.metric-data-points.metric-aggregator, groupId=metric-aggregator] Adding {*}newly assigned partitions{*}: | Here you can see we get assigned partitions: 26, 44, 60, 71, 78, 82, 88, 101, 105, 109, 113, 117, 123, 130, 137, 141 And promptly see them all as newly added when passed to {{{}onPartitionsAssigned(){}}}. 6 seconds later the heartbeat thread notices another rebalance and requests to join. It quickly succeeds and then almost immediately successfully syncs. We then get a new assignment: 26, 44, 60, 71, 78, 82, 88, 101, 105, 109, 113, 117 This is a subset of the partitions we were assigned previously, missing 123, 130, 137, and 141. Because {{onPartitionsRevoked()}} was not called at the beginning of this rebalance, the consumer still has the old assignment as its current
[jira] [Created] (KAFKA-15998) EAGER rebalance onPartitionsAssigned() called with no previous onPartitionsLost() nor onPartitionsRevoked()
Jonathan Haapala created KAFKA-15998: Summary: EAGER rebalance onPartitionsAssigned() called with no previous onPartitionsLost() nor onPartitionsRevoked() Key: KAFKA-15998 URL: https://issues.apache.org/jira/browse/KAFKA-15998 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 3.4.0 Reporter: Jonathan Haapala I ran into a case where {{onPartitionsAssigned()}} was called without first calling {{onPartitionsRevoked()}} and there is no indication that {{onPartitionsLost()}} was called or had any reason to be called. We are using the *EAGER* rebalance protocol and the *StickyAssignor* on kafka 3.4.0. Our services rely on the API contract that {{{}onPartitionsRevoked(){}}}: {quote}In eager rebalancing, it will always be called at the start of a rebalance and after the consumer stops fetching data.{quote} We internally keep track of partition states with a state machine, and rely on these APIs to assert what expected states we are in. So when a partition is Revoked and then re-Assigned, we know that we kept ownership. Moreover, if we are assigned partitions in EAGER rebalancing, we expect that entire assignment is passed to {{{}onPartitionsAssigned(){}}}, because if {{onPartitionsRevoked()}} is always called at the start of a rebalance and EAGER protocol always revokes the entire assignment, then by the time we hit {{onPartitionsAssigned()}} there should be nothing assigned from the consumer's point of view, and therefore the entire assignment is newly added. 
However, we recently ran into a situation where we received an assignment while the consumer's existing assignment was non-empty: | *Pod* | *Message* | |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:25,715\{UTC} [KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer.metric-data-points.metric-aggregator, groupId=metric-aggregator] Notifying assignor about the {*}new Assignment{*}(partitions=[topic-26, topic-44, topic-60, topic-71, topic-78, topic-82, topic-88, topic-101, topic-105, topic-109, topic-113, topic-117, topic-123, topic-130, topic-137, topic-141])| |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:25,715\{UTC} [KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer.metric-data-points.metric-aggregator, groupId=metric-aggregator] Adding {*}newly assigned partitions{*}: topic-26, topic-44, topic-60, topic-71, topic-78, topic-82, topic-88, topic-101, topic-105, topic-109, topic-113, topic-117, topic-123, topic-130, topic-137, topic-141| |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:31,923\{UTC} [kafka-coordinator-heartbeat-thread \| metric-aggregator] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer.metric-data-points.metric-aggregator, groupId=metric-aggregator] *Request joining group* due to: group is already rebalancing| |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,132\{UTC} [KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer.metric-data-points.metric-aggregator, groupId=metric-aggregator] Successfully joined group with generation Generation\{generationId=12417, memberId='consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223', protocol='sticky'}| |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,134\{UTC} [KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer.metric-data-points.metric-aggregator, groupId=metric-aggregator] Successfully synced group in generation 
Generation{generationId={*}12417{*}, memberId='consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223', protocol='sticky'}| |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,135\{UTC} [KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer.metric-data-points.metric-aggregator, groupId=metric-aggregator] Notifying assignor about the \{*}new Assignment{*}(partitions=[topic-26, topic-44, topic-60, topic-71, topic-78, topic-82, topic-88, topic-101, topic-105, topic-109, topic-113, topic-117])| |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,135\{UTC} [KafkaConsumerAutoService] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer.metric-data-points.metric-aggregator, groupId=metric-aggregator] Adding {*}newly assigned partitions{*}: | Here you can see we get assigned partitions: 26, 44, 60, 71, 78, 82, 88, 101, 105, 109, 113, 117, 123, 130, 137, 141 And promptly see them all as newly added when passed to {{{}onPartitionsAssigned(){}}}. 6 seconds later the heartbeat thread notices another rebalance and requests to join. It quickly succeeds and then almost immediately successfully syncs. We then get a new assignment: 26, 44, 60, 71, 78, 82, 88, 101, 105, 109, 113,
Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]
hachikuji commented on code in PR #14774: URL: https://github.com/apache/kafka/pull/14774#discussion_r1424500490 ## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ## @@ -1352,6 +1360,28 @@ object GroupMetadataManager { "%X".format(BigInt(1, bytes)) } + def maybeConvertError(error: Errors) : Errors = { +error match { + case Errors.UNKNOWN_TOPIC_OR_PARTITION + | Errors.NOT_ENOUGH_REPLICAS + | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND => +Errors.COORDINATOR_NOT_AVAILABLE + + case Errors.NOT_LEADER_OR_FOLLOWER + | Errors.KAFKA_STORAGE_ERROR => +Errors.NOT_COORDINATOR + + case Errors.MESSAGE_TOO_LARGE + | Errors.RECORD_LIST_TOO_LARGE + | Errors.INVALID_FETCH_SIZE => +Errors.INVALID_COMMIT_OFFSET_SIZE Review Comment: Right. I would think UNKNOWN_SERVER_ERROR or something like that would be more appropriate for a case like this. It doesn't say anything about the size of the offset commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]
jolshan commented on code in PR #14774: URL: https://github.com/apache/kafka/pull/14774#discussion_r1424497060 ## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ## @@ -1352,6 +1360,28 @@ object GroupMetadataManager { "%X".format(BigInt(1, bytes)) } + def maybeConvertError(error: Errors) : Errors = { +error match { + case Errors.UNKNOWN_TOPIC_OR_PARTITION + | Errors.NOT_ENOUGH_REPLICAS + | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND => +Errors.COORDINATOR_NOT_AVAILABLE + + case Errors.NOT_LEADER_OR_FOLLOWER + | Errors.KAFKA_STORAGE_ERROR => +Errors.NOT_COORDINATOR + + case Errors.MESSAGE_TOO_LARGE + | Errors.RECORD_LIST_TOO_LARGE + | Errors.INVALID_FETCH_SIZE => +Errors.INVALID_COMMIT_OFFSET_SIZE Review Comment: I think one is a known error to the group coordinator and the other isn't? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on code in PR #14981: URL: https://github.com/apache/kafka/pull/14981#discussion_r1424473429 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -135,6 +138,11 @@ public Builder withTimer(Timer timer) { return this; } +public Builder withDefaultWriteTimeOut(Duration timeout) { Review Comment: nit: `timeout` -> `defaultWriteTimeout`. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -595,13 +609,15 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent { /** * Constructor. * - * @param name The operation name. - * @param tpThe topic partition that the operation is applied to. - * @param opThe write operation. + * @param name The operation name. + * @param tpThe topic partition that the operation is applied to. + * @param defaultWriteTimeout The default write operation timeout + * @param opThe write operation. */ CoordinatorWriteEvent( String name, TopicPartition tp, +Duration defaultWriteTimeout, Review Comment: ditto. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -581,6 +590,11 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent { */ final CompletableFuture future; +/** + * Timeout value for the write operation + */ +final Duration defaultWriteTimeout; Review Comment: nit: This one should actually be `writeTimeout`. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -581,6 +590,11 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent { */ final CompletableFuture future; +/** + * Timeout value for the write operation Review Comment: nit: `.`. 
## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -950,6 +965,43 @@ public void testScheduleWriteOpWhenWriteFails() { assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records()); } +@Test +public void testScheduleWriteOpWhenWriteTimesOut() throws InterruptedException { +MockTimer timer = new MockTimer(); +// The partition writer only accept on write. +MockPartitionWriter writer = new MockPartitionWriter(); + +CoordinatorRuntime runtime = +new CoordinatorRuntime.Builder() +.withTime(timer.time()) +.withTimer(timer) +.withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT) +.withLoader(new MockCoordinatorLoader()) +.withEventProcessor(new DirectEventProcessor()) +.withPartitionWriter(writer) +.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) +.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) +.build(); + +// Loads the coordinator. +runtime.scheduleLoadOperation(TP, 10); + +// Verify the initial state. +CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); +assertEquals(0, ctx.coordinator.lastWrittenOffset()); +assertEquals(0, ctx.coordinator.lastCommittedOffset()); +assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + +// Write #1. We should get a TimeoutException because the HWM will not advance. +CompletableFuture timedOutWrite = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT, +state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1")); + +timer.advanceClock(DEFAULT_WRITE_TIMEOUT.toMillis() * 2); Review Comment: nit: I would just do `timeout + 1` if possible. 
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -610,26 +626,29 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent { null, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, +defaultWriteTimeout, op ); } /** * Constructor. * - * @param name The operation name. - * @param tpThe topic partition that the operation is applied to. - * @param
Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]
hachikuji commented on code in PR #14774: URL: https://github.com/apache/kafka/pull/14774#discussion_r1424481055 ## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ## @@ -1352,6 +1360,28 @@ object GroupMetadataManager { "%X".format(BigInt(1, bytes)) } + def maybeConvertError(error: Errors) : Errors = { +error match { + case Errors.UNKNOWN_TOPIC_OR_PARTITION + | Errors.NOT_ENOUGH_REPLICAS + | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND => +Errors.COORDINATOR_NOT_AVAILABLE + + case Errors.NOT_LEADER_OR_FOLLOWER + | Errors.KAFKA_STORAGE_ERROR => +Errors.NOT_COORDINATOR + + case Errors.MESSAGE_TOO_LARGE + | Errors.RECORD_LIST_TOO_LARGE + | Errors.INVALID_FETCH_SIZE => +Errors.INVALID_COMMIT_OFFSET_SIZE Review Comment: We don't have to address here, but this error list handling seems dubious. Not sure why `INVALID_FETCH_SIZE` would translate to `INVALID_COMMIT_OFFSET_SIZE`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
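For readers following the review comment above, the translation in the quoted diff can be sketched in Java roughly as follows. This is only an illustration: `AppendError` is a hypothetical stand-in for Kafka's `Errors` enum, `ErrorTranslation` is an invented class name, and the pass-through `default` branch is an assumption because the quoted diff is cut off before the end of the `match`.

```java
// Hypothetical stand-in for org.apache.kafka.common.protocol.Errors; only the
// constants named in the quoted diff are modelled, plus NONE for illustration.
enum AppendError {
    UNKNOWN_TOPIC_OR_PARTITION, NOT_ENOUGH_REPLICAS, NOT_ENOUGH_REPLICAS_AFTER_APPEND,
    NOT_LEADER_OR_FOLLOWER, KAFKA_STORAGE_ERROR,
    MESSAGE_TOO_LARGE, RECORD_LIST_TOO_LARGE, INVALID_FETCH_SIZE,
    COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR, INVALID_COMMIT_OFFSET_SIZE,
    NONE
}

final class ErrorTranslation {
    // Maps log-append errors to the errors a group coordinator client understands.
    static AppendError maybeConvert(AppendError error) {
        switch (error) {
            case UNKNOWN_TOPIC_OR_PARTITION:
            case NOT_ENOUGH_REPLICAS:
            case NOT_ENOUGH_REPLICAS_AFTER_APPEND:
                return AppendError.COORDINATOR_NOT_AVAILABLE;
            case NOT_LEADER_OR_FOLLOWER:
            case KAFKA_STORAGE_ERROR:
                return AppendError.NOT_COORDINATOR;
            case MESSAGE_TOO_LARGE:
            case RECORD_LIST_TOO_LARGE:
            case INVALID_FETCH_SIZE:
                // The grouping questioned in the review: a fetch-size error is
                // reported as an offset-commit-size error.
                return AppendError.INVALID_COMMIT_OFFSET_SIZE;
            default:
                // Assumption: anything else is returned unchanged.
                return error;
        }
    }
}
```

Written this way, the third group makes the reviewer's question concrete: `INVALID_FETCH_SIZE` shares a branch with the two size errors even though it relates to fetching rather than appending.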
[PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]
tinaselenge opened a new pull request, #14995: URL: https://github.com/apache/kafka/pull/14995 This PR implements [KIP-993](https://cwiki.apache.org/confluence/display/KAFKA/KIP-993%3A+Allow+restricting+files+accessed+by+File+and+Directory+ConfigProviders) for restricting files accessed by File and Directory ConfigProviders. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
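As a rough illustration of the idea behind restricting a config provider to specific paths, the check could look something like the sketch below. It is not the PR's implementation: `AllowedPaths` and `permits` are hypothetical names, and the sketch deliberately ignores symbolic links, which a real implementation would probably need to consider.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: decides whether a requested file lies under one of the
// configured root directories.
final class AllowedPaths {
    private final List<Path> roots;

    AllowedPaths(List<String> allowed) {
        // Normalize once so that later comparisons are done on canonical forms.
        this.roots = allowed.stream()
            .map(p -> Paths.get(p).toAbsolutePath().normalize())
            .collect(Collectors.toList());
    }

    boolean permits(String candidate) {
        // normalize() collapses ".." segments, so "/etc/kafka/../passwd"
        // is compared as "/etc/passwd".
        Path normalized = Paths.get(candidate).toAbsolutePath().normalize();
        // Path.startsWith compares whole name elements, so "/etc/kafka-other"
        // does not match the root "/etc/kafka".
        return roots.stream().anyMatch(normalized::startsWith);
    }
}
```

Comparing normalized `Path` objects rather than raw strings is the main design point here: a plain string prefix check would accept both the `..` traversal and the sibling-directory case.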
Re: [PR] MINOR: Stop leaking threads in BlockingConnectorTest [kafka]
gharris1727 commented on code in PR #12290: URL: https://github.com/apache/kafka/pull/12290#discussion_r1424413335 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java: ## @@ -350,13 +353,16 @@ private void assertRequestTimesOut(String requestDescription, ThrowingRunnable r } private static class Block { Review Comment: can you make this public to allow OffsetsApiIntegrationTest to use the latch? and do you think that maybe these connectors should be moved out of this test to a common reusable class? ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java: ## @@ -368,31 +374,54 @@ private static ConfigDef config() { ); } +/** + * {@link CountDownLatch#await() Wait} for the connector/task to reach the point in its lifecycle where + * it will block. + */ public static void waitForBlock() throws InterruptedException, TimeoutException { +CountDownLatch awaitBlockLatch; synchronized (Block.class) { -if (blockLatch == null) { -throw new IllegalArgumentException("No connector has been created yet"); -} +awaitBlockLatch = Block.awaitBlockLatch; +} + +if (awaitBlockLatch == null) { +throw new IllegalArgumentException("No connector has been created yet"); Review Comment: Is this an opportunity for a flaky failure, if the test thread advances before the connector is created. It seems very rare, I don't see any instances on the Gradle dashboard. ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java: ## @@ -368,31 +374,54 @@ private static ConfigDef config() { ); } +/** + * {@link CountDownLatch#await() Wait} for the connector/task to reach the point in its lifecycle where + * it will block. 
+ */ public static void waitForBlock() throws InterruptedException, TimeoutException { +CountDownLatch awaitBlockLatch; synchronized (Block.class) { -if (blockLatch == null) { -throw new IllegalArgumentException("No connector has been created yet"); -} +awaitBlockLatch = Block.awaitBlockLatch; +} + +if (awaitBlockLatch == null) { +throw new IllegalArgumentException("No connector has been created yet"); } log.debug("Waiting for connector to block"); -if (!blockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) { +if (!awaitBlockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) { throw new TimeoutException("Timed out waiting for connector to block."); Review Comment: Since scanning creates connector instances, and validation caches the connector instance, how do you ensure that the right awaitBlockLatch is being waited on here? ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java: ## @@ -139,7 +141,8 @@ public void setup() throws Exception { public void close() { // stop all Connect, Kafka and Zk threads. connect.stop(); -Block.resetBlockLatch(); +// unblock everything so that we don't leak threads after each test run +Block.reset(); Review Comment: WDYT about resetting before stopping the workers, to allow a normal shutdown to happen? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
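The pattern discussed in this review (take a snapshot of a shared latch under a lock, then wait on the snapshot outside the lock) can be sketched in isolation as below. `BlockPoint`, `prepare` and `reached` are hypothetical names for illustration only and are not taken from `BlockingConnectorTest`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BlockPoint {
    // Guarded by BlockPoint.class; replaced each time a new component is created.
    private static CountDownLatch awaitBlockLatch;

    // Called when a new (hypothetical) blocking component is created.
    static synchronized void prepare() {
        awaitBlockLatch = new CountDownLatch(1);
    }

    // Called by the component right before it blocks.
    static void reached() {
        CountDownLatch latch;
        synchronized (BlockPoint.class) {
            latch = awaitBlockLatch;
        }
        if (latch != null) {
            latch.countDown();
        }
    }

    // Called by the test thread; waits on a snapshot so the lock is not held
    // while waiting.
    static void waitForBlock(long timeoutMs) throws InterruptedException, TimeoutException {
        CountDownLatch latch;
        synchronized (BlockPoint.class) {
            latch = awaitBlockLatch;
        }
        if (latch == null) {
            throw new IllegalArgumentException("No component has been created yet");
        }
        if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("Timed out waiting for component to block.");
        }
    }
}
```

The sketch also shows both concerns raised in the review: `waitForBlock` fails immediately if it runs before `prepare`, and if `prepare` is called again in between, the waiter may hold a latch that belongs to an earlier instance.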
Re: [PR] KAFKA-15988: Reuse embedded clusters across test cases in Connect OffsetsApiIntegrationTest suite [kafka]
C0urante commented on code in PR #14966: URL: https://github.com/apache/kafka/pull/14966#discussion_r1424453814 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java: ## @@ -70,74 +80,115 @@ public class OffsetsApiIntegrationTest { private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1); private static final long OFFSET_READ_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(30); private static final int NUM_WORKERS = 3; -private static final String CONNECTOR_NAME = "test-connector"; -private static final String TOPIC = "test-topic"; private static final int NUM_TASKS = 2; private static final int NUM_RECORDS_PER_PARTITION = 10; -private Map workerProps; -private EmbeddedConnectCluster.Builder connectBuilder; +private static final Map, EmbeddedConnectCluster> CONNECT_CLUSTERS = new ConcurrentHashMap<>(); +@Rule +public TestName currentTest = new TestName(); private EmbeddedConnectCluster connect; +private String connectorName; +private String topic; @Before public void setup() { -Properties brokerProps = new Properties(); -brokerProps.put("transaction.state.log.replication.factor", "1"); -brokerProps.put("transaction.state.log.min.isr", "1"); - -// setup Connect worker properties -workerProps = new HashMap<>(); -workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS)); - -// build a Connect cluster backed by Kafka and Zk -connectBuilder = new EmbeddedConnectCluster.Builder() -.name("connect-cluster") -.numWorkers(NUM_WORKERS) -.brokerProps(brokerProps) -.workerProps(workerProps); +connectorName = currentTest.getMethodName(); +topic = currentTest.getMethodName(); +connect = defaultConnectCluster(); } @After public void tearDown() { -connect.stop(); +Set remainingConnectors = new HashSet<>(connect.connectors()); +if (remainingConnectors.remove(connectorName)) { +connect.deleteConnector(connectorName); +} +try { Review Comment: Ah, got it. 
Yeah, the idea behind the assertion was to catch any stray connectors that may be left behind by future tests. It's pointless now but it may be a nice guardrail later on. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15988: Reuse embedded clusters across test cases in Connect OffsetsApiIntegrationTest suite [kafka]
sudeshwasnik commented on code in PR #14966: URL: https://github.com/apache/kafka/pull/14966#discussion_r1424446893 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java: ## @@ -70,74 +80,115 @@ public class OffsetsApiIntegrationTest { private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1); private static final long OFFSET_READ_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(30); private static final int NUM_WORKERS = 3; -private static final String CONNECTOR_NAME = "test-connector"; -private static final String TOPIC = "test-topic"; private static final int NUM_TASKS = 2; private static final int NUM_RECORDS_PER_PARTITION = 10; -private Map workerProps; -private EmbeddedConnectCluster.Builder connectBuilder; +private static final Map, EmbeddedConnectCluster> CONNECT_CLUSTERS = new ConcurrentHashMap<>(); +@Rule +public TestName currentTest = new TestName(); private EmbeddedConnectCluster connect; +private String connectorName; +private String topic; @Before public void setup() { -Properties brokerProps = new Properties(); -brokerProps.put("transaction.state.log.replication.factor", "1"); -brokerProps.put("transaction.state.log.min.isr", "1"); - -// setup Connect worker properties -workerProps = new HashMap<>(); -workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS)); - -// build a Connect cluster backed by Kafka and Zk -connectBuilder = new EmbeddedConnectCluster.Builder() -.name("connect-cluster") -.numWorkers(NUM_WORKERS) -.brokerProps(brokerProps) -.workerProps(workerProps); +connectorName = currentTest.getMethodName(); +topic = currentTest.getMethodName(); +connect = defaultConnectCluster(); } @After public void tearDown() { -connect.stop(); +Set remainingConnectors = new HashSet<>(connect.connectors()); +if (remainingConnectors.remove(connectorName)) { +connect.deleteConnector(connectorName); +} +try { Review Comment: sounds good ! thanks ! 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15988: Reuse embedded clusters across test cases in Connect OffsetsApiIntegrationTest suite [kafka]
sudeshwasnik commented on code in PR #14966: URL: https://github.com/apache/kafka/pull/14966#discussion_r142423 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java: ## @@ -70,74 +80,115 @@ public class OffsetsApiIntegrationTest { private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1); private static final long OFFSET_READ_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(30); private static final int NUM_WORKERS = 3; -private static final String CONNECTOR_NAME = "test-connector"; -private static final String TOPIC = "test-topic"; private static final int NUM_TASKS = 2; private static final int NUM_RECORDS_PER_PARTITION = 10; -private Map workerProps; -private EmbeddedConnectCluster.Builder connectBuilder; +private static final Map, EmbeddedConnectCluster> CONNECT_CLUSTERS = new ConcurrentHashMap<>(); +@Rule +public TestName currentTest = new TestName(); private EmbeddedConnectCluster connect; +private String connectorName; +private String topic; @Before public void setup() { -Properties brokerProps = new Properties(); -brokerProps.put("transaction.state.log.replication.factor", "1"); -brokerProps.put("transaction.state.log.min.isr", "1"); - -// setup Connect worker properties -workerProps = new HashMap<>(); -workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS)); - -// build a Connect cluster backed by Kafka and Zk -connectBuilder = new EmbeddedConnectCluster.Builder() -.name("connect-cluster") -.numWorkers(NUM_WORKERS) -.brokerProps(brokerProps) -.workerProps(workerProps); +connectorName = currentTest.getMethodName(); +topic = currentTest.getMethodName(); +connect = defaultConnectCluster(); } @After public void tearDown() { -connect.stop(); +Set remainingConnectors = new HashSet<>(connect.connectors()); +if (remainingConnectors.remove(connectorName)) { +connect.deleteConnector(connectorName); +} +try { Review Comment: line 102 -> 
`remainingConnectors.remove(connectorName)` removes the element anyway, so the `assertEquals` on line 106 will always pass, which makes the assertion pointless. I thought that wasn't okay, but your point is valid too! Feel free to resolve this comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: add entity type to topic names in ConsumerGroupDescribeResponse.json [kafka]
dajac commented on PR #14986: URL: https://github.com/apache/kafka/pull/14986#issuecomment-1852606155 Merged to trunk and 3.7. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: add entity type to topic names in ConsumerGroupDescribeResponse.json [kafka]
dajac merged PR #14986: URL: https://github.com/apache/kafka/pull/14986 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
philipnee commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1424433876 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1050,6 +1062,81 @@ private void close(Duration timeout, boolean swallowException) { } } +/** + * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence: + * 1. autocommit offsets + * 2. revoke all partitions + */ +private void prepareShutdown(final Timer timer, final AtomicReference firstException) { +if (!groupMetadata.isPresent()) +return; +maybeAutoCommitSync(timer, firstException); +timer.update(); +waitOnEventCompletion(new ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.COMMIT, timer.remainingMs()), timer, firstException); +maybeInvokeCommitCallbacks(); +maybeRevokePartitions(timer, firstException); +waitOnEventCompletion(new ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.LEAVE_GROUP, timer.remainingMs()), timer, firstException); +} + +private void waitOnEventCompletion(final ConsumerCloseApplicationEvent event, + final Timer timer, + final AtomicReference firstException) { +try { +applicationEventHandler.addAndGet(event, timer); +} catch (TimeoutException e) { Review Comment: @kirktrue and I discussed the potential tasks for dealing with zero timeout. This needs to be examined perhaps after the preview. So we will spin off a jira ticket for this specific issue. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
philipnee commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1424414983 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -178,27 +171,11 @@ static void runAtClose(final Collection> requ final NetworkClientDelegate networkClientDelegate, final Timer timer) { // These are the optional outgoing requests at the -List pollResults = requestManagers.stream() +requestManagers.stream() Review Comment: This is not a conflict actually - this is just some changes to how fetch request manager closes -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15696: Refactor closing consumer [kafka]
philipnee commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1424413992 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1026,15 +1038,13 @@ private void close(Duration timeout, boolean swallowException) { final Timer closeTimer = time.timer(timeout); clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis())); closeTimer.update(); - +// Prepare shutting down the network thread +prepareShutdown(closeTimer, firstException); +closeTimer.update(); if (applicationEventHandler != null) -closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed to close application event handler with a timeout(ms)=" + closeTimer.remainingMs(), firstException); - -// Invoke all callbacks after the background thread exists in case if there are unsent async -// commits -maybeInvokeCommitCallbacks(); - -closeQuietly(fetchBuffer, "Failed to close the fetch buffer", firstException); +closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException); +closeTimer.update(); +// Ensure all async commit callbacks are invoked Review Comment: might not even need this comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15373: fix exception thrown in Admin#describeTopics for unknown ID [kafka]
MikeEdgar commented on PR #14599: URL: https://github.com/apache/kafka/pull/14599#issuecomment-1852564791 Thanks for the feedback @jolshan . If I'm following your comments on the broken test, the assertion ```scala assertThrows(classOf[ExecutionException], () => results.get(nonExistingTopicId).get).getCause.isInstanceOf[UnknownTopicIdException] ``` must be replaced with: ```scala assertFutureExceptionTypeEquals(results.get(nonExistingTopicId), classOf[UnknownTopicIdException]) ``` Correct? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
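One reason the first form is weaker: the `isInstanceOf` call only produces a boolean, and nothing asserts on it, so the check appears to pass for any `ExecutionException` regardless of its cause. A Java sketch of a helper that does fail on the wrong cause is shown below. `FutureAssertions.assertFailsWith` is a hypothetical name, not Kafka's `assertFutureExceptionTypeEquals`, and it accepts subclasses of the expected type, which the Kafka helper may not.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class FutureAssertions {
    // Fails unless the future completes exceptionally with a cause of the
    // expected type (subclasses are accepted in this sketch).
    static void assertFailsWith(Future<?> future, Class<? extends Throwable> expectedCause) {
        try {
            future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (!expectedCause.isInstance(cause)) {
                throw new AssertionError(
                    "Expected cause " + expectedCause.getName() + " but got " + cause);
            }
            return;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for the future", e);
        }
        throw new AssertionError(
            "Expected the future to fail with " + expectedCause.getName());
    }
}
```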
Re: [PR] KAFKA-15942: Implement ConsumerInterceptor [kafka]
vamossagar12 commented on PR #14963: URL: https://github.com/apache/kafka/pull/14963#issuecomment-1852550354 Thanks for the changes @Joker-5 . I will take a look this week. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15373: fix exception thrown in Admin#describeTopics for unknown ID [kafka]
jolshan commented on PR #14599: URL: https://github.com/apache/kafka/pull/14599#issuecomment-1852550819 I synced with @ijuma offline. I think it makes sense to return the UnknownTopicId exception since that is what we do for the deleteTopics API and what the server is using. It is a bit annoying that we can't use the topicError directly and instead convert to the cluster object, which loses all the detail about the topic IDs and their error responses. But fixing that requires a larger refactor. For now, let's just fix https://github.com/apache/kafka/blob/2a5fbf28820ddcde5ead605e070391059d5d2e18/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala#L200 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15988: Reuse embedded clusters across test cases in Connect OffsetsApiIntegrationTest suite [kafka]
vamossagar12 commented on code in PR #14966: URL: https://github.com/apache/kafka/pull/14966#discussion_r1424389303 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java: ## @@ -70,74 +80,115 @@ public class OffsetsApiIntegrationTest { private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1); private static final long OFFSET_READ_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(30); private static final int NUM_WORKERS = 3; -private static final String CONNECTOR_NAME = "test-connector"; -private static final String TOPIC = "test-topic"; private static final int NUM_TASKS = 2; private static final int NUM_RECORDS_PER_PARTITION = 10; -private Map workerProps; -private EmbeddedConnectCluster.Builder connectBuilder; +private static final Map, EmbeddedConnectCluster> CONNECT_CLUSTERS = new ConcurrentHashMap<>(); +@Rule +public TestName currentTest = new TestName(); private EmbeddedConnectCluster connect; +private String connectorName; +private String topic; @Before public void setup() { -Properties brokerProps = new Properties(); -brokerProps.put("transaction.state.log.replication.factor", "1"); -brokerProps.put("transaction.state.log.min.isr", "1"); - -// setup Connect worker properties -workerProps = new HashMap<>(); -workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS)); - -// build a Connect cluster backed by Kafka and Zk -connectBuilder = new EmbeddedConnectCluster.Builder() -.name("connect-cluster") -.numWorkers(NUM_WORKERS) -.brokerProps(brokerProps) -.workerProps(workerProps); +connectorName = currentTest.getMethodName(); +topic = currentTest.getMethodName(); +connect = defaultConnectCluster(); } @After public void tearDown() { -connect.stop(); +Set remainingConnectors = new HashSet<>(connect.connectors()); +if (remainingConnectors.remove(connectorName)) { +connect.deleteConnector(connectorName); +} +try { +assertEquals( +"Some connectors were not properly 
cleaned up after this test", +Collections.emptySet(), +remainingConnectors +); +} finally { +// Make a last-ditch effort to clean up the leaked connectors +// so as not to interfere with other test cases +remainingConnectors.forEach(connect::deleteConnector); +} +} + +@AfterClass +public static void close() { +// stop all Connect, Kafka and Zk threads. +CONNECT_CLUSTERS.values().forEach(EmbeddedConnectCluster::stop); +} + +private static EmbeddedConnectCluster createOrReuseConnectWithWorkerProps(Map workerProps) { +return CONNECT_CLUSTERS.computeIfAbsent(workerProps, props -> { +Properties brokerProps = new Properties(); +brokerProps.put("transaction.state.log.replication.factor", "1"); +brokerProps.put("transaction.state.log.min.isr", "1"); + +// Have to declare a new map since the passed-in one may be immutable +Map workerPropsWithDefaults = new HashMap<>(workerProps); +// Enable fast offset commits by default + workerPropsWithDefaults.putIfAbsent(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS)); + +EmbeddedConnectCluster result = new EmbeddedConnectCluster.Builder() Review Comment: OK, I saw `connect` being used in `ConnectRestartAPI`, which is why I thought we could use the same name. We can retain this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
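The reuse idea in the quoted diff (one expensive resource per distinct set of worker properties, created lazily through `computeIfAbsent`) can be sketched on its own as follows. `ClusterCache` and `FakeCluster` are hypothetical stand-ins rather than Connect classes, and the default property key is used purely as an example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class ClusterCache {
    // Hypothetical stand-in for an expensive resource such as an embedded cluster.
    static final class FakeCluster {
        final Map<String, String> props;
        volatile boolean stopped;

        FakeCluster(Map<String, String> props) {
            this.props = props;
        }

        void stop() {
            stopped = true;
        }
    }

    // Counts how many clusters were actually created (for illustration only).
    static final AtomicInteger STARTS = new AtomicInteger();

    private static final Map<Map<String, String>, FakeCluster> CLUSTERS = new ConcurrentHashMap<>();

    static FakeCluster createOrReuse(Map<String, String> workerProps) {
        // Key on an unmodifiable copy so later changes by the caller cannot
        // alter the key's hash code.
        Map<String, String> key = Collections.unmodifiableMap(new HashMap<>(workerProps));
        return CLUSTERS.computeIfAbsent(key, props -> {
            // Copy again because the key is immutable, then apply defaults.
            Map<String, String> withDefaults = new HashMap<>(props);
            withDefaults.putIfAbsent("offset.flush.interval.ms", "1000");
            STARTS.incrementAndGet();
            return new FakeCluster(withDefaults);
        });
    }

    // Counterpart of an @AfterClass hook: stop everything that was created.
    static void stopAll() {
        CLUSTERS.values().forEach(FakeCluster::stop);
        CLUSTERS.clear();
    }
}
```

Two calls with equal property maps return the same instance, so the start-up cost is paid once per distinct configuration rather than once per test.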
Re: [PR] KAFKA-15942: Implement ConsumerInterceptor [kafka]
vamossagar12 commented on PR #14963: URL: https://github.com/apache/kafka/pull/14963#issuecomment-1852542077 hmm, the JDK21 build failed with this error => ``` > Task :examples:spotbugsMain Cannot contact jenkins-shared-ubuntu-3: java.lang.InterruptedException > Task :core:compileScala > Task :group-coordinator:classes > Task :metadata:classes > Task :examples:spotbugsTest SKIPPED > Task :examples:check > Task :group-coordinator:checkstyleMain > Task :clients:testClasses > Task :clients:checkstyleTest > Task :clients:spotbugsMain The message received from the daemon indicates that the daemon has disappeared. Build request sent: Build{id=f24da9a8-313c-42d5-bc18-9f578f44c4a3, currentDir=/home/jenkins/jenkins-agent/712657a4/workspace/Kafka_kafka-pr_PR-14963} Attempting to read last messages from the daemon log... Daemon pid: 2004981 log file: /home/jenkins/.gradle/daemon/8.5/daemon-2004981.out.log - Last 20 lines from daemon log file - daemon-2004981.out.log - 2023-12-11T15:25:02.911+ [DEBUG] [org.gradle.launcher.daemon.server.SynchronizedDispatchConnection] thread 25: received class org.gradle.launcher.daemon.protocol.CloseInput 2023-12-11T15:25:02.911+ [DEBUG] [org.gradle.launcher.daemon.server.DefaultDaemonConnection] thread 25: Received IO message from client: org.gradle.launcher.daemon.protocol.CloseInput@41f596f2 2023-12-11T15:25:02.945+ [DEBUG] [org.gradle.launcher.daemon.server.exec.RequestStopIfSingleUsedDaemon] Requesting daemon stop after processing Build{id=f24da9a8-313c-42d5-bc18-9f578f44c4a3, currentDir=/home/jenkins/jenkins-agent/712657a4/workspace/Kafka_kafka-pr_PR-14963} 2023-12-11T15:25:02.965+ [LIFECYCLE] [org.gradle.launcher.daemon.server.DaemonStateCoordinator] Daemon will be stopped at the end of the build 2023-12-11T15:25:02.968+ [DEBUG] [org.gradle.launcher.daemon.server.DaemonStateCoordinator] Stop as soon as idle requested. 
The daemon is busy: true 2023-12-11T15:25:02.969+ [DEBUG] [org.gradle.launcher.daemon.server.DaemonStateCoordinator] daemon stop has been requested. Sleeping until state changes. 2023-12-11T15:25:02.970+ [DEBUG] [org.gradle.launcher.daemon.server.exec.ExecuteBuild] The daemon has started executing the build. 2023-12-11T15:25:02.971+ [DEBUG] [org.gradle.launcher.daemon.server.exec.ExecuteBuild] Executing build with daemon context: DefaultDaemonContext[uid=c8e3ee8e-33ef-46b7-a5a1-3d91a0e54ca4,javaHome=/usr/local/asfpackages/java/adoptium-jdk-21.0.1+12,daemonRegistryDir=/home/jenkins/.gradle/daemon,pid=2004981,idleTimeout=12,priority=NORMAL,applyInstrumentationAgent=true,daemonOpts=-Xss4m,-XX:+UseParallelGC,--add-opens=java.base/java.util=ALL-UNNAMED,--add-opens=java.base/java.lang=ALL-UNNAMED,--add-opens=java.base/java.lang.invoke=ALL-UNNAMED,--add-opens=java.prefs/java.util.prefs=ALL-UNNAMED,--add-opens=java.base/java.nio.charset=ALL-UNNAMED,--add-opens=java.base/java.net=ALL-UNNAMED,--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED,-Xmx2g,-Dfile.encoding=UTF-8,-Duser.country,-Duser.language=en,-Duser.variant] 2023-12-11T15:25:02.972+ [INFO] [org.gradle.launcher.daemon.server.exec.ForwardClientInput] Closing daemon's stdin at end of input. 2023-12-11T15:25:02.973+ [INFO] [org.gradle.launcher.daemon.server.exec.ForwardClientInput] The daemon will no longer process any standard input. Starting build with version 3.7.0-SNAPSHOT (commit id 22d2b462) using Gradle 8.5, Java 21 and Scala 2.13.12 Build properties: maxParallelForks=4, maxScalacThreads=4, maxTestRetries=0 ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
vamossagar12 commented on code in PR #14981: URL: https://github.com/apache/kafka/pull/14981#discussion_r1424381464 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -719,20 +721,26 @@ public void run() { result.records() ); -timer.add(new TimerTask(timeout) { +timer.add(new TimerTask(timeout.toMillis()) { @Override public void run() { -scheduleInternalOperation(name, tp, () -> complete(new TimeoutException("Writing records to the log timed out"))); +if (!future.isDone()) { +scheduleInternalOperation( +"LogAppendEvent(name=" + name + ", tp=" + tp + ")", +tp, +() -> complete(new TimeoutException("Log append event " + name + "timed out for TopicPartition " + tp)) +); +} } }); - context.coordinator.updateLastWrittenOffset(offset); - // Add the response to the deferred queue. if (!future.isDone()) { context.deferredEventQueue.add(offset, this); + context.coordinator.updateLastWrittenOffset(offset); Review Comment: Ok. yes, that was a mistake. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15883: Implement RemoteCopyLagBytes [kafka]
kamalcph commented on code in PR #14832: URL: https://github.com/apache/kafka/pull/14832#discussion_r1424357202 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -341,9 +341,10 @@ public void onLeadershipChange(Set partitionsBecomeLeader, leaderPartitions.forEach(this::cacheTopicPartitionIds); followerPartitions.forEach(this::cacheTopicPartitionIds); +followerPartitions.forEach( +topicIdPartition -> brokerTopicStats.topicStats(topicIdPartition.topic()).removeRemoteCopyBytesLag(topicIdPartition.partition())); - remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions, followerPartitions); -followerPartitions.forEach(topicIdPartition -> + remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions, followerPartitions);followerPartitions.forEach(topicIdPartition -> Review Comment: Please revert this change; it puts two statements on the same line.
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on code in PR #14981: URL: https://github.com/apache/kafka/pull/14981#discussion_r1424354426 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -950,6 +965,43 @@ public void testScheduleWriteOpWhenWriteFails() { assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records()); } +@Test +public void testScheduleWriteOpWhenWriteTimesout() throws InterruptedException { +MockTimer timer = new MockTimer(); +// The partition writer only accept on write. +MockPartitionWriter writer = new MockPartitionWriter(); + +CoordinatorRuntime runtime = +new CoordinatorRuntime.Builder() +.withTime(timer.time()) +.withTimer(timer) +.withWriteTimeOut(DEFAULT_WRITE_TIMEOUT) +.withLoader(new MockCoordinatorLoader()) +.withEventProcessor(new DirectEventProcessor()) +.withPartitionWriter(writer) +.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) +.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) +.build(); + +// Loads the coordinator. +runtime.scheduleLoadOperation(TP, 10); + +// Verify the initial state. +CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); +assertEquals(0, ctx.coordinator.lastWrittenOffset()); +assertEquals(0, ctx.coordinator.lastCommittedOffset()); +assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + +// Write #1. We should get a TimeoutException because the HWM will not advance Review Comment: nit: `.` at the end. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on code in PR #14981: URL: https://github.com/apache/kafka/pull/14981#discussion_r1424353506 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -950,6 +965,43 @@ public void testScheduleWriteOpWhenWriteFails() { assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records()); } +@Test +public void testScheduleWriteOpWhenWriteTimesout() throws InterruptedException { Review Comment: nit: `TimesOut`?
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on code in PR #14981: URL: https://github.com/apache/kafka/pull/14981#discussion_r1424352569 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ## @@ -186,6 +187,7 @@ public void testConsumerGroupHeartbeat() throws ExecutionException, InterruptedE when(runtime.scheduleWriteOperation( ArgumentMatchers.eq("consumer-group-heartbeat"), ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), +ArgumentMatchers.any(), Review Comment: Could we actually put the expected value for all of those?
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on code in PR #14981: URL: https://github.com/apache/kafka/pull/14981#discussion_r1424351506 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -1092,6 +1134,7 @@ private CoordinatorRuntime( CoordinatorShardBuilderSupplier coordinatorShardBuilderSupplier, Time time, Timer timer, +Duration timeout, Review Comment: nit: `defaultWriteTimeout`?
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on code in PR #14981: URL: https://github.com/apache/kafka/pull/14981#discussion_r1424350048 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -581,6 +591,12 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent { */ final CompletableFuture future; +/** + * Timeout value for the write operation + */ + +final Duration timeout; Review Comment: Should it be `defaultWriteTimeout` too? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -595,13 +611,15 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent { /** * Constructor. * - * @param name The operation name. - * @param tp The topic partition that the operation is applied to. - * @param op The write operation. + * @param name The operation name. + * @param tp The topic partition that the operation is applied to. + * @param timeout The write operation timeout + * @param op The write operation. */ CoordinatorWriteEvent( String name, TopicPartition tp, +Duration timeout, Review Comment: Same question about the name.
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on code in PR #14981: URL: https://github.com/apache/kafka/pull/14981#discussion_r1424351314 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -1019,6 +1055,11 @@ public void onHighWatermarkUpdated( */ private final Timer timer; +/** + * The write operation timeout + */ +private final Duration timeout; Review Comment: nit: `defaultWriteTimeout`?
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on code in PR #14981: URL: https://github.com/apache/kafka/pull/14981#discussion_r1424350465 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -698,13 +720,27 @@ public void run() { producerEpoch, result.records() ); - context.coordinator.updateLastWrittenOffset(offset); + +timer.add(new TimerTask(timeout.toMillis()) { +@Override +public void run() { +if (!future.isDone()) { +scheduleInternalOperation( +"LogAppendEvent(name=" + name + ", tp=" + tp + ")", +tp, +() -> complete(new TimeoutException("Log append event " + name + "timed out for TopicPartition " + tp)) +); +} +} +}); // Add the response to the deferred queue. if (!future.isDone()) { context.deferredEventQueue.add(offset, this); + context.coordinator.updateLastWrittenOffset(offset); } else { complete(null); + Review Comment: nit: We could remove this empty line. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on code in PR #14981: URL: https://github.com/apache/kafka/pull/14981#discussion_r1424350233 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -595,13 +611,15 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent { /** * Constructor. * - * @param name The operation name. - * @param tp The topic partition that the operation is applied to. - * @param op The write operation. + * @param name The operation name. + * @param tp The topic partition that the operation is applied to. + * @param timeout The write operation timeout + * @param op The write operation. */ CoordinatorWriteEvent( String name, TopicPartition tp, +Duration timeout, Review Comment: Same question about the name.
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on code in PR #14981: URL: https://github.com/apache/kafka/pull/14981#discussion_r1424349399 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -135,6 +138,11 @@ public Builder withTimer(Timer timer) { return this; } +public Builder withWriteTimeOut(Duration timeout) { Review Comment: nit: `withDefaultWriteTimeout`?
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on code in PR #14981: URL: https://github.com/apache/kafka/pull/14981#discussion_r1424349887 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -581,6 +591,12 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent { */ final CompletableFuture future; +/** + * Timeout value for the write operation + */ + Review Comment: nit: Remove empty line. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -581,6 +591,12 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent { */ final CompletableFuture future; +/** + * Timeout value for the write operation + */ + +final Duration timeout; Review Comment: Should it be `defaultWriteTimeout` too?
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on code in PR #14981: URL: https://github.com/apache/kafka/pull/14981#discussion_r1424345912 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -719,20 +721,26 @@ public void run() { result.records() ); -timer.add(new TimerTask(timeout) { +timer.add(new TimerTask(timeout.toMillis()) { @Override public void run() { -scheduleInternalOperation(name, tp, () -> complete(new TimeoutException("Writing records to the log timed out"))); +if (!future.isDone()) { +scheduleInternalOperation( +"LogAppendEvent(name=" + name + ", tp=" + tp + ")", +tp, +() -> complete(new TimeoutException("Log append event " + name + "timed out for TopicPartition " + tp)) +); +} } }); - context.coordinator.updateLastWrittenOffset(offset); - // Add the response to the deferred queue. if (!future.isDone()) { context.deferredEventQueue.add(offset, this); + context.coordinator.updateLastWrittenOffset(offset); Review Comment: Why did you move this? I think that it should stay where it was and the timer must be added here, no? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
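Illustrative sketch (not code from PR #14981): the review thread above concerns a timer task that fails a pending write with a `TimeoutException` only if its future has not completed by the time the timeout elapses. The pattern can be shown in isolation as below. `ManualTimer`, `WriteTimeoutSketch` and `scheduleWrite` are hypothetical names invented for this note; they are not Kafka's `MockTimer` or `CoordinatorRuntime`, and the real runtime additionally routes the completion through an internal event.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

/** Hypothetical timer whose clock only moves when a test advances it. */
final class ManualTimer {
    private static final class Task {
        final long deadlineMs;
        final Runnable action;

        Task(long deadlineMs, Runnable action) {
            this.deadlineMs = deadlineMs;
            this.action = action;
        }
    }

    private final List<Task> tasks = new ArrayList<>();
    private long nowMs = 0;

    /** Registers an action to run once the clock has advanced by at least {@code delay}. */
    void add(Duration delay, Runnable action) {
        tasks.add(new Task(nowMs + delay.toMillis(), action));
    }

    /** Moves the clock forward and runs every task whose deadline has been reached. */
    void advanceClock(long ms) {
        nowMs += ms;
        List<Task> due = new ArrayList<>();
        for (Task task : tasks) {
            if (task.deadlineMs <= nowMs) {
                due.add(task);
            }
        }
        tasks.removeAll(due);
        for (Task task : due) {
            task.action.run();
        }
    }
}

/** Hypothetical stand-in for scheduling a write that is bounded by a timeout. */
final class WriteTimeoutSketch {
    static CompletableFuture<String> scheduleWrite(ManualTimer timer, String name, Duration timeout) {
        CompletableFuture<String> future = new CompletableFuture<>();
        timer.add(timeout, () -> {
            // Only fail the write if nothing completed it in the meantime.
            if (!future.isDone()) {
                future.completeExceptionally(new TimeoutException(
                    "Write " + name + " timed out after " + timeout.toMillis() + " ms"));
            }
        });
        return future;
    }
}
```

In a test, advancing the clock by the same duration that was passed to `scheduleWrite` is what triggers the failure, which is why using one value for both keeps the test easy to read.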
[jira] [Resolved] (KAFKA-9545) Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`
[ https://issues.apache.org/jira/browse/KAFKA-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-9545. Resolution: Fixed > Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted` > -- > > Key: KAFKA-9545 > URL: https://issues.apache.org/jira/browse/KAFKA-9545 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Jason Gustafson >Assignee: Ashwin Pankaj >Priority: Major > > https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4678/testReport/org.apache.kafka.streams.integration/RegexSourceIntegrationTest/testRegexMatchesTopicsAWhenDeleted/ > {code} > java.lang.AssertionError: Condition not met within timeout 15000. Stream > tasks not updated > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:367) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:337) > at > org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:224) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15997) Ensure fairness in the uniform assignor
[ https://issues.apache.org/jira/browse/KAFKA-15997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot reassigned KAFKA-15997: --- Assignee: Ritika Reddy > Ensure fairness in the uniform assignor > --- > > Key: KAFKA-15997 > URL: https://issues.apache.org/jira/browse/KAFKA-15997 > Project: Kafka > Issue Type: Sub-task >Reporter: Emanuele Sabellico >Assignee: Ritika Reddy >Priority: Minor > > > > Fairness has to be ensured in uniform assignor as it was in > cooperative-sticky one. > There's this test 0113 subtest u_multiple_subscription_changes in librdkafka > where 8 consumers are subscribing to the same topic, and it's verifying that > all of them are getting 2 partitions assigned. But with new protocol it seems > two consumers get assigned 3 partitions and 1 has zero partitions. The test > doesn't configure any client.rack. > {code:java} > [0113_cooperative_rebalance /478.183s] Consumer assignments > (subscription_variation 0) (stabilized) (no rebalance cb): > [0113_cooperative_rebalance /478.183s] Consumer C_0#consumer-3 assignment > (2): rdkafkatest_rnd24419cc75e59d8de_0113u_1 [5] (2000msgs), > rdkafkatest_rnd24419cc75e59d8de_0113u_1 [8] (4000msgs) > [0113_cooperative_rebalance /478.183s] Consumer C_1#consumer-4 assignment > (3): rdkafkatest_rnd24419cc75e59d8de_0113u_1 [0] (1000msgs), > rdkafkatest_rnd24419cc75e59d8de_0113u_1 [3] (2000msgs), > rdkafkatest_rnd24419cc75e59d8de_0113u_1 [13] (1000msgs) > [0113_cooperative_rebalance /478.184s] Consumer C_2#consumer-5 assignment > (2): rdkafkatest_rnd24419cc75e59d8de_0113u_1 [6] (1000msgs), > rdkafkatest_rnd24419cc75e59d8de_0113u_1 [10] (2000msgs) > [0113_cooperative_rebalance /478.184s] Consumer C_3#consumer-6 assignment > (2): rdkafkatest_rnd24419cc75e59d8de_0113u_1 [7] (1000msgs), > rdkafkatest_rnd24419cc75e59d8de_0113u_1 [9] (2000msgs) > [0113_cooperative_rebalance /478.184s] Consumer C_4#consumer-7 assignment > (2): rdkafkatest_rnd24419cc75e59d8de_0113u_1 [11] (1000msgs), > 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [14] (3000msgs) > [0113_cooperative_rebalance /478.184s] Consumer C_5#consumer-8 assignment > (3): rdkafkatest_rnd24419cc75e59d8de_0113u_1 [1] (2000msgs), > rdkafkatest_rnd24419cc75e59d8de_0113u_1 [2] (2000msgs), > rdkafkatest_rnd24419cc75e59d8de_0113u_1 [4] (1000msgs) > [0113_cooperative_rebalance /478.184s] Consumer C_6#consumer-9 assignment > (0): > [0113_cooperative_rebalance /478.184s] Consumer C_7#consumer-10 assignment > (2): rdkafkatest_rnd24419cc75e59d8de_0113u_1 [12] (1000msgs), > rdkafkatest_rnd24419cc75e59d8de_0113u_1 [15] (1000msgs) > [0113_cooperative_rebalance /478.184s] 16/32 partitions assigned > [0113_cooperative_rebalance /478.184s] Consumer C_0#consumer-3 has 2 > assigned partitions (1 subscribed topic(s)), expecting 2 assigned partitions > [0113_cooperative_rebalance /478.184s] Consumer C_1#consumer-4 has 3 > assigned partitions (1 subscribed topic(s)), expecting 2 assigned partitions > [0113_cooperative_rebalance /478.184s] Consumer C_2#consumer-5 has 2 > assigned partitions (1 subscribed topic(s)), expecting 2 assigned partitions > [0113_cooperative_rebalance /478.184s] Consumer C_3#consumer-6 has 2 > assigned partitions (1 subscribed topic(s)), expecting 2 assigned partitions > [0113_cooperative_rebalance /478.184s] Consumer C_4#consumer-7 has 2 > assigned partitions (1 subscribed topic(s)), expecting 2 assigned partitions > [0113_cooperative_rebalance /478.184s] Consumer C_5#consumer-8 has 3 > assigned partitions (1 subscribed topic(s)), expecting 2 assigned partitions > [0113_cooperative_rebalance /478.184s] Consumer C_6#consumer-9 has 0 > assigned partitions (1 subscribed topic(s)), expecting 2 assigned partitions > [0113_cooperative_rebalance /478.184s] Consumer C_7#consumer-10 has 2 > assigned partitions (1 subscribed topic(s)), expecting 2 assigned partitions > [ /479.057s] 1 test(s) running: > 0113_cooperative_rebalance > [ /480.057s] 1 test(s) running: > 0113_cooperative_rebalance > [ 
/481.057s] 1 test(s) running: > 0113_cooperative_rebalance > [0113_cooperative_rebalance /482.498s] TEST FAILURE > ### Test "0113_cooperative_rebalance (u_multiple_subscription_changes:2390: > use_rebalance_cb: 0, subscription_variation: 0)" failed at > test.c:1243:check_test_timeouts() at Thu Dec 7 15:52:15 2023: ### > Test 0113_cooperative_rebalance (u_multiple_subscription_changes:2390: > use_rebalance_cb: 0, subscription_variation: 0) timed out (timeout set to 480 > seconds) > ./run-test.sh: line 62: 3512920 Killed $TEST $ARGS > ### > ### Test ./test-runner in bare mode FAILED! (return code 137) ### > ###{code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15997) Ensure fairness in the uniform assignor
Emanuele Sabellico created KAFKA-15997: -- Summary: Ensure fairness in the uniform assignor Key: KAFKA-15997 URL: https://issues.apache.org/jira/browse/KAFKA-15997 Project: Kafka Issue Type: Sub-task Reporter: Emanuele Sabellico Fairness has to be ensured in uniform assignor as it was in cooperative-sticky one. There's this test 0113 subtest u_multiple_subscription_changes in librdkafka where 8 consumers are subscribing to the same topic, and it's verifying that all of them are getting 2 partitions assigned. But with new protocol it seems two consumers get assigned 3 partitions and 1 has zero partitions. The test doesn't configure any client.rack. {code:java} [0113_cooperative_rebalance /478.183s] Consumer assignments (subscription_variation 0) (stabilized) (no rebalance cb): [0113_cooperative_rebalance /478.183s] Consumer C_0#consumer-3 assignment (2): rdkafkatest_rnd24419cc75e59d8de_0113u_1 [5] (2000msgs), rdkafkatest_rnd24419cc75e59d8de_0113u_1 [8] (4000msgs) [0113_cooperative_rebalance /478.183s] Consumer C_1#consumer-4 assignment (3): rdkafkatest_rnd24419cc75e59d8de_0113u_1 [0] (1000msgs), rdkafkatest_rnd24419cc75e59d8de_0113u_1 [3] (2000msgs), rdkafkatest_rnd24419cc75e59d8de_0113u_1 [13] (1000msgs) [0113_cooperative_rebalance /478.184s] Consumer C_2#consumer-5 assignment (2): rdkafkatest_rnd24419cc75e59d8de_0113u_1 [6] (1000msgs), rdkafkatest_rnd24419cc75e59d8de_0113u_1 [10] (2000msgs) [0113_cooperative_rebalance /478.184s] Consumer C_3#consumer-6 assignment (2): rdkafkatest_rnd24419cc75e59d8de_0113u_1 [7] (1000msgs), rdkafkatest_rnd24419cc75e59d8de_0113u_1 [9] (2000msgs) [0113_cooperative_rebalance /478.184s] Consumer C_4#consumer-7 assignment (2): rdkafkatest_rnd24419cc75e59d8de_0113u_1 [11] (1000msgs), rdkafkatest_rnd24419cc75e59d8de_0113u_1 [14] (3000msgs) [0113_cooperative_rebalance /478.184s] Consumer C_5#consumer-8 assignment (3): rdkafkatest_rnd24419cc75e59d8de_0113u_1 [1] (2000msgs), rdkafkatest_rnd24419cc75e59d8de_0113u_1 [2] (2000msgs), 
rdkafkatest_rnd24419cc75e59d8de_0113u_1 [4] (1000msgs) [0113_cooperative_rebalance /478.184s] Consumer C_6#consumer-9 assignment (0): [0113_cooperative_rebalance /478.184s] Consumer C_7#consumer-10 assignment (2): rdkafkatest_rnd24419cc75e59d8de_0113u_1 [12] (1000msgs), rdkafkatest_rnd24419cc75e59d8de_0113u_1 [15] (1000msgs) [0113_cooperative_rebalance /478.184s] 16/32 partitions assigned [0113_cooperative_rebalance /478.184s] Consumer C_0#consumer-3 has 2 assigned partitions (1 subscribed topic(s)), expecting 2 assigned partitions [0113_cooperative_rebalance /478.184s] Consumer C_1#consumer-4 has 3 assigned partitions (1 subscribed topic(s)), expecting 2 assigned partitions [0113_cooperative_rebalance /478.184s] Consumer C_2#consumer-5 has 2 assigned partitions (1 subscribed topic(s)), expecting 2 assigned partitions [0113_cooperative_rebalance /478.184s] Consumer C_3#consumer-6 has 2 assigned partitions (1 subscribed topic(s)), expecting 2 assigned partitions [0113_cooperative_rebalance /478.184s] Consumer C_4#consumer-7 has 2 assigned partitions (1 subscribed topic(s)), expecting 2 assigned partitions [0113_cooperative_rebalance /478.184s] Consumer C_5#consumer-8 has 3 assigned partitions (1 subscribed topic(s)), expecting 2 assigned partitions [0113_cooperative_rebalance /478.184s] Consumer C_6#consumer-9 has 0 assigned partitions (1 subscribed topic(s)), expecting 2 assigned partitions [0113_cooperative_rebalance /478.184s] Consumer C_7#consumer-10 has 2 assigned partitions (1 subscribed topic(s)), expecting 2 assigned partitions [ /479.057s] 1 test(s) running: 0113_cooperative_rebalance [ /480.057s] 1 test(s) running: 0113_cooperative_rebalance [ /481.057s] 1 test(s) running: 0113_cooperative_rebalance [0113_cooperative_rebalance /482.498s] TEST FAILURE ### Test "0113_cooperative_rebalance (u_multiple_subscription_changes:2390: use_rebalance_cb: 0, subscription_variation: 0)" failed at test.c:1243:check_test_timeouts() at Thu Dec 7 15:52:15 2023: ### Test 
0113_cooperative_rebalance (u_multiple_subscription_changes:2390: use_rebalance_cb: 0, subscription_variation: 0) timed out (timeout set to 480 seconds) ./run-test.sh: line 62: 3512920 Killed $TEST $ARGS ### ### Test ./test-runner in bare mode FAILED! (return code 137) ### ###{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
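Illustrative sketch (not from the ticket): the fairness expectation described above, where members with the same subscription should end up with partition counts that differ by at most one, can be written as a small check. `AssignmentBalanceCheck`, `isBalanced` and `reportedAssignment` are hypothetical names for this note; the partition numbers are copied from the log in the report, and the "at most one apart" criterion is an assumption about what "fair" means here.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class AssignmentBalanceCheck {
    /** Returns true when the largest and smallest assignments differ by at most one partition. */
    static boolean isBalanced(Map<String, ? extends Collection<Integer>> assignment) {
        if (assignment.isEmpty()) {
            return true;
        }
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        for (Collection<Integer> partitions : assignment.values()) {
            min = Math.min(min, partitions.size());
            max = Math.max(max, partitions.size());
        }
        return max - min <= 1;
    }

    /** The per-consumer partitions as they appear in the test log quoted in the report. */
    static Map<String, List<Integer>> reportedAssignment() {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        assignment.put("C_0", Arrays.asList(5, 8));
        assignment.put("C_1", Arrays.asList(0, 3, 13));
        assignment.put("C_2", Arrays.asList(6, 10));
        assignment.put("C_3", Arrays.asList(7, 9));
        assignment.put("C_4", Arrays.asList(11, 14));
        assignment.put("C_5", Arrays.asList(1, 2, 4));
        assignment.put("C_6", Arrays.<Integer>asList());
        assignment.put("C_7", Arrays.asList(12, 15));
        return assignment;
    }
}
```

With the reported data the check fails because two members hold three partitions while `C_6` holds none, which matches the "expecting 2 assigned partitions" lines in the log.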
Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]
lianetm commented on code in PR #14640: URL: https://github.com/apache/kafka/pull/14640#discussion_r1424320386 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -169,7 +169,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { startingTimestamp = startingTimestamp) Review Comment: FYI, the max poll interval PR has been merged, so I guess we can now enable the existing callback integration tests that depended on it.
Re: [PR] KAFKA-15870: Move new group coordinator metrics from Yammer to Metrics [kafka]
jeffkbkim commented on code in PR #14848: URL: https://github.com/apache/kafka/pull/14848#discussion_r1424319049 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java: ## @@ -77,29 +70,13 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC private final MetricsRegistry registry; private final Metrics metrics; -private final Map shards = new HashMap<>(); -private static final AtomicLong NUM_GENERIC_GROUPS_PREPARING_REBALANCE_COUNTER = new AtomicLong(0); -private static final AtomicLong NUM_GENERIC_GROUPS_COMPLETING_REBALANCE_COUNTER = new AtomicLong(0); -private static final AtomicLong NUM_GENERIC_GROUPS_STABLE_COUNTER = new AtomicLong(0); -private static final AtomicLong NUM_GENERIC_GROUPS_DEAD_COUNTER = new AtomicLong(0); -private static final AtomicLong NUM_GENERIC_GROUPS_EMPTY_COUNTER = new AtomicLong(0); +private final Map shards = new ConcurrentHashMap<>(); Review Comment: We may hit a concurrent modification when the metrics thread scrapes a shard while that shard is being removed from `shards` on deactivation.
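Illustrative sketch (not code from PR #14848): the concern above is that iterating a plain `HashMap` while another code path removes an entry can fail with `ConcurrentModificationException`, whereas `ConcurrentHashMap` iterators are weakly consistent and do not throw. The snippet below reproduces the effect on a single thread by removing a key in the middle of an iteration; `ShardMapIterationSketch` and its methods are hypothetical names, and a real race between a metrics thread and a deactivation would not fail this predictably.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ShardMapIterationSketch {
    /** Sums the values while removing one key mid-iteration, mimicking a scrape that overlaps a removal. */
    static long sumWhileRemoving(Map<Integer, Long> shards, int keyToRemove) {
        long sum = 0;
        for (Map.Entry<Integer, Long> entry : shards.entrySet()) {
            sum += entry.getValue();
            shards.remove(keyToRemove);
        }
        return sum;
    }

    /** Reports whether the overlapping removal made the iteration fail. */
    static boolean failsWithConcurrentModification(Map<Integer, Long> shards, int keyToRemove) {
        try {
            sumWhileRemoving(shards, keyToRemove);
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    /** Fills a map with three sample shard values keyed 0, 1 and 2. */
    static Map<Integer, Long> fill(Map<Integer, Long> shards) {
        shards.put(0, 10L);
        shards.put(1, 20L);
        shards.put(2, 30L);
        return shards;
    }
}
```

Calling `failsWithConcurrentModification(fill(new HashMap<>()), 2)` hits the fail-fast check, while the same call with `new ConcurrentHashMap<>()` completes; the sum it computes may or may not include the removed entry, which is usually acceptable for a metrics scrape.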
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax commented on PR #14936: URL: https://github.com/apache/kafka/pull/14936#issuecomment-1852434603 @stanislavkozlovski There is no 3.7 branch yet, so I merged this one. It must go into the 3.7 release (it has been ready for days, but Jenkins did not cooperate...). If your cut does not include it, I'll cherry-pick it to 3.7 later.
Re: [PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax merged PR #14936: URL: https://github.com/apache/kafka/pull/14936
Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]
kirktrue commented on code in PR #14640: URL: https://github.com/apache/kafka/pull/14640#discussion_r1424297697 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ## @@ -790,6 +812,197 @@ public void testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() { verify(membershipManager, never()).transitionToJoining(); } +@Test +public void testListenerCallbacksBasic() { Review Comment: I've updated the `MembershipManagerImplTest` to include these latest changes. Thanks for clarifying the desired behavior. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15366: Modify LogDirFailureTest for KRaft [kafka]
soarez commented on code in PR #14977: URL: https://github.com/apache/kafka/pull/14977#discussion_r1424296842 ## core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala: ## @@ -191,12 +204,27 @@ class LogDirFailureTest extends IntegrationTestHarness { TestUtils.pollUntilAtLeastNumRecords(consumer, 1) // There should be no remaining LogDirEventNotification znode -assertTrue(zkClient.getAllLogDirEventNotifications.isEmpty) +if (quorum == "zk") { + assertTrue(zkClient.getAllLogDirEventNotifications.isEmpty) +} -// The controller should have marked the replica on the original leader as offline -val controllerServer = servers.find(_.kafkaController.isActive).get -val offlineReplicas = controllerServer.kafkaController.controllerContext.replicasInState(topic, OfflineReplica) -assertTrue(offlineReplicas.contains(PartitionAndReplica(new TopicPartition(topic, 0), leaderServerId))) +if (quorum == "kraft") { + waitUntilTrue(() => { +brokers.exists(broker => { + val hasOfflineDir = broker.asInstanceOf[BrokerServer].logDirFailureChannel.hasOfflineLogDir(failedLogDir.toPath.toString) + hasOfflineDir && broker.asInstanceOf[BrokerServer] +.replicaManager +.metadataCache +.getClusterMetadata(broker.clusterId, broker.config.interBrokerListenerName) +.partition(new TopicPartition(topic, 0)).offlineReplicas().map(_.id()).contains(originalLeaderServerId) Review Comment: Hi @viktorsomogyi `ReplicaManager.maybeUpdateTopicAssignment` sends `AssignReplicasToDirs` as necessary after a partition is created. PR #14982 fixed an integration test — `kafka.api.PlaintextAdminIntegrationTest.testAlterReplicaLogDirs(String).quorum=kraft` — which relies on `AssignReplicasToDirs` being sent. So I'd expect that part to be working. It could also be be that the log directory failure somehow isn't being propagated (via BrokerHeartbeatRequest), or that the metadata cache in the broker isn't being updated. To troubleshoot, I suggest verifying the following via logging/breakpoints: 1. 
Check that the assignment is being sent in `AssignmentsManager.onAssignment` 2. Check that the controller is receiving the assignment in `ReplicationControlManager.handleAssignReplicasToDirs` 3. Check that the directory failure is being propagated by the broker in `BrokerLifecycleManager.propagateDirectoryFailure` 4. Check that the controller is processing the directory failure in `ReplicationControlManager.handleDirectoriesOffline` 5. If all of the above is happening, then there must be some issue with the broker catching up with the metadata.
Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]
dajac commented on code in PR #14640: URL: https://github.com/apache/kafka/pull/14640#discussion_r1424300228 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -860,6 +871,78 @@ public void testGroupMetadataUpdateSingleCall() { } } +/** + * Tests that the consumer correctly invokes the callbacks for {@link ConsumerRebalanceListener} that was + * specified. We don't go through the full effort to emulate heartbeats and correct group management here. We're + * simply exercising the background {@link EventProcessor} does the correct thing when + * {@link AsyncKafkaConsumer#poll(Duration)} is called. + * + * Note that we test {@link ConsumerRebalanceListener} that throws errors in its different callbacks. Failed + * callback execution does not immediately errors. Instead, those errors are forwarded to the + * application event thread for the {@link MembershipManagerImpl} to handle. + */ +@ParameterizedTest +@MethodSource("listenerCallbacksInvokeSource") +public void testListenerCallbacksInvoke(List methodNames, +Optional revokedError, +Optional assignedError, +Optional lostError, +int expectedRevokedCount, +int expectedAssignedCount, +int expectedLostCount) { +CounterConsumerRebalanceListener consumerRebalanceListener = new CounterConsumerRebalanceListener( +revokedError, +assignedError, +lostError +); +consumer.subscribe(Collections.singletonList("topic"), consumerRebalanceListener); +SortedSet partitions = Collections.emptySortedSet(); + +for (ConsumerRebalanceListenerMethodName methodName : methodNames) { +CompletableBackgroundEvent e = new ConsumerRebalanceListenerCallbackNeededEvent(methodName, partitions); +backgroundEventQueue.add(e); + +// This will trigger the background event queue to process our background event message. 
+consumer.poll(Duration.ZERO); +} + +assertEquals(expectedRevokedCount, consumerRebalanceListener.revokedCount()); +assertEquals(expectedAssignedCount, consumerRebalanceListener.assignedCount()); +assertEquals(expectedLostCount, consumerRebalanceListener.lostCount()); Review Comment: I see. Ideally, we should mock the background part of it here. If this is too hard, we can address it separately.
Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]
kirktrue commented on code in PR #14640: URL: https://github.com/apache/kafka/pull/14640#discussion_r1424285645 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -860,6 +871,78 @@ public void testGroupMetadataUpdateSingleCall() { } } +/** + * Tests that the consumer correctly invokes the callbacks for {@link ConsumerRebalanceListener} that was + * specified. We don't go through the full effort to emulate heartbeats and correct group management here. We're + * simply exercising the background {@link EventProcessor} does the correct thing when + * {@link AsyncKafkaConsumer#poll(Duration)} is called. + * + * Note that we test {@link ConsumerRebalanceListener} that throws errors in its different callbacks. Failed + * callback execution does not immediately errors. Instead, those errors are forwarded to the + * application event thread for the {@link MembershipManagerImpl} to handle. + */ +@ParameterizedTest +@MethodSource("listenerCallbacksInvokeSource") +public void testListenerCallbacksInvoke(List methodNames, +Optional revokedError, +Optional assignedError, +Optional lostError, +int expectedRevokedCount, +int expectedAssignedCount, +int expectedLostCount) { +CounterConsumerRebalanceListener consumerRebalanceListener = new CounterConsumerRebalanceListener( +revokedError, +assignedError, +lostError +); +consumer.subscribe(Collections.singletonList("topic"), consumerRebalanceListener); +SortedSet partitions = Collections.emptySortedSet(); + +for (ConsumerRebalanceListenerMethodName methodName : methodNames) { +CompletableBackgroundEvent e = new ConsumerRebalanceListenerCallbackNeededEvent(methodName, partitions); +backgroundEventQueue.add(e); + +// This will trigger the background event queue to process our background event message. 
+consumer.poll(Duration.ZERO); +} + +assertEquals(expectedRevokedCount, consumerRebalanceListener.revokedCount()); +assertEquals(expectedAssignedCount, consumerRebalanceListener.assignedCount()); +assertEquals(expectedLostCount, consumerRebalanceListener.lostCount()); Review Comment: I attempted this, but due to the way the code and tests are structured, it's very awkward to do so. Basically, when we create an `AsyncKafkaConsumer`, we start the background network I/O thread. That background thread pulls the events off of the queue, so there's a timing issue which could make the test flaky. I'll make another attempt at this today.
Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]
kirktrue commented on code in PR #14640: URL: https://github.com/apache/kafka/pull/14640#discussion_r1424283196 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ## @@ -790,6 +791,297 @@ public void testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() { verify(membershipManager, never()).transitionToJoining(); } +@Test +public void testListenerCallbacksBasic() { +// Step 1: set up mocks +MembershipManagerImpl membershipManager = createMemberInStableState(); +CounterConsumerRebalanceListener listener = new CounterConsumerRebalanceListener(); +ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker(); + +String topicName = "topic1"; +Uuid topicId = Uuid.randomUuid(); + + when(subscriptionState.assignedPartitions()).thenReturn(Collections.emptySet()); +when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true); + when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener)); +doNothing().when(subscriptionState).markPendingRevocation(anySet()); + when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, topicName)); + +// Step 2: put the state machine into the appropriate... 
state + when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, topicName)); +receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager); +assertEquals(MemberState.RECONCILING, membershipManager.state()); +assertEquals(Collections.emptySet(), membershipManager.currentAssignment()); +assertTrue(membershipManager.reconciliationInProgress()); +assertEquals(0, listener.revokedCounter.get()); +assertEquals(0, listener.assignedCounter.get()); +assertEquals(0, listener.lostCounter.get()); + +assertTrue(membershipManager.reconciliationInProgress()); + +// Step 3: assign partitions +performCallback( +membershipManager, +invoker, +ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED, +topicPartitions(topicName, 0, 1) +); + +assertFalse(membershipManager.reconciliationInProgress()); + +// Step 4: Send ack and make sure we're done and our listener was called appropriately +membershipManager.onHeartbeatRequestSent(); +assertEquals(MemberState.STABLE, membershipManager.state()); +assertEquals(topicIdPartitions(topicId, topicName, 0, 1), membershipManager.currentAssignment()); + +assertEquals(0, listener.revokedCounter.get()); +assertEquals(1, listener.assignedCounter.get()); +assertEquals(0, listener.lostCounter.get()); + +// Step 5: receive an empty assignment, which means we should call revoke + when(subscriptionState.assignedPartitions()).thenReturn(topicPartitions(topicName, 0, 1)); +receiveEmptyAssignment(membershipManager); +assertEquals(MemberState.RECONCILING, membershipManager.state()); +assertTrue(membershipManager.reconciliationInProgress()); + +// Step 6: revoke partitions +performCallback( +membershipManager, +invoker, +ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED, +topicPartitions(topicName, 0, 1) +); +assertTrue(membershipManager.reconciliationInProgress()); + +// Step 7: assign partitions should still be called, even though it's empty +performCallback( +membershipManager, +invoker, 
+ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED, +Collections.emptySortedSet() +); +assertFalse(membershipManager.reconciliationInProgress()); + +// Step 8: Send ack and make sure we're done and our listener was called appropriately +membershipManager.onHeartbeatRequestSent(); +assertEquals(MemberState.STABLE, membershipManager.state()); +assertFalse(membershipManager.reconciliationInProgress()); + +assertEquals(1, listener.revokedCounter.get()); +assertEquals(2, listener.assignedCounter.get()); +assertEquals(0, listener.lostCounter.get()); +} + +// TODO: Reconciliation needs to support when a listener throws an error on onPartitionsRevoked(). When that +// happens, the assignment step is skipped, which means onPartitionsAssigned() is never run. +// The jury is out on whether or not this is a bug or intentional. +// +// See https://github.com/apache/kafka/pull/14640#discussion_r1421253120 for more details. +// @Test +public void testListenerCallbacksThrowsErrorOnPartitionsRevoked() { +// Step 1: set up mocks +MembershipManagerImpl membershipManager =
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
aliehsaeedii commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1424277821 ## streams/src/test/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValueTest.java: ## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.query.ResultOrder; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue; +import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertNull; + +@RunWith(Enclosed.class) +public class ReadonlyPartiallyDeserializedSegmentValueTest { + +/** + * Non-exceptional scenarios which are expected to occur during regular store operation. + */ +@RunWith(Parameterized.class) +public static class ExpectedCasesTest { + +private static final List TEST_CASES = new ArrayList<>(); + +static { +// test cases are expected to have timestamps in strictly decreasing order (except for the degenerate case) +TEST_CASES.add(new TestCase("single record", 10, new TestRecord("foo".getBytes(), 1))); +TEST_CASES.add(new TestCase("multiple records", 10, new TestRecord("foo".getBytes(), 8), new TestRecord("bar".getBytes(), 3), new TestRecord("baz".getBytes(), 0))); +TEST_CASES.add(new TestCase("single tombstone", 10, new TestRecord(null, 1))); +TEST_CASES.add(new TestCase("multiple tombstone", 10, new TestRecord(null, 4), new TestRecord(null, 1))); +TEST_CASES.add(new TestCase("tombstones and records (r, t, r)", 10, new TestRecord("foo".getBytes(), 5), new TestRecord(null, 2), new TestRecord("bar".getBytes(), 1))); +TEST_CASES.add(new TestCase("tombstones and records (t, r, t)", 10, new TestRecord(null, 5), new TestRecord("foo".getBytes(), 2), new 
TestRecord(null, 1))); +TEST_CASES.add(new TestCase("tombstones and records (r, r, t, t)", 10, new TestRecord("foo".getBytes(), 6), new TestRecord("bar".getBytes(), 5), new TestRecord(null, 2), new TestRecord(null, 1))); +TEST_CASES.add(new TestCase("tombstones and records (t, t, r, r)", 10, new TestRecord(null, 7), new TestRecord(null, 6), new TestRecord("foo".getBytes(), 2), new TestRecord("bar".getBytes(), 1))); +TEST_CASES.add(new TestCase("record with empty bytes", 10, new TestRecord(new byte[0], 1))); +TEST_CASES.add(new TestCase("records with empty bytes (r, e)", 10, new TestRecord("foo".getBytes(), 4), new TestRecord(new byte[0], 1))); +TEST_CASES.add(new TestCase("records with empty bytes (e, e, r)", 10, new TestRecord(new byte[0], 8), new TestRecord(new byte[0], 2), new TestRecord("foo".getBytes(), 1))); +} + +private final TestCase testCase; + +public ExpectedCasesTest(final TestCase testCase) { +this.testCase = testCase; +} + +@Parameterized.Parameters(name = "{0}") +public static Collection data() { +return TEST_CASES; +} + +@Test +public void shouldFindInTimeRangesWithDifferentOrders() { + +// create a list of timestamps in ascending order to use them in combination for starting and ending point of the time range. +final List timestamps = createTimestampsFromTestRecords(testCase); + +// verify results +final List orders = Arrays.asList(ResultOrder.ASCENDING, ResultOrder.ANY, ResultOrder.DESCENDING); +for (final ResultOrder order: orders) { +for (final Long from : timestamps) { +for
[PR] MINOR: Few cleanups to JaasContext/Utils classes [kafka]
q-ryanamiri opened a new pull request, #14994: URL: https://github.com/apache/kafka/pull/14994 Reviewers: Rajini Sivaram
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
aliehsaeedii commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1424265799 ## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java: ## @@ -69,23 +73,49 @@ public boolean hasNext() { if (!open) { throw new IllegalStateException("The iterator is out of scope."); } -// since data is stored in descending order in the segments, check whether there is any previous record, if the order is Ascending. -final boolean hasStillLoad = order.equals(ResultOrder.ASCENDING) ? iterator.hasPrevious() : iterator.hasNext(); -return hasStillLoad || maybeFillIterator(); +if (this.next != null) { +return true; +} + +while ((currentDeserializedSegmentValue != null || currentRawSegmentValue != null || segmentIterator.hasNext()) && this.next == null) { +boolean hasSegmentValue = currentDeserializedSegmentValue != null || currentRawSegmentValue != null; +if (!hasSegmentValue) { +hasSegmentValue = maybeFillCurrentSegmentValue(); +} +if (hasSegmentValue) { +this.next = getNextRecord(); +if (this.next == null) { +prepareToFetchNextSegment(); +} +} +} +return this.next != null; } @Override -public Object next() { -if (hasNext()) { -// since data is stored in descending order in the segments, retrieve previous record, if the order is Ascending. -return order.equals(ResultOrder.ASCENDING) ? iterator.previous() : iterator.next(); +public VersionedRecord next() { +if (this.next == null) { +if (!hasNext()) { +throw new NoSuchElementException(); +} } -throw new NoSuchElementException(); +assert this.next != null; Review Comment: > We usually don't use `assert`. Can be removed. Otherwise, there will be a warning, and we have to suppress it. But I will remove it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
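The `hasNext()`/`next()` change in the diff above follows a common look-ahead pattern: `hasNext()` eagerly loads one element into a field, and `next()` hands it out and clears the field. A minimal sketch of that pattern follows. It is not the `LogicalSegmentIterator` code: the class `LookaheadIteratorSketch` is hypothetical, segments are modelled as plain lists of non-null strings, and an explicit `NoSuchElementException` takes the place of the `assert` discussed in the review.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class LookaheadIteratorSketch implements Iterator<String> {

    private final Iterator<List<String>> segments; // remaining segments
    private Iterator<String> current;              // records of the segment being read
    private String next;                           // look-ahead slot, null when empty

    public LookaheadIteratorSketch(List<List<String>> segments) {
        this.segments = segments.iterator();
    }

    @Override
    public boolean hasNext() {
        // Keep going until the look-ahead slot is filled or all segments are exhausted.
        while (next == null) {
            if (current != null && current.hasNext()) {
                next = current.next();
            } else if (segments.hasNext()) {
                current = segments.next().iterator(); // move on, skipping empty segments
            } else {
                return false;
            }
        }
        return true;
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        String result = next;
        next = null; // consume the look-ahead slot
        return result;
    }

    // Convenience helper: collects everything the iterator yields.
    public static List<String> drain(List<List<String>> segments) {
        List<String> out = new ArrayList<>();
        Iterator<String> it = new LookaheadIteratorSketch(segments);
        while (it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }
}
```

Because `next()` always goes through `hasNext()`, the slot is guaranteed to be non-null at the point it is read, which is the invariant the removed `assert` was meant to document.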
Re: [PR] MINOR: Allow Block.resetBlockLatch to release blocked operation for end-of-test cleanup [kafka]
gharris1727 commented on PR #14987: URL: https://github.com/apache/kafka/pull/14987#issuecomment-1852376741 Closing in favor of #12290
Re: [PR] MINOR: Allow Block.resetBlockLatch to release blocked operation for end-of-test cleanup [kafka]
gharris1727 closed pull request #14987: MINOR: Allow Block.resetBlockLatch to release blocked operation for end-of-test cleanup URL: https://github.com/apache/kafka/pull/14987
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
aliehsaeedii commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1424245975 ## streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java: ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.query.ResultOrder; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + + +final class ReadonlyPartiallyDeserializedSegmentValue { + +private static final int TIMESTAMP_SIZE = 8; +private static final int VALUE_SIZE = 4; +private byte[] segmentValue; +private long nextTimestamp; +private long minTimestamp; + +private int deserIndex = -1; // index up through which this segment has been deserialized (inclusive) + +private Map cumulativeValueSizes; + +private int valuesStartingIndex = -1; // the index of the first value in the segment (but the last one in the list) +private Map unpackedTimestampAndValueSizes = new HashMap<>(); +private int recordNumber = -1; // number of segment records + + +ReadonlyPartiallyDeserializedSegmentValue(final byte[] segmentValue) { +this.segmentValue = segmentValue; +this.nextTimestamp = + RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue); +this.minTimestamp = + RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue); +resetDeserHelpers(); +} + + +public long minTimestamp() { +return minTimestamp; +} + +public long nextTimestamp() { +return nextTimestamp; +} + +public byte[] serialize() { +return segmentValue; +} + + +public RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult find( +final long fromTime, final long toTime, final ResultOrder order, final int index) { + +// this segment does not have any record in query specified time range +if (toTime < minTimestamp || fromTime > nextTimestamp) { +return null; +} + +final boolean isAscending = order.equals(ResultOrder.ASCENDING); + +if (isAscending && valuesStartingIndex == -1) { +findValuesStartingIndex(); +deserIndex = recordNumber; +} + +long currTimestamp = -1; +long currNextTimestamp = -1; +int currIndex = initializeCurrentIndex(index, isAscending); +int cumValueSize = initializeCumValueSize(index, 
currIndex, isAscending); +int currValueSize; + + +while (hasStillRecord(currTimestamp, currNextTimestamp, order)) { +if (hasBeenDeserialized(isAscending, currIndex)) { +final TimestampAndValueSize curr; +curr = unpackedTimestampAndValueSizes.get(currIndex); +currTimestamp = curr.timestamp; +cumValueSize = cumulativeValueSizes.get(currIndex); +currValueSize = curr.valueSize; + +// update currValueSize +if (currValueSize == Integer.MIN_VALUE) { +final int timestampSegmentIndex = getTimestampIndex(order, currIndex); +currValueSize = ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE); +unpackedTimestampAndValueSizes.put(currIndex, new TimestampAndValueSize(currTimestamp, cumValueSize)); Review Comment: > `cumValueSize` is not updated with `currValueSize`. Why? Yes. In this branch (taken when `deserIndex` is ahead of `currIndex`), everything must already be deserialized, so the `if (currValueSize == Integer.MIN_VALUE)` check does not belong here. I removed it.
Re: [PR] KAFKA-15695: Update the local log start offset of a log after rebuilding the auxiliary state [kafka]
satishd merged PR #14649: URL: https://github.com/apache/kafka/pull/14649
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
aliehsaeedii commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1424136033 ## streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java: ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.query.ResultOrder; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + + +final class ReadonlyPartiallyDeserializedSegmentValue { + +private static final int TIMESTAMP_SIZE = 8; +private static final int VALUE_SIZE = 4; +private byte[] segmentValue; +private long nextTimestamp; +private long minTimestamp; + +private int deserIndex = -1; // index up through which this segment has been deserialized (inclusive) + +private Map cumulativeValueSizes; + +private int valuesStartingIndex = -1; // the index of the first value in the segment (but the last one in the list) +private Map unpackedTimestampAndValueSizes = new HashMap<>(); +private int recordNumber = -1; // number of segment records + + +ReadonlyPartiallyDeserializedSegmentValue(final byte[] segmentValue) { +this.segmentValue = segmentValue; +this.nextTimestamp = + RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue); +this.minTimestamp = + RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue); +resetDeserHelpers(); +} + + +public long minTimestamp() { +return minTimestamp; +} + +public long nextTimestamp() { +return nextTimestamp; +} + +public byte[] serialize() { +return segmentValue; +} + + +public RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult find( +final long fromTime, final long toTime, final ResultOrder order, final int index) { + +// this segment does not have any record in query specified time range +if (toTime < minTimestamp || fromTime > nextTimestamp) { +return null; +} + +final boolean isAscending = order.equals(ResultOrder.ASCENDING); + +if (isAscending && valuesStartingIndex == -1) { +findValuesStartingIndex(); +deserIndex = recordNumber; +} + +long currTimestamp = -1; +long currNextTimestamp = -1; +int currIndex = initializeCurrentIndex(index, isAscending); +int cumValueSize = initializeCumValueSize(index, 
currIndex, isAscending); Review Comment: > Not sure why we need to init `cumValueSize`? It seems below we never read the value set here? We do need it. See line 106 (`cumValueSize += Math.max(currValueSize, 0);`). When everything is already in the cache (`unpackedTimestampAndValueSizes`), the initial value is not needed. Otherwise, we need the previously computed `cumValueSize` to move to the correct position in the segment and deserialize the next record.
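The point about carrying `cumValueSize` forward can be illustrated with a minimal sketch. It assumes a simplified, hypothetical segment layout (a record count, then `(timestamp, valueSize)` pairs, then the concatenated values), which is not the actual format used by `RocksDBVersionedStoreSegmentValueFormatter`. The class `LazySegmentSketch` and its methods `valueAt`, `deserializedUpTo`, and `build` are invented for illustration. The idea shown is only that locating value `i` requires the running total of the sizes before it, so lazy deserialization has to resume from the last cached total rather than from zero.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified layout (NOT the real Kafka segment format):
// [count:4] [(timestamp:8, valueSize:4) x count] [value bytes, concatenated in the same order]
final class LazySegmentSketch {
    private static final int COUNT_SIZE = 4;
    private static final int TIMESTAMP_SIZE = 8;
    private static final int VALUE_SIZE = 4;

    private final byte[] segment;
    private final int count;
    // cumulativeSizes.get(i) = total bytes of values 0..i (inclusive); filled lazily.
    private final List<Integer> cumulativeSizes = new ArrayList<>();

    LazySegmentSketch(final byte[] segment) {
        this.segment = segment;
        this.count = ByteBuffer.wrap(segment).getInt(0);
    }

    // Index up through which value sizes have been deserialized (inclusive), or -1.
    int deserializedUpTo() {
        return cumulativeSizes.size() - 1;
    }

    String valueAt(final int index) {
        if (index < 0 || index >= count) {
            throw new IndexOutOfBoundsException("index " + index);
        }
        // Resume from the previously computed running total instead of starting at 0.
        int cum = cumulativeSizes.isEmpty() ? 0 : cumulativeSizes.get(cumulativeSizes.size() - 1);
        for (int i = cumulativeSizes.size(); i <= index; i++) {
            final int sizePos = COUNT_SIZE + i * (TIMESTAMP_SIZE + VALUE_SIZE) + TIMESTAMP_SIZE;
            final int size = ByteBuffer.wrap(segment).getInt(sizePos);
            cum += Math.max(size, 0); // a negative size would contribute no bytes
            cumulativeSizes.add(cum);
        }
        final int end = cumulativeSizes.get(index);
        final int start = index == 0 ? 0 : cumulativeSizes.get(index - 1);
        final int valuesStart = COUNT_SIZE + count * (TIMESTAMP_SIZE + VALUE_SIZE);
        return new String(segment, valuesStart + start, end - start, StandardCharsets.UTF_8);
    }

    // Helper that builds a segment in the assumed layout, for demonstration only.
    static byte[] build(final long[] timestamps, final String[] values) {
        int total = 0;
        for (final String v : values) {
            total += v.getBytes(StandardCharsets.UTF_8).length;
        }
        final ByteBuffer buf = ByteBuffer.allocate(
            COUNT_SIZE + values.length * (TIMESTAMP_SIZE + VALUE_SIZE) + total);
        buf.putInt(values.length);
        for (int i = 0; i < values.length; i++) {
            buf.putLong(timestamps[i]);
            buf.putInt(values[i].getBytes(StandardCharsets.UTF_8).length);
        }
        for (final String v : values) {
            buf.put(v.getBytes(StandardCharsets.UTF_8));
        }
        return buf.array();
    }
}
```

In this sketch, a lookup that is already covered by the cache skips the loop entirely, which corresponds to the case where the initial value is not needed. A lookup past the cached range starts from the last cached total, which is why the running sum has to be initialized before the loop.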
Re: [PR] KAFKA-15988: Reuse embedded clusters across test cases in Connect OffsetsApiIntegrationTest suite [kafka]
yashmayya commented on PR #14966: URL: https://github.com/apache/kafka/pull/14966#issuecomment-1852343242 Thanks Chris, this looks like a really nice improvement! I can review sometime later this week.