[GitHub] [kafka] philipnee commented on a diff in pull request #13380: KAFKA-14468: Committed API
philipnee commented on code in PR #13380: URL: https://github.com/apache/kafka/pull/13380#discussion_r1136454999 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -437,29 +456,24 @@ public void wakeup() { */ @Override public void commitSync(final Duration timeout) { -final CommitApplicationEvent commitEvent = new CommitApplicationEvent(subscriptions.allConsumed()); -eventHandler.add(commitEvent); - -final CompletableFuture commitFuture = commitEvent.future(); -try { -commitFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); -} catch (final TimeoutException e) { -throw new org.apache.kafka.common.errors.TimeoutException( - "timeout"); -} catch (final Exception e) { -// handle exception here -throw new RuntimeException(e); -} +commitSync(subscriptions.allConsumed(), timeout); } @Override public void commitSync(Map offsets) { -throw new KafkaException("method not implemented"); +commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs)); } @Override public void commitSync(Map offsets, Duration timeout) { -throw new KafkaException("method not implemented"); +CompletableFuture commitFuture = commit(offsets); +try { +commitFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); +} catch (final TimeoutException e) { Review Comment: Same as my earlier comment: map InterruptedException to Kafka's InterruptException, TimeoutException to Kafka's TimeoutException, and ExecutionException to KafkaException. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
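The exception mapping requested in this comment could look roughly like the following. This is a minimal sketch, not the PR's code: the three `Sketch*` classes are hypothetical stand-ins for `KafkaException`, `InterruptException`, and Kafka's `TimeoutException` so that the example compiles without the Kafka client jar, and the helper name `await` is made up for illustration.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class CommitSyncExceptionMapping {
    // Hypothetical stand-ins for the Kafka exception types named in the review comment.
    static class SketchKafkaException extends RuntimeException {
        SketchKafkaException(String message, Throwable cause) { super(message, cause); }
    }
    static class SketchInterruptException extends SketchKafkaException {
        SketchInterruptException(String message, Throwable cause) { super(message, cause); }
    }
    static class SketchTimeoutException extends SketchKafkaException {
        SketchTimeoutException(String message, Throwable cause) { super(message, cause); }
    }

    /** Blocks on the future and translates the JDK exceptions into the unchecked stand-ins. */
    static <T> T await(CompletableFuture<T> future, Duration timeout) {
        try {
            return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Assumption: re-set the interrupt flag so the caller can still observe it.
            Thread.currentThread().interrupt();
            throw new SketchInterruptException("interrupted while waiting", e);
        } catch (TimeoutException e) {
            throw new SketchTimeoutException("timed out after " + timeout.toMillis() + " ms", e);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof SketchKafkaException) {
                throw (SketchKafkaException) cause; // already the right type, rethrow as-is
            }
            throw new SketchKafkaException("request failed", cause);
        }
    }
}
```

With a helper of this shape, `commitSync` and `committed` would share one translation point instead of each wrapping everything in a generic exception.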
[GitHub] [kafka] philipnee commented on a diff in pull request #13380: KAFKA-14468: Committed API
philipnee commented on code in PR #13380: URL: https://github.com/apache/kafka/pull/13380#discussion_r1136454609 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -235,4 +373,105 @@ public void ack(final long currentTimeMs) { this.timer.update(currentTimeMs); } } + +private class FetchCommittedOffsetResponseHandler { Review Comment: It's about 80% copy-paste, with one change: `retry(currentTimeMs);` is now invoked on RetriableExceptions. In the original code, the exception was raised and the request was then retried until timeout. Here is the reference: https://github.com/apache/kafka/blob/d23ce20bdfbe5a9598523961cb7cf747ce4f52ef/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1016
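The behavioural change described here (schedule a retry on a retriable error instead of raising it) can be sketched as below. Everything in this block is an illustrative assumption: `BackoffState`, `SketchRetriableException`, and `onResponse` are hypothetical names, and the real handler in the PR deals with Kafka response objects rather than a bare `Long`.

```java
import java.util.concurrent.CompletableFuture;

final class OffsetFetchRetrySketch {
    /** Hypothetical marker for errors that are safe to retry. */
    static class SketchRetriableException extends RuntimeException {
        SketchRetriableException(String message) { super(message); }
    }

    /** Hypothetical stand-in for the request state that tracks retry backoff. */
    static final class BackoffState {
        private final long retryBackoffMs;
        private long lastFailureMs = -1L;

        BackoffState(long retryBackoffMs) { this.retryBackoffMs = retryBackoffMs; }

        /** Records a failed attempt so the next send waits for the backoff. */
        void retry(long currentTimeMs) { lastFailureMs = currentTimeMs; }

        boolean canSendRequest(long currentTimeMs) {
            return lastFailureMs < 0 || currentTimeMs - lastFailureMs >= retryBackoffMs;
        }
    }

    /**
     * Success completes the future, a retriable error only schedules a retry
     * (the future stays pending), and any other error fails the future.
     */
    static void onResponse(Long offset, Throwable error, long currentTimeMs,
                           BackoffState state, CompletableFuture<Long> future) {
        if (error == null) {
            future.complete(offset);
        } else if (error instanceof SketchRetriableException) {
            state.retry(currentTimeMs);
        } else {
            future.completeExceptionally(error);
        }
    }
}
```

The caller's timeout still bounds the overall wait, because the future simply stays incomplete while retries are pending.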
[GitHub] [kafka] philipnee commented on a diff in pull request #13380: KAFKA-14468: Committed API
philipnee commented on code in PR #13380: URL: https://github.com/apache/kafka/pull/13380#discussion_r1136449438 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -306,13 +310,28 @@ public OffsetAndMetadata committed(TopicPartition partition, Duration timeout) { } @Override -public Map committed(Set partitions) { -throw new KafkaException("method not implemented"); +public Map committed(final Set partitions) { +return committed(partitions, Duration.ofMillis(defaultApiTimeoutMs)); } @Override -public Map committed(Set partitions, Duration timeout) { -throw new KafkaException("method not implemented"); +public Map committed(final Set partitions, +final Duration timeout) { +maybeThrowInvalidGroupIdException(); +final OffsetFetchApplicationEvent event = new OffsetFetchApplicationEvent(partitions); +eventHandler.add(event); +try { +return event.complete(Duration.ofMillis(100)); +} catch (ExecutionException | InterruptedException | TimeoutException e) { +throw new KafkaException(e); Review Comment: InterruptedException and TimeoutException can be mapped to the existing Kafka exceptions, and ExecutionException to KafkaException. There are a few blocking operations like committed(), and I think we need to support wakeup here. I don't know what the recommended approach is, but I'm thinking of running a timer and checking for wakeup in that loop.
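One way to read the "run a timer and check wakeup in that loop" idea is to wait on the future in short slices and test a wakeup flag between slices. The sketch below assumes that reading. `SketchWakeupException`, the `wakeupRequested` flag, and the 10 ms slice length are all hypothetical choices for illustration, not anything taken from the PR.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

final class WakeupAwareWait {
    /** Hypothetical stand-in for Kafka's WakeupException. */
    static class SketchWakeupException extends RuntimeException { }

    // Assumed polling granularity; a shorter slice reacts to wakeup() faster.
    private static final long SLICE_NANOS = TimeUnit.MILLISECONDS.toNanos(10);

    static <T> T await(CompletableFuture<T> future, Duration timeout, AtomicBoolean wakeupRequested)
            throws InterruptedException, ExecutionException, TimeoutException {
        final long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (true) {
            // Consume the wakeup flag so that a later call is not interrupted again.
            if (wakeupRequested.compareAndSet(true, false)) {
                throw new SketchWakeupException();
            }
            long remainingNanos = deadlineNanos - System.nanoTime();
            long sliceNanos = Math.max(0L, Math.min(remainingNanos, SLICE_NANOS));
            try {
                return future.get(sliceNanos, TimeUnit.NANOSECONDS);
            } catch (TimeoutException e) {
                if (remainingNanos <= SLICE_NANOS) {
                    throw e; // that was the final slice, so the overall timeout has expired
                }
                // Otherwise loop: re-check the wakeup flag and wait for another slice.
            }
        }
    }
}
```

The trade-off is latency versus busy work: the caller notices a wakeup within one slice, at the cost of waking the thread periodically while the request is outstanding.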
[jira] [Assigned] (KAFKA-14206) Upgrade zookeeper to 3.7.1 to address security vulnerabilities
[ https://issues.apache.org/jira/browse/KAFKA-14206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen reassigned KAFKA-14206:
---------------------------------
    Assignee:     (was: Luke Chen)

> Upgrade zookeeper to 3.7.1 to address security vulnerabilities
> --------------------------------------------------------------
>
>                 Key: KAFKA-14206
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14206
>             Project: Kafka
>          Issue Type: Improvement
>          Components: packaging
>    Affects Versions: 3.2.1
>            Reporter: Valeriy Kassenbayev
>            Priority: Blocker
>
> Kafka 3.2.1 is using ZooKeeper, which is affected by
> [CVE-2021-37136|https://security.snyk.io/vuln/SNYK-JAVA-IONETTY-1584064] and
> [CVE-2021-37137|https://www.cve.org/CVERecord?id=CVE-2021-37137]:
> {code:java}
> ✗ Denial of Service (DoS) [High Severity][https://security.snyk.io/vuln/SNYK-JAVA-IONETTY-1584063] in io.netty:netty-codec@4.1.63.Final
>   introduced by org.apache.kafka:kafka_2.13@3.2.1 > org.apache.zookeeper:zookeeper@3.6.3 > io.netty:netty-handler@4.1.63.Final > io.netty:netty-codec@4.1.63.Final
>   This issue was fixed in versions: 4.1.68.Final
> ✗ Denial of Service (DoS) [High Severity][https://security.snyk.io/vuln/SNYK-JAVA-IONETTY-1584064] in io.netty:netty-codec@4.1.63.Final
>   introduced by org.apache.kafka:kafka_2.13@3.2.1 > org.apache.zookeeper:zookeeper@3.6.3 > io.netty:netty-handler@4.1.63.Final > io.netty:netty-codec@4.1.63.Final
>   This issue was fixed in versions: 4.1.68.Final
> {code}
> The issues were fixed in the next versions of ZooKeeper (starting from
> 3.6.4). ZooKeeper 3.7.1 is the next stable
> [release|https://zookeeper.apache.org/releases.html] at the moment.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] philipnee commented on a diff in pull request #13380: KAFKA-14468: Committed API
philipnee commented on code in PR #13380: URL: https://github.com/apache/kafka/pull/13380#discussion_r1136434874 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -207,6 +266,85 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { } } +static class UnsentOffsetFetchRequestState extends RequestState { +public final Set requestedPartitions; +public final GroupState.Generation requestedGeneration; +public CompletableFuture> future; + +public UnsentOffsetFetchRequestState(final Set partitions, + final GroupState.Generation generation, + final CompletableFuture> future, + final long retryBackoffMs) { +super(retryBackoffMs); +this.requestedPartitions = partitions; +this.requestedGeneration = generation; +this.future = future; +} + +public boolean sameRequest(final UnsentOffsetFetchRequestState request) { +return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions); +} +} + +/** + * This is used to support the committed() API. Here we use a Java Collections, {@code unsentRequests}, to + * track + * the OffsetFetchRequests that haven't been sent, to prevent sending the same requests in the same batch. + * + * If the request is new. It will be enqueued to the {@code unsentRequest}, and will be sent upon the next + * poll. + * + * If the same request has been sent, the request's {@code CompletableFuture} will be completed upon the + * completion of the existing one. + * + * TODO: There is an optimization to present duplication to the sent but incompleted requests. I'm not sure if we + * need that. + */ +class UnsentOffsetFetchRequests { Review Comment: Hi! That was my original attempt :), but I abandoned it because I wondered whether we actually need this optimization. If we call committed() several times consecutively within a single event loop, it makes sense to coalesce them into a single request. Once the request has been sent, though, I wonder how often we invoke another committed() before the response comes back. Is that a use case on the Streams side?
[jira] [Created] (KAFKA-14810) Refactor FileRawSnapshotWriter to not reference ReplicatedLog
José Armando García Sancio created KAFKA-14810:
--------------------------------------------------
             Summary: Refactor FileRawSnapshotWriter to not reference ReplicatedLog
                 Key: KAFKA-14810
                 URL: https://issues.apache.org/jira/browse/KAFKA-14810
             Project: Kafka
          Issue Type: Task
            Reporter: José Armando García Sancio
            Assignee: José Armando García Sancio

The current implementation of FileRawSnapshotWriter uses an Optional to propagate when a new snapshot has been created by the state machine. This abstraction is too strict when writing tests and should be changed to something like Consumer.
[GitHub] [kafka] pprovenzano commented on a diff in pull request #13374: KRAFT-14765 and KRAFT-14776: Support for SCRAM at bootstrap with integration tests
pprovenzano commented on code in PR #13374: URL: https://github.com/apache/kafka/pull/13374#discussion_r1136329445 ## core/src/test/scala/unit/kafka/tools/StorageToolTest.scala: ## @@ -221,4 +224,78 @@ Found problem: assertThrows(classOf[IllegalArgumentException], () => parseMetadataVersion("--release-version", "0.0")) } + + @Test + def testAddScram():Unit = { +def parseAddScram(strings: String*): Option[ArrayBuffer[UserScramCredentialRecord]] = { + var args = mutable.Seq("format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ") + args ++= strings + val namespace = StorageTool.parseArguments(args.toArray) + StorageTool.getUserScramCredentialRecords(namespace) +} + +var scramRecords = parseAddScram() +assertEquals(None, scramRecords) + +// Validate we can add multiple SCRAM creds. +scramRecords = parseAddScram("-S", + "SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\",iterations=8192]", +"-S", + "SCRAM-SHA-256=[name=george,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\",iterations=8192]") + +assertEquals(2, scramRecords.get.size) + +// Require name subfield. 
+try assertEquals(1, parseAddScram("-S", + "SCRAM-SHA-256=[salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\",iterations=8192]")) catch { + case e: TerseFailure => assertEquals(s"You must supply 'name' to add-scram", e.getMessage) +} + +// Require password xor saltedpassword +try assertEquals(1, parseAddScram("-S", + "SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",password=alice,saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\",iterations=8192]")) +catch { + case e: TerseFailure => assertEquals(s"You must only supply one of 'password' or 'saltedpassword' to add-scram", e.getMessage) +} + +try assertEquals(1, parseAddScram("-S", + "SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",iterations=8192]")) +catch { + case e: TerseFailure => assertEquals(s"You must supply one of 'password' or 'saltedpassword' to add-scram", e.getMessage) +} + +// Validate salt is required with saltedpassword +try assertEquals(1, parseAddScram("-S", + "SCRAM-SHA-256=[name=alice,saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\",iterations=8192]")) +catch { + case e: TerseFailure => assertEquals(s"You must supply 'salt' with 'saltedpassword' to add-scram", e.getMessage) +} + +// Validate salt is optional with password +assertEquals(1, parseAddScram("-S", "SCRAM-SHA-256=[name=alice,password=alice,iterations=4096]").get.size) + +// Require 4096 <= iterations <= 16384 +try assertEquals(1, parseAddScram("-S", + "SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",password=alice,iterations=16385]")) +catch { + case e: TerseFailure => assertEquals(s"The 'iterations' value must be <= 16384 for add-scram", e.getMessage) +} + +assertEquals(1, parseAddScram("-S", + "SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",password=alice,iterations=16384]") + .get.size) + +try assertEquals(1, parseAddScram("-S", + 
"SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",password=alice,iterations=4095]")) +catch { + case e: TerseFailure => assertEquals(s"The 'iterations' value must be >= 4096 for add-scram", e.getMessage) +} + +assertEquals(1, parseAddScram("-S", + "SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",password=alice,iterations=4096]") + .get.size) + +// Validate iterations is optional +assertEquals(1, parseAddScram("-S", "SCRAM-SHA-256=[name=alice,password=alice]") .get.size) Review Comment: There is a check for metadata version in ScramImage and a test to validate that check. I have manually tested that the formatter produces an error if the IBP version is set to an earlier version and SCRAM records are added. Testing this in a unit test is a lot of work and not worth it as the cluster will just fail to come up if they bypass it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] pprovenzano commented on a diff in pull request #13374: KRAFT-14765 and KRAFT-14776: Support for SCRAM at bootstrap with integration tests
pprovenzano commented on code in PR #13374: URL: https://github.com/apache/kafka/pull/13374#discussion_r1136323073 ## core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala: ## @@ -46,8 +47,8 @@ class ScramServerStartupTest extends IntegrationTestHarness with SaslSetup { override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism)) override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism)) - override def configureSecurityBeforeServersStart(): Unit = { -super.configureSecurityBeforeServersStart() + override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = { +super.configureSecurityBeforeServersStart(testInfo) zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path) // Create credentials before starting brokers createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword) Review Comment: Yes, but converting this test was a pain, so I deleted this file and moved the test to SaslScramSslEndToEndAuthorizationTest. The test now runs in both ZK and KRaft mode from there.
[GitHub] [kafka] pprovenzano commented on a diff in pull request #13374: KRAFT-14765 and KRAFT-14776: Support for SCRAM at bootstrap with integration tests
pprovenzano commented on code in PR #13374: URL: https://github.com/apache/kafka/pull/13374#discussion_r1136321668 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -96,6 +115,9 @@ object StorageTool extends Logging { action(store()). required(true). help("The cluster ID to use.") +formatParser.addArgument("--add-scram", "-S"). + action(append()). + help("Additional Bootstrap Metadata to add to the cluster.") Review Comment: Yes, I've added the description. ## core/src/test/scala/unit/kafka/tools/StorageToolTest.scala: ## @@ -161,20 +163,21 @@ Found problem: val metaProperties = MetaProperties( clusterId = "XcZZOzUqS4yHOjhMQB6JLQ", nodeId = 2) val stream = new ByteArrayOutputStream() + val bootstrapMetadata = StorageTool.buildBootstrapMetadata(MetadataVersion.latest(), None, "test foramt command") Review Comment: Fixed
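The `--add-scram` / `-S` argument added here takes values such as `SCRAM-SHA-256=[name=alice,password=alice]`, and the StorageToolTest cases quoted in this thread pin down the validation rules and error messages. A rough sketch of those rules follows. It is written in Java for consistency with the other sketches (the tool itself is Scala), it takes an already-parsed field map, it uses `IllegalArgumentException` as a stand-in for `TerseFailure`, and the order of the checks is an assumption. The 4096 and 16384 bounds are the ones the test exercises for SCRAM-SHA-256.

```java
import java.util.Map;

final class AddScramValidationSketch {
    // Bounds taken from the test comment "Require 4096 <= iterations <= 16384".
    static final int MIN_ITERATIONS = 4096;
    static final int MAX_ITERATIONS = 16384;

    /** Validates parsed add-scram fields; the messages mirror the ones asserted in the test. */
    static void validate(Map<String, String> fields) {
        if (!fields.containsKey("name")) {
            throw new IllegalArgumentException("You must supply 'name' to add-scram");
        }
        boolean hasPassword = fields.containsKey("password");
        boolean hasSaltedPassword = fields.containsKey("saltedpassword");
        if (hasPassword && hasSaltedPassword) {
            throw new IllegalArgumentException(
                "You must only supply one of 'password' or 'saltedpassword' to add-scram");
        }
        if (!hasPassword && !hasSaltedPassword) {
            throw new IllegalArgumentException(
                "You must supply one of 'password' or 'saltedpassword' to add-scram");
        }
        // A salt is mandatory only with a pre-salted password; it is optional with a plain one.
        if (hasSaltedPassword && !fields.containsKey("salt")) {
            throw new IllegalArgumentException(
                "You must supply 'salt' with 'saltedpassword' to add-scram");
        }
        // 'iterations' is optional; the range is checked only when it is present.
        String iterations = fields.get("iterations");
        if (iterations != null) {
            int value = Integer.parseInt(iterations);
            if (value < MIN_ITERATIONS) {
                throw new IllegalArgumentException(
                    "The 'iterations' value must be >= " + MIN_ITERATIONS + " for add-scram");
            }
            if (value > MAX_ITERATIONS) {
                throw new IllegalArgumentException(
                    "The 'iterations' value must be <= " + MAX_ITERATIONS + " for add-scram");
            }
        }
    }
}
```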
[GitHub] [kafka] pprovenzano commented on a diff in pull request #13374: KRAFT-14765 and KRAFT-14776: Support for SCRAM at bootstrap with integration tests
pprovenzano commented on code in PR #13374: URL: https://github.com/apache/kafka/pull/13374#discussion_r1136321240 ## clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramMechanism.java: ## @@ -40,11 +42,13 @@ public enum ScramMechanism { MECHANISMS_MAP = Collections.unmodifiableMap(map); } -ScramMechanism(String hashAlgorithm, String macAlgorithm, int minIterations) { +ScramMechanism(byte type, String hashAlgorithm, String macAlgorithm, int minIterations, int maxIterations) { +this.type = type; this.mechanismName = "SCRAM-" + hashAlgorithm; this.hashAlgorithm = hashAlgorithm; -this.macAlgorithm = macAlgorithm; +this.macAlgorithm = macAlgorithm; Review Comment: Fixed
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13380: KAFKA-14468: Committed API
guozhangwang commented on code in PR #13380: URL: https://github.com/apache/kafka/pull/13380#discussion_r1136315295 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -207,6 +266,85 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { } } +static class UnsentOffsetFetchRequestState extends RequestState { +public final Set requestedPartitions; +public final GroupState.Generation requestedGeneration; +public CompletableFuture> future; + +public UnsentOffsetFetchRequestState(final Set partitions, + final GroupState.Generation generation, + final CompletableFuture> future, + final long retryBackoffMs) { +super(retryBackoffMs); +this.requestedPartitions = partitions; +this.requestedGeneration = generation; +this.future = future; +} + +public boolean sameRequest(final UnsentOffsetFetchRequestState request) { +return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions); +} +} + +/** + * This is used to support the committed() API. Here we use a Java Collections, {@code unsentRequests}, to + * track + * the OffsetFetchRequests that haven't been sent, to prevent sending the same requests in the same batch. + * + * If the request is new. It will be enqueued to the {@code unsentRequest}, and will be sent upon the next + * poll. + * + * If the same request has been sent, the request's {@code CompletableFuture} will be completed upon the + * completion of the existing one. + * + * TODO: There is an optimization to present duplication to the sent but incompleted requests. I'm not sure if we + * need that. + */ +class UnsentOffsetFetchRequests { Review Comment: Thinking a bit further here, I believe that inside `unsentOffsetFetchRequests` we'd need to keep two collections: 1) the unsent requests, and 2) the sent-but-not-responded requests. Upon getting a new event from the queue, we would check against both collections. Then, when we drain the first collection and write the requests to the network client, we move them to the second collection, and we only drop requests from the second collection after a response is received and the registered handlers have been triggered (it's possible to have multiple events' handlers registered for the same request).
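The two-collection bookkeeping proposed in this comment can be sketched as follows. This is an illustration under simplifying assumptions, not the PR's design: requests are keyed by partition set only (the PR also compares the group generation), partitions are plain strings, and "multiple handlers per request" is modelled by handing the same `CompletableFuture` to every duplicate caller. The class and method names are hypothetical, and the class is not thread-safe (it assumes a single background thread).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

final class OffsetFetchDeduplicator {
    // 1) requests that have not been written to the network client yet
    private final Map<Set<String>, CompletableFuture<Map<String, Long>>> unsent = new LinkedHashMap<>();
    // 2) requests that were sent but have not received a response yet
    private final Map<Set<String>, CompletableFuture<Map<String, Long>>> inflight = new HashMap<>();

    /** Returns the future of an equivalent pending request, or registers a new one. */
    CompletableFuture<Map<String, Long>> enqueue(Set<String> partitions) {
        Set<String> key = Collections.unmodifiableSet(new HashSet<>(partitions));
        CompletableFuture<Map<String, Long>> existing = inflight.get(key);
        if (existing == null) {
            existing = unsent.get(key);
        }
        if (existing != null) {
            return existing; // duplicate: the caller attaches its handler to the shared future
        }
        CompletableFuture<Map<String, Long>> created = new CompletableFuture<>();
        unsent.put(key, created);
        return created;
    }

    /** Moves every unsent request to the in-flight collection and returns what to send. */
    List<Set<String>> drain() {
        List<Set<String>> toSend = new ArrayList<>(unsent.keySet());
        inflight.putAll(unsent);
        unsent.clear();
        return toSend;
    }

    /** Drops the request from the in-flight collection and completes all waiting callers. */
    void onResponse(Set<String> partitions, Map<String, Long> offsets) {
        CompletableFuture<Map<String, Long>> future = inflight.remove(partitions);
        if (future != null) {
            future.complete(offsets);
        }
    }
}
```

A request issued after the response has arrived gets a fresh future, so only calls that overlap an outstanding request are coalesced.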
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13365: KAFKA-14491: [17/N] Refactor segments cleanup logic
vcrfxia commented on code in PR #13365: URL: https://github.com/apache/kafka/pull/13365#discussion_r1136263824 ## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java: ## @@ -57,6 +57,11 @@ public void openExisting(final ProcessorContext context, final long streamTime) physicalStore.openDB(context.appConfigs(), context.stateDir()); } +@Override +public void cleanupExpiredSegments(final long streamTime) { +super.cleanupExpiredSegments(streamTime); Review Comment: The method from AbstractSegments is protected, but LogicalKeyValueSegments needs to expose it publicly so that it can be called from the versioned store implementation. Admittedly looks odd at first glance. I can add a comment into the code if you think it'd be useful?
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13340: KAFKA-14491: [15/N] Add integration tests for versioned stores
vcrfxia commented on code in PR #13340: URL: https://github.com/apache/kafka/pull/13340#discussion_r1136261694 ## streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java: ## @@ -302,7 +319,54 @@ public void shouldAllowCustomIQv2ForCustomStoreImplementations() { .withPartitions(Collections.singleton(0)); final StateQueryResult result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); -assertThat("success", equalTo(result.getOnlyPartitionResult().getResult())); +assertThat(result.getOnlyPartitionResult().getResult(), equalTo("success")); +} + +@Test +public void shouldCreateGlobalTable() throws Exception { +// produce data to global store topic and track in-memory for processor to verify +final DataTracker data = new DataTracker(); +produceDataToTopic(globalTableTopic, data, baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null)); +produceDataToTopic(globalTableTopic, data, baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5")); +produceDataToTopic(globalTableTopic, data, baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data + +// build topology and start app +final StreamsBuilder streamsBuilder = new StreamsBuilder(); + +streamsBuilder +.globalTable( +globalTableTopic, +Consumed.with(Serdes.Integer(), Serdes.String()), +Materialized +.as(new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION)) +.withKeySerde(Serdes.Integer()) +.withValueSerde(Serdes.String()) +); +streamsBuilder +.stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) +.process(() -> new VersionedStoreContentCheckerProcessor(false, data)) +.to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + +final Properties props = props(); +kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); +kafkaStreams.start(); + +// produce source data to trigger store 
verifications in processor +int numRecordsProduced = produceDataToTopic(inputStream, baseTimestamp + 8, KeyValue.pair(1, "a8"), KeyValue.pair(2, "b8"), KeyValue.pair(3, "c8")); + +// wait for output and verify +final List> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( +TestUtils.consumerConfig( +CLUSTER.bootstrapServers(), +IntegerDeserializer.class, +IntegerDeserializer.class), +outputStream, +numRecordsProduced); + +for (final KeyValue receivedRecord : receivedRecords) { +// verify zero failed checks for each record +assertThat(receivedRecord.value, equalTo(0)); Review Comment:
> In your original comment, you say we cannot get the data from global-ktable because we cannot inject a Processor

Hm, not sure which comment this refers to. Maybe the comment I made earlier was about not being able to issue IQ against versioned stores? Ideally these tests would use IQ to check the contents of the stores directly, but because versioned stores don't support that (yet), the tests inspect the contents of the stores with a processor instead. The processor writes the number of errors to an output stream, and the test validates that the output stream contains only zeros (indicating no errors).

> we could use addGlobalStore instead of globalTable to add a Processor.

This test already has a processor which inspects/validates the contents of the global store. Have I misunderstood?
[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest
CalvinConfluent commented on code in PR #13323: URL: https://github.com/apache/kafka/pull/13323#discussion_r1136255311 ## clients/src/main/resources/common/message/FetchRequest.json: ## @@ -50,14 +50,23 @@ // Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code. // // Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException(KIP-405) - "validVersions": "0-14", + // + // Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also, + // deprecate the old ReplicaId field and set its default value to -1. (KIP-903) + "validVersions": "0-15", "flexibleVersions": "12+", "fields": [ { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null", "taggedVersions": "12+", "tag": 0, "ignorable": true, "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." }, -{ "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId", +{ "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", "entityType": "brokerId", "about": "The broker ID of the follower, of -1 if this request is from a consumer." }, +{ "name": "ReplicaState", "type": "ReplicaState", "taggedVersions":"15+", "tag": 1, "fields": [ Review Comment: The tagged field magic happens in the lower layer of the FetchRequestData.java, which is auto-generated. As long as the final FetchRequestData object has a default value ReplicaState, it will be removed from the serialized message.
[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest
CalvinConfluent commented on code in PR #13323: URL: https://github.com/apache/kafka/pull/13323#discussion_r1136238469 ## clients/src/main/resources/common/message/FetchRequest.json: ## @@ -50,14 +50,23 @@ // Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code. // // Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException(KIP-405) - "validVersions": "0-14", + // + // Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also, + // deprecate the old ReplicaId field and set its default value to -1. (KIP-903) + "validVersions": "0-15", "flexibleVersions": "12+", "fields": [ { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null", "taggedVersions": "12+", "tag": 0, "ignorable": true, "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." }, -{ "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId", +{ "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", "entityType": "brokerId", "about": "The broker ID of the follower, of -1 if this request is from a consumer." }, +{ "name": "ReplicaState", "type": "ReplicaState", "taggedVersions":"15+", "tag": 1, "fields": [ Review Comment:
> So we still rely on IBP to know the version?

Yes.

> does the builder still populate the field?

Yes. When constructing an instance, it does `this.replicaState = new ReplicaState()` (this is in FetchRequestData.java).

> Does it automatically remove it if we keep the defaults

This happens when serializing the request in FetchRequestData.java:
```
if (!this.replicaState.equals(new ReplicaState())) {
    _writable.writeUnsignedVarint(1);
    _writable.writeUnsignedVarint(this.replicaState.size(_cache, _version));
    replicaState.write(_writable, _cache, _version);
}
```
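To make the "omitted when equal to the default" behaviour concrete, here is a small self-contained sketch of the same check. It is not the generated FetchRequestData code: the `ReplicaState` class, its `-1` defaults, and the simplified byte layout (field count, tag, size, payload) are assumptions for illustration, and the real encoding of a nested flexible-version struct carries more detail than shown here.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

final class TaggedFieldSketch {
    /** Hypothetical stand-in for the generated ReplicaState struct; -1 defaults are assumed. */
    static final class ReplicaState {
        final int replicaId;
        final long replicaEpoch;

        ReplicaState() { this(-1, -1L); }

        ReplicaState(int replicaId, long replicaEpoch) {
            this.replicaId = replicaId;
            this.replicaEpoch = replicaEpoch;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ReplicaState)) return false;
            ReplicaState other = (ReplicaState) o;
            return replicaId == other.replicaId && replicaEpoch == other.replicaEpoch;
        }

        @Override
        public int hashCode() { return 31 * replicaId + Long.hashCode(replicaEpoch); }
    }

    /** Writes an unsigned varint: 7 bits per byte, high bit set on all but the last byte. */
    static void writeUnsignedVarint(int value, ByteBuffer buffer) {
        while ((value & 0xFFFFFF80) != 0) {
            buffer.put((byte) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        buffer.put((byte) value);
    }

    /** Serializes the tagged-field section, skipping ReplicaState when it equals the default. */
    static byte[] writeTaggedFields(ReplicaState state) {
        ByteBuffer buffer = ByteBuffer.allocate(32);
        if (state.equals(new ReplicaState())) {
            writeUnsignedVarint(0, buffer); // zero tagged fields: the default is left out
        } else {
            writeUnsignedVarint(1, buffer); // number of tagged fields
            writeUnsignedVarint(1, buffer); // tag of ReplicaState
            writeUnsignedVarint(Integer.BYTES + Long.BYTES, buffer); // payload size in bytes
            buffer.putInt(state.replicaId);
            buffer.putLong(state.replicaEpoch);
        }
        return Arrays.copyOf(buffer.array(), buffer.position());
    }
}
```

Under these assumptions a default `ReplicaState` costs a single byte on the wire, which is why the builder can always populate the field without changing what older-style requests look like.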
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13340: KAFKA-14491: [15/N] Add integration tests for versioned stores
vcrfxia commented on code in PR #13340: URL: https://github.com/apache/kafka/pull/13340#discussion_r1136258765 ## streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java: ## @@ -334,18 +399,65 @@ private final int produceSourceData(final long timestamp, } /** - * Test-only processor for inserting records into a versioned store while also tracking - * them separately in-memory, and performing checks to validate expected store contents. - * Forwards the number of failed checks downstream for consumption. + * @param topic topic to produce to + * @param dataTracker map of key -> timestamp -> value for tracking data which is produced to + *the topic. This method will add the produced data into this in-memory + *tracker in addition to producing to the topic, in order to keep the two + *in sync. + * @param timestamp timestamp to produce with + * @param keyValues key-value pairs to produce + * + * @return number of records produced + */ +@SuppressWarnings("varargs") +@SafeVarargs +private final int produceDataToTopic(final String topic, + final DataTracker dataTracker, + final long timestamp, + final KeyValue... keyValues) { +produceDataToTopic(topic, timestamp, keyValues); + +for (final KeyValue keyValue : keyValues) { +dataTracker.add(keyValue.key, timestamp, keyValue.value); +} + +return keyValues.length; +} + +/** + * Test-only processor for validating expected contents of a versioned store, and forwards + * the number of failed checks downstream for consumption. Callers specify whether the + * processor should also be responsible for inserting records into the store (while also + * tracking them separately in-memory for use in validation). */ private static class VersionedStoreContentCheckerProcessor implements Processor { private ProcessorContext context; private VersionedKeyValueStore store; +// whether or not the processor should write records to the store as they arrive. +// must be false for global stores. 
Review Comment: When a processor accesses a global store, the store is read-only: https://github.com/apache/kafka/blob/404a833df7de9a7d5efe35beb5bafb7c6972601e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L167-L170 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest
CalvinConfluent commented on code in PR #13323: URL: https://github.com/apache/kafka/pull/13323#discussion_r1136255311 ## clients/src/main/resources/common/message/FetchRequest.json: ## @@ -50,14 +50,23 @@ // Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code. // // Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException(KIP-405) - "validVersions": "0-14", + // + // Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also, + // deprecate the old ReplicaId field and set its default value to -1. (KIP-903) + "validVersions": "0-15", "flexibleVersions": "12+", "fields": [ { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null", "taggedVersions": "12+", "tag": 0, "ignorable": true, "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." }, -{ "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId", +{ "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", "entityType": "brokerId", "about": "The broker ID of the follower, of -1 if this request is from a consumer." }, +{ "name": "ReplicaState", "type": "ReplicaState", "taggedVersions":"15+", "tag": 1, "fields": [ Review Comment: The tagged field magic happens in the lower layer, in FetchRequestData.java, which is auto-generated. As long as the resulting FetchRequestData object holds a default-valued ReplicaState, that field is omitted from the serialized message.
[GitHub] [kafka] vcrfxia commented on pull request #13292: KAFKA-14491: [14/N] Set changelog topic configs for versioned stores
vcrfxia commented on PR #13292: URL: https://github.com/apache/kafka/pull/13292#issuecomment-1468887946 Thanks @mjsax for your review! I made the refactor you suggested (including to the existing WindowedChangelogTopicConfig) and also pushed updates to InternalTopicManager. InternalTopicManager is currently unused code but I figured it'd be good to update it for the new type of internal topic anyway for completeness. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest
CalvinConfluent commented on code in PR #13323: URL: https://github.com/apache/kafka/pull/13323#discussion_r1136249875 ## clients/src/main/resources/common/message/FetchRequest.json: ## @@ -50,14 +50,23 @@ // Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code. // // Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException(KIP-405) - "validVersions": "0-14", + // + // Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also, + // deprecate the old ReplicaId field and set its default value to -1. (KIP-903) + "validVersions": "0-15", "flexibleVersions": "12+", "fields": [ { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null", "taggedVersions": "12+", "tag": 0, "ignorable": true, "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." }, -{ "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId", +{ "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", "entityType": "brokerId", "about": "The broker ID of the follower, of -1 if this request is from a consumer." }, +{ "name": "ReplicaState", "type": "ReplicaState", "taggedVersions":"15+", "tag": 1, "fields": [ Review Comment: Correct. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13292: KAFKA-14491: [14/N] Set changelog topic configs for versioned stores
vcrfxia commented on code in PR #13292: URL: https://github.com/apache/kafka/pull/13292#discussion_r1136249663 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java: ## @@ -1293,12 +1306,16 @@ private void setRegexMatchedTopicToStateStore() { private InternalTopicConfig createChangelogTopicConfig(final StateStoreFactory factory, final String name) { -if (factory.isWindowStore()) { +if (factory.isVersionedStore()) { +final VersionedChangelogTopicConfig config = new VersionedChangelogTopicConfig(name, factory.logConfig()); +config.setMinCompactionLagMs(factory.historyRetention()); Review Comment:
> Yes, I think it would be cleaner this way.
OK, made the updates.
> `delete.retention.ms` is not for retention based topics, but it's for compacted topic
Ah I see. My Scala's not the best, but it looks like `min.compaction.lag.ms` guarantees that any record within `min.compaction.lag.ms` of "now" will not be compacted, regardless of `delete.retention.ms`. That is the important point: we need to guarantee that older record versions are not prematurely compacted. Interestingly, we might have a case for setting `delete.retention.ms = 0` (rather than using the default of 24 hours), since we know that we no longer need the older tombstones once `min.compaction.lag.ms` has expired. It's not strictly necessary, though. Do you think we should set it for completeness? I'm fine either way.
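For readers following the compaction discussion in the comment above, here is a minimal sketch of the topic-level settings being weighed. The class and method names are hypothetical and are not the PR's `VersionedChangelogTopicConfig`; the use of `cleanup.policy=compact` is an assumption about changelog topics, and whether `delete.retention.ms` should be set at all was left open in the thread. Only the config key names are real Kafka topic configs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Kafka Streams.
final class VersionedChangelogConfigSketch {

    // Builds the topic-level overrides discussed in the review thread.
    static Map<String, String> topicConfigs(final long historyRetentionMs,
                                            final boolean dropTombstonesEagerly) {
        final Map<String, String> configs = new LinkedHashMap<>();
        // Assumption: the changelog topic is compacted.
        configs.put("cleanup.policy", "compact");
        // Records younger than this lag are not eligible for compaction,
        // so older record versions survive for the history retention period.
        configs.put("min.compaction.lag.ms", Long.toString(historyRetentionMs));
        if (dropTombstonesEagerly) {
            // Optional idea from the comment: tombstones are not needed once
            // the compaction lag has passed, so skip the 24-hour default.
            configs.put("delete.retention.ms", "0");
        }
        return configs;
    }
}
```

Calling `topicConfigs(historyRetentionMs, false)` leaves `delete.retention.ms` at the broker default, which matches the "not strictly necessary" option in the comment.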
[GitHub] [kafka] jolshan commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest
jolshan commented on code in PR #13323: URL: https://github.com/apache/kafka/pull/13323#discussion_r1136245326 ## clients/src/main/resources/common/message/FetchRequest.json: ## @@ -50,14 +50,23 @@ // Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code. // // Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException(KIP-405) - "validVersions": "0-14", + // + // Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also, + // deprecate the old ReplicaId field and set its default value to -1. (KIP-903) + "validVersions": "0-15", "flexibleVersions": "12+", "fields": [ { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null", "taggedVersions": "12+", "tag": 0, "ignorable": true, "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." }, -{ "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId", +{ "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", "entityType": "brokerId", "about": "The broker ID of the follower, of -1 if this request is from a consumer." }, +{ "name": "ReplicaState", "type": "ReplicaState", "taggedVersions":"15+", "tag": 1, "fields": [ Review Comment:
> Yes, when constructing an instance, it will do this.replicaState = new ReplicaState()
The builder is still going to write to these fields as far as I can see in the code (FetchRequest line 266). It's just the case that these happen to be the defaults and so the state is equal, right?
[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest
CalvinConfluent commented on code in PR #13323: URL: https://github.com/apache/kafka/pull/13323#discussion_r1136238469 ## clients/src/main/resources/common/message/FetchRequest.json: ## @@ -50,14 +50,23 @@ // Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code. // // Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException(KIP-405) - "validVersions": "0-14", + // + // Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also, + // deprecate the old ReplicaId field and set its default value to -1. (KIP-903) + "validVersions": "0-15", "flexibleVersions": "12+", "fields": [ { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null", "taggedVersions": "12+", "tag": 0, "ignorable": true, "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." }, -{ "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId", +{ "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", "entityType": "brokerId", "about": "The broker ID of the follower, of -1 if this request is from a consumer." }, +{ "name": "ReplicaState", "type": "ReplicaState", "taggedVersions":"15+", "tag": 1, "fields": [ Review Comment:
> So we still rely on IBP to know the version?
Yes
> does the builder still populate the field?
Yes, when constructing an instance, it will do this.replicaState = new ReplicaState()
> Does it automatically remove it if we keep the defaults
When serializing the request:
```
if (!this.replicaState.equals(new ReplicaState())) {
    _writable.writeUnsignedVarint(1);
    _writable.writeUnsignedVarint(this.replicaState.size(_cache, _version));
    replicaState.write(_writable, _cache, _version);
}
```
[GitHub] [kafka] jolshan commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest
jolshan commented on code in PR #13323: URL: https://github.com/apache/kafka/pull/13323#discussion_r1136221498 ## clients/src/main/resources/common/message/FetchRequest.json: ## @@ -50,14 +50,23 @@ // Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code. // // Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException(KIP-405) - "validVersions": "0-14", + // + // Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also, + // deprecate the old ReplicaId field and set its default value to -1. (KIP-903) + "validVersions": "0-15", "flexibleVersions": "12+", "fields": [ { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null", "taggedVersions": "12+", "tag": 0, "ignorable": true, "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." }, -{ "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId", +{ "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", "entityType": "brokerId", "about": "The broker ID of the follower, of -1 if this request is from a consumer." }, +{ "name": "ReplicaState", "type": "ReplicaState", "taggedVersions":"15+", "tag": 1, "fields": [ Review Comment: Also, does the builder still populate the field? Does it automatically remove it if we keep the defaults? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest
jolshan commented on code in PR #13323: URL: https://github.com/apache/kafka/pull/13323#discussion_r1136220531 ## clients/src/main/resources/common/message/FetchRequest.json: ## @@ -50,14 +50,23 @@ // Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code. // // Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException(KIP-405) - "validVersions": "0-14", + // + // Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also, + // deprecate the old ReplicaId field and set its default value to -1. (KIP-903) + "validVersions": "0-15", "flexibleVersions": "12+", "fields": [ { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null", "taggedVersions": "12+", "tag": 0, "ignorable": true, "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." }, -{ "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId", +{ "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", "entityType": "brokerId", "about": "The broker ID of the follower, of -1 if this request is from a consumer." }, +{ "name": "ReplicaState", "type": "ReplicaState", "taggedVersions":"15+", "tag": 1, "fields": [ Review Comment: I see. So we still rely on IBP to know the version? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest
CalvinConfluent commented on code in PR #13323: URL: https://github.com/apache/kafka/pull/13323#discussion_r1136218503 ## clients/src/main/resources/common/message/FetchRequest.json: ## @@ -50,14 +50,23 @@ // Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code. // // Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException(KIP-405) - "validVersions": "0-14", + // + // Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also, + // deprecate the old ReplicaId field and set its default value to -1. (KIP-903) + "validVersions": "0-15", "flexibleVersions": "12+", "fields": [ { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null", "taggedVersions": "12+", "tag": 0, "ignorable": true, "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." }, -{ "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId", +{ "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", "entityType": "brokerId", "about": "The broker ID of the follower, of -1 if this request is from a consumer." }, +{ "name": "ReplicaState", "type": "ReplicaState", "taggedVersions":"15+", "tag": 1, "fields": [ Review Comment: This reduces the message size for the consumer case, where replicaId and replicaEpoch are both -1. When both fields hold their default value of -1, the replicaState is not written into the serialized message.
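The size-saving behaviour described in the comment above can be illustrated with a small stand-in. This is only a sketch: `ReplicaStateSketch` and `TaggedFieldSketch` are hypothetical classes, not the auto-generated `FetchRequestData` code, and the field types are assumptions. It shows the idea that a tagged field equal to its default contributes nothing to the serialized request.

```java
// Hypothetical stand-in for the generated ReplicaState struct.
final class ReplicaStateSketch {
    int replicaId = -1;      // default used for consumers
    long replicaEpoch = -1L; // default used for consumers (type assumed)

    @Override
    public boolean equals(final Object other) {
        if (!(other instanceof ReplicaStateSketch)) {
            return false;
        }
        final ReplicaStateSketch that = (ReplicaStateSketch) other;
        return replicaId == that.replicaId && replicaEpoch == that.replicaEpoch;
    }

    @Override
    public int hashCode() {
        return 31 * replicaId + Long.hashCode(replicaEpoch);
    }
}

final class TaggedFieldSketch {
    // Number of tagged fields that would be written: a state equal to a
    // freshly constructed (all-default) state is skipped entirely.
    static int taggedFieldsToWrite(final ReplicaStateSketch state) {
        return state.equals(new ReplicaStateSketch()) ? 0 : 1;
    }
}
```

Under these assumptions a consumer request, which keeps both defaults, writes no tagged field, while a follower that sets either value writes one.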
[GitHub] [kafka] jsancio commented on a diff in pull request #13396: KAFKA-13884; Only voters flush on Fetch response
jsancio commented on code in PR #13396: URL: https://github.com/apache/kafka/pull/13396#discussion_r1136211424 ## raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java: ## @@ -975,6 +980,18 @@ void assertFetchRequestData( assertEquals(fetchOffset, fetchPartition.fetchOffset()); assertEquals(lastFetchedEpoch, fetchPartition.lastFetchedEpoch()); assertEquals(localId.orElse(-1), request.replicaId()); + +// Assert that voters have flushed up to the fetch offset +if (localId.isPresent() && voters.contains(localId.getAsInt())) { Review Comment: Okay. Added a check for observers. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a diff in pull request #13396: KAFKA-13884; Only voters flush on Fetch response
hachikuji commented on code in PR #13396: URL: https://github.com/apache/kafka/pull/13396#discussion_r1136194254 ## raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java: ## @@ -975,6 +980,18 @@ void assertFetchRequestData( assertEquals(fetchOffset, fetchPartition.fetchOffset()); assertEquals(lastFetchedEpoch, fetchPartition.lastFetchedEpoch()); assertEquals(localId.orElse(-1), request.replicaId()); + +// Assert that voters have flushed up to the fetch offset +if (localId.isPresent() && voters.contains(localId.getAsInt())) { Review Comment: If someone removed the optimization, it might be a notable regression in performance, but no tests would fail. I think that makes it worthwhile. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a diff in pull request #13396: KAFKA-13884; Only voters flush on Fetch response
jsancio commented on code in PR #13396: URL: https://github.com/apache/kafka/pull/13396#discussion_r1136182878 ## raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java: ## @@ -975,6 +980,18 @@ void assertFetchRequestData( assertEquals(fetchOffset, fetchPartition.fetchOffset()); assertEquals(lastFetchedEpoch, fetchPartition.lastFetchedEpoch()); assertEquals(localId.orElse(-1), request.replicaId()); + +// Assert that voters have flushed up to the fetch offset +if (localId.isPresent() && voters.contains(localId.getAsInt())) { Review Comment: Yeah, I am not sure whether we should check for "performance" in the correctness tests. I am also not sure what we would check for. For example, I can add a boolean to `MockLog` that gets set to `true` on flush and set to `false` when `RaftClientTestContext` checks it. What do you think?
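The flag idea floated in the comment above could look roughly like the following. This is a sketch only: `FlushTrackingLogSketch` and its method names are hypothetical and are not taken from `MockLog` or `RaftClientTestContext`.

```java
// Hypothetical illustration of a "flushed since last check" flag.
final class FlushTrackingLogSketch {
    private boolean flushedSinceLastCheck = false;

    // Called wherever the log would flush to disk.
    void flush() {
        flushedSinceLastCheck = true;
    }

    // Returns whether a flush happened since the previous call, then
    // resets the flag so the next check starts from a clean state.
    boolean flushedSinceLastCheck() {
        final boolean flushed = flushedSinceLastCheck;
        flushedSinceLastCheck = false;
        return flushed;
    }
}
```

A test context could then assert that the flag is set for voters and unset for observers after each Fetch response, which would catch a regression if the optimization were removed.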
[jira] [Commented] (KAFKA-14809) Connect incorrectly logs that no records were produced by source tasks
[ https://issues.apache.org/jira/browse/KAFKA-14809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17700372#comment-17700372 ] Hector Geraldino commented on KAFKA-14809: -- No that's perfect. Thanks [~ChrisEgerton]! > Connect incorrectly logs that no records were produced by source tasks > -- > > Key: KAFKA-14809 > URL: https://issues.apache.org/jira/browse/KAFKA-14809 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Hector Geraldino >Assignee: Hector Geraldino >Priority: Minor > > There's an *{{if}}* condition when [committing > offsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L219] > that is referencing the wrong variable, so the statement always evaluates to > {*}true{*}. > This causes log statements like the following to be spuriously emitted: > {quote}[2023-03-14 16:18:04,675] DEBUG WorkerSourceTask\{id=job-0} Either no > records were produced by the task since the last offset commit, or every > record has been filtered out by a transformation or dropped due to > transformation or conversion errors. > (org.apache.kafka.connect.runtime.WorkerSourceTask:220) > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jolshan commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest
jolshan commented on code in PR #13323: URL: https://github.com/apache/kafka/pull/13323#discussion_r1136148988 ## clients/src/main/resources/common/message/FetchRequest.json: ## @@ -50,14 +50,23 @@ // Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code. // // Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException(KIP-405) - "validVersions": "0-14", + // + // Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also, + // deprecate the old ReplicaId field and set its default value to -1. (KIP-903) + "validVersions": "0-15", "flexibleVersions": "12+", "fields": [ { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null", "taggedVersions": "12+", "tag": 0, "ignorable": true, "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." }, -{ "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId", +{ "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", "entityType": "brokerId", "about": "The broker ID of the follower, of -1 if this request is from a consumer." }, +{ "name": "ReplicaState", "type": "ReplicaState", "taggedVersions":"15+", "tag": 1, "fields": [ Review Comment: Sorry if I missed this conversation, but why is this tagged? Is it to avoid the IBP bump? We would still need it if we no longer set replica ID right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14809) Connect incorrectly logs that no records were produced by source tasks
[ https://issues.apache.org/jira/browse/KAFKA-14809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17700360#comment-17700360 ] Chris Egerton commented on KAFKA-14809: --- [~hgeraldino] I wanted a Jira ticket for this so that users could easily find the cause of the problem if they were confused by the incorrect log messages, so I've taken a stab at changing the title and description to describe not just the specific bug in the code, but the user-facing effects that it has. Feel free to alter anything you'd like; I just want this to be discoverable by users who may be searching for, e.g., log messages to understand what's going wrong. > Connect incorrectly logs that no records were produced by source tasks > -- > > Key: KAFKA-14809 > URL: https://issues.apache.org/jira/browse/KAFKA-14809 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Hector Geraldino >Assignee: Hector Geraldino >Priority: Minor > > There's an *{{if}}* condition when [committing > offsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L219] > that is referencing the wrong variable, so the statement always evaluates to > {*}true{*}. > This causes log statements like the following to be spuriously emitted: > {quote}[2023-03-14 16:18:04,675] DEBUG WorkerSourceTask\{id=job-0} Either no > records were produced by the task since the last offset commit, or every > record has been filtered out by a transformation or dropped due to > transformation or conversion errors. > (org.apache.kafka.connect.runtime.WorkerSourceTask:220) > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14809) Connect incorrectly logs that no records were produced by source tasks
[ https://issues.apache.org/jira/browse/KAFKA-14809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14809: -- Description: There's an *{{if}}* condition when [committing offsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L219] that is referencing the wrong variable, so the statement always evaluates to {*}true{*}. This causes log statements like the following to be spuriously emitted: {quote}[2023-03-14 16:18:04,675] DEBUG WorkerSourceTask\{id=job-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:220) {quote} was: There's an *{{if}}* condition when [committing offsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L219] that is referencing the wrong variable, so the statement always evaluates to {*}true{*}. This causes log statements like the following to be spuriously emitted: {quote} {quote} > Connect incorrectly logs that no records were produced by source tasks > -- > > Key: KAFKA-14809 > URL: https://issues.apache.org/jira/browse/KAFKA-14809 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Hector Geraldino >Assignee: Hector Geraldino >Priority: Minor > > There's an *{{if}}* condition when [committing > offsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L219] > that is referencing the wrong variable, so the statement always evaluates to > {*}true{*}. 
> This causes log statements like the following to be spuriously emitted: > {quote}[2023-03-14 16:18:04,675] DEBUG WorkerSourceTask\{id=job-0} Either no > records were produced by the task since the last offset commit, or every > record has been filtered out by a transformation or dropped due to > transformation or conversion errors. > (org.apache.kafka.connect.runtime.WorkerSourceTask:220) > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jolshan commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest
jolshan commented on code in PR #13323: URL: https://github.com/apache/kafka/pull/13323#discussion_r1136146308 ## clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java: ## @@ -302,6 +335,10 @@ public String toString() { } } +public static int replicaId(FetchRequestData fetchRequestData) { +return fetchRequestData.replicaId() != -1 ? fetchRequestData.replicaId() : fetchRequestData.replicaState().replicaId(); Review Comment: In the case of a consumer, we always read replica state (the second part of the statement). I don't think there are any bugs here, but it might be good to keep in mind. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
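The fallback in `replicaId(...)` that the comment above refers to can be walked through with plain integers. This sketch uses a hypothetical helper that takes the two values directly rather than a `FetchRequestData` object; the three cases are read off the version notes in the diff.

```java
// Hypothetical helper mirroring the ternary in the diff above.
final class ReplicaIdSketch {
    // topLevelReplicaId: the deprecated top-level field (default -1).
    // replicaStateReplicaId: the id inside the ReplicaState struct (default -1).
    static int replicaId(final int topLevelReplicaId, final int replicaStateReplicaId) {
        return topLevelReplicaId != -1 ? topLevelReplicaId : replicaStateReplicaId;
    }

    public static void main(final String[] args) {
        // Follower on version 14 or below: the top-level field is set.
        System.out.println(replicaId(5, -1));
        // Follower on version 15: the top-level field is -1, so ReplicaState is read.
        System.out.println(replicaId(-1, 5));
        // Consumer: both are -1, so the result comes from the second branch.
        System.out.println(replicaId(-1, -1));
    }
}
```

The last case is the one the reviewer points out: for a consumer the method always falls through to the ReplicaState value, which is harmless here because that value is also -1.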
[jira] [Updated] (KAFKA-14809) Connect incorrectly logs that no records were produced by source task
[ https://issues.apache.org/jira/browse/KAFKA-14809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14809: -- Summary: Connect incorrectly logs that no records were produced by source task (was: Kafka Connect incorrectly logs that no records were produced by source task) > Connect incorrectly logs that no records were produced by source task > - > > Key: KAFKA-14809 > URL: https://issues.apache.org/jira/browse/KAFKA-14809 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Hector Geraldino >Assignee: Hector Geraldino >Priority: Minor > > There's an *{{if}}* condition when [committing > offsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L219] > that is referencing the wrong variable, so the statement always evaluates to > {*}true{*}. > This causes log statements like the following to be spuriously emitted: > {quote} > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14809) Connect incorrectly logs that no records were produced by source tasks
[ https://issues.apache.org/jira/browse/KAFKA-14809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14809: -- Summary: Connect incorrectly logs that no records were produced by source tasks (was: Connect incorrectly logs that no records were produced by source task) > Connect incorrectly logs that no records were produced by source tasks > -- > > Key: KAFKA-14809 > URL: https://issues.apache.org/jira/browse/KAFKA-14809 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Hector Geraldino >Assignee: Hector Geraldino >Priority: Minor > > There's an *{{if}}* condition when [committing > offsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L219] > that is referencing the wrong variable, so the statement always evaluates to > {*}true{*}. > This causes log statements like the following to be spuriously emitted: > {quote} > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14809) Kafka Connect incorrectly logs that no records were produced by source task
[ https://issues.apache.org/jira/browse/KAFKA-14809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14809: -- Description: There's an *{{if}}* condition when [committing offsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L219] that is referencing the wrong variable, so the statement always evaluates to {*}true{*}. This causes log statements like the following to be spuriously emitted: {quote} {quote} was: There's an *{{if}}* condition when [committing offsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L219] that is referencing the wrong variable, so the statement always evaluates to {*}true{*}. It's a subtle bug, which went undetected probably because it's only used to log information about pending committable offsets. > Kafka Connect incorrectly logs that no records were produced by source task > --- > > Key: KAFKA-14809 > URL: https://issues.apache.org/jira/browse/KAFKA-14809 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Hector Geraldino >Assignee: Hector Geraldino >Priority: Minor > > There's an *{{if}}* condition when [committing > offsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L219] > that is referencing the wrong variable, so the statement always evaluates to > {*}true{*}. > This causes log statements like the following to be spuriously emitted: > {quote} > {quote}
[jira] [Updated] (KAFKA-14809) Kafka Connect incorrectly logs that no records were produced by source task
[ https://issues.apache.org/jira/browse/KAFKA-14809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14809: -- Summary: Kafka Connect incorrectly logs that no records were produced by source task (was: Fix logging conditional on WorkerSourceTask) > Kafka Connect incorrectly logs that no records were produced by source task > --- > > Key: KAFKA-14809 > URL: https://issues.apache.org/jira/browse/KAFKA-14809 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Hector Geraldino >Assignee: Hector Geraldino >Priority: Minor > > There's an *{{if}}* condition when [committing > offsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L219] > that is referencing the wrong variable, so the statement always evaluates to > {*}true{*}. > It's a subtle bug, which went undetected probably because it's only used to > log information about pending committable offsets.
[jira] [Updated] (KAFKA-14809) Fix logging conditional on WorkerSourceTask
[ https://issues.apache.org/jira/browse/KAFKA-14809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14809: -- Priority: Minor (was: Trivial) > Fix logging conditional on WorkerSourceTask > --- > > Key: KAFKA-14809 > URL: https://issues.apache.org/jira/browse/KAFKA-14809 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Hector Geraldino >Assignee: Hector Geraldino >Priority: Minor > > There's an *{{if}}* condition when [committing > offsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L219] > that is referencing the wrong variable, so the statement always evaluates to > {*}true{*}. > It's a subtle bug, which went undetected probably because it's only used to > log information about pending committable offsets.
[GitHub] [kafka] hgeraldino commented on pull request #13386: KAFKA-14809 Fix logging conditional on WorkerSourceTask
hgeraldino commented on PR #13386: URL: https://github.com/apache/kafka/pull/13386#issuecomment-1468755316 > LGTM, thanks @hgeraldino! > > It would be nice to have a Jira for this so that others who notice the logging issue on older versions can know 1) that it is a bug and 2) which versions it is fixed on. Would you mind filing a ticket and linking it to this PR in the title? We can merge after that. Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14809) Fix logging conditional on WorkerSourceTask
Hector Geraldino created KAFKA-14809: Summary: Fix logging conditional on WorkerSourceTask Key: KAFKA-14809 URL: https://issues.apache.org/jira/browse/KAFKA-14809 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Hector Geraldino Assignee: Hector Geraldino There's an *{{if}}* condition when [committing offsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L219] that is referencing the wrong variable, so the statement always evaluates to {*}true{*}. It's a subtle bug, which went undetected probably because it's only used to log information about pending committable offsets.
[GitHub] [kafka] hachikuji commented on a diff in pull request #13396: KAFKA-13884; Only voters flush on Fetch response
hachikuji commented on code in PR #13396: URL: https://github.com/apache/kafka/pull/13396#discussion_r1136130884 ## raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java: ## @@ -975,6 +980,18 @@ void assertFetchRequestData( assertEquals(fetchOffset, fetchPartition.fetchOffset()); assertEquals(lastFetchedEpoch, fetchPartition.lastFetchedEpoch()); assertEquals(localId.orElse(-1), request.replicaId()); + +// Assert that voters have flushed up to the fetch offset +if (localId.isPresent() && voters.contains(localId.getAsInt())) { Review Comment: Is there anything we can assert in the converse case to verify that the log is not flushed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #13380: KAFKA-14468: Committed API
philipnee commented on code in PR #13380: URL: https://github.com/apache/kafka/pull/13380#discussion_r1136104123 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -149,6 +185,29 @@ Queue stagedCommits() { return this.stagedCommits; } +/** + * Get all the sendable requests, and create a list of UnsentRequest. + */ +private List sendOffsetFetchRequests(final long currentTimeMs) { +List requests = unsentOffsetFetchRequests.sendableRequests(currentTimeMs); +List pollResults = new ArrayList<>(); +requests.forEach(req -> { +OffsetFetchRequest.Builder builder = new OffsetFetchRequest.Builder( +groupState.groupId, true, +new ArrayList<>(req.requestedPartitions), +throwOnFetchStableOffsetUnsupported); +NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest( +builder, +coordinatorRequestManager.coordinator()); +FetchCommittedOffsetResponseHandler cb = new FetchCommittedOffsetResponseHandler(req); +unsentRequest.future().whenComplete((r, t) -> { Review Comment: When we first made the FutureCompletionHandler, we explicitly handled the responses in the onComplete method here:
```
if (response.authenticationException() != null) {
    onFailure(response.authenticationException());
} else if (response.wasDisconnected()) {
    onFailure(DisconnectException.INSTANCE);
} else if (response.versionMismatch() != null) {
    onFailure(response.versionMismatch());
} else {
    future.complete(response);
}
```
I was thinking about your suggestion, but since you mentioned it again, let me try to refactor this a bit to make it more pluggable.
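One way the branches quoted in the comment above could be pulled out of the handler so callers can reuse them, as a rough sketch only. `Response`, `failureOf` and `complete` are hypothetical names, the exceptions are plain JDK types rather than Kafka's, and this is not necessarily the shape the refactor in the PR takes.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

final class ResponseDispatchSketch {
    // Hypothetical, simplified stand-in for a client response.
    static final class Response {
        final RuntimeException authenticationException;
        final boolean disconnected;
        final RuntimeException versionMismatch;

        Response(RuntimeException authenticationException, boolean disconnected,
                 RuntimeException versionMismatch) {
            this.authenticationException = authenticationException;
            this.disconnected = disconnected;
            this.versionMismatch = versionMismatch;
        }
    }

    // Maps a response to the failure it represents, checking the conditions in
    // the same order as the quoted snippet. Empty means the response is usable.
    static Optional<RuntimeException> failureOf(Response r) {
        if (r.authenticationException != null) return Optional.of(r.authenticationException);
        if (r.disconnected) return Optional.of(new IllegalStateException("disconnected"));
        if (r.versionMismatch != null) return Optional.of(r.versionMismatch);
        return Optional.empty();
    }

    // Completes a future normally or exceptionally based on failureOf.
    static CompletableFuture<Response> complete(Response r) {
        CompletableFuture<Response> future = new CompletableFuture<>();
        Optional<RuntimeException> failure = failureOf(r);
        if (failure.isPresent()) {
            future.completeExceptionally(failure.get());
        } else {
            future.complete(r);
        }
        return future;
    }
}
```

Keeping the classification in one function means a request-specific callback only has to deal with a successfully completed future or a single exception.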
[GitHub] [kafka] philipnee commented on a diff in pull request #13380: KAFKA-14468: Committed API
philipnee commented on code in PR #13380: URL: https://github.com/apache/kafka/pull/13380#discussion_r1136096815 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -80,16 +97,18 @@ public CommitRequestManager( */ @Override public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { -maybeAutoCommit(currentTimeMs); +maybeAutoCommit(); +List unsentRequests = new ArrayList<>(); -if (stagedCommits.isEmpty()) { -return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>()); +if (!stagedCommits.isEmpty()) { Review Comment: Makes sense. I think the reason I did that initially was that I wanted to send out all of the commits first before sending the fetchOffsetRequest.
[GitHub] [kafka] jsancio opened a new pull request, #13396: KAFKA-13884; Only voters flush on Fetch response
jsancio opened a new pull request, #13396: URL: https://github.com/apache/kafka/pull/13396 The leader only requires that voters have flushed their log up to the fetch offset before sending a fetch request. This change only flushes the log when handling the fetch response, if the follower is a voter. This should improve the disk performance on observers (brokers). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
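A small sketch of the behaviour described in this PR summary: records from a Fetch response are always appended, but the log is flushed only when the local node is a voter. `Log` and `handleFetchResponse` are hypothetical names, and using a plain `int` for the local id is a simplification made for this example.

```java
import java.util.Set;

final class VoterFlushSketch {
    // Hypothetical in-memory log that only tracks offsets.
    static final class Log {
        long endOffset = 0;
        long flushedOffset = 0;

        void append(int records) { endOffset += records; }
        void flush() { flushedOffset = endOffset; }
    }

    // Append the fetched records; flush only if this node is a voter.
    static void handleFetchResponse(Log log, int records, int localId, Set<Integer> voters) {
        log.append(records);
        if (voters.contains(localId)) {
            log.flush();
        }
    }
}
```

For an observer the flushed offset lags the end offset until some other flush happens, which is where the disk savings mentioned in the PR would come from.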
[jira] [Resolved] (KAFKA-14794) Unable to deserialize base64 JSON strings
[ https://issues.apache.org/jira/browse/KAFKA-14794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio resolved KAFKA-14794. Fix Version/s: 3.5.0 Resolution: Fixed > Unable to deserialize base64 JSON strings > -- > > Key: KAFKA-14794 > URL: https://issues.apache.org/jira/browse/KAFKA-14794 > Project: Kafka > Issue Type: Bug >Reporter: José Armando García Sancio >Assignee: José Armando García Sancio >Priority: Major > Fix For: 3.5.0 > > > h1. Problem > The following test fails: > {code:java} > @Test > public void testBinaryNode() throws IOException { > byte[] expected = new byte[] {5, 2, 9, 4, 1, 8, 7, 0, 3, 6}; > StringWriter writer = new StringWriter(); > ObjectMapper mapper = new ObjectMapper(); > mapper.writeTree(mapper.createGenerator(writer), new > BinaryNode(expected)); > JsonNode binaryNode = mapper.readTree(writer.toString()); > assertTrue(binaryNode.isTextual(), binaryNode.toString()); > byte[] actual = MessageUtil.jsonNodeToBinary(binaryNode, "Test base64 > JSON string"); > assertEquals(expected, actual); > } > {code} > with the following error: > {code:java} > Gradle Test Run :clients:test > Gradle Test Executor 20 > MessageUtilTest > > testBinaryNode() FAILED > java.lang.RuntimeException: Test base64 JSON string: expected > Base64-encoded binary data. > at > org.apache.kafka.common.protocol.MessageUtil.jsonNodeToBinary(MessageUtil.java:165) > at > org.apache.kafka.common.protocol.MessageUtilTest.testBinaryNode(MessageUtilTest.java:102) > {code} > The reason for the failure is because FasterXML Jackson deserializes base64 > JSON strings to a TextNode not to a BinaryNode. > h1. Solution > The method {{MessageUtil::jsonNodeToBinary}} should not assume that the input > {{JsonNode}} is always a {{{}BinaryNode{}}}. It should also support decoding > {{{}TextNode{}}}. > {{JsonNode::binaryValue}} is supported by both {{BinaryNode}} and > {{{}TextNode{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
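The idea in the Solution section can be sketched without Jackson, so that the example stays self-contained. `toBinary` is a hypothetical stand-in for `MessageUtil::jsonNodeToBinary`, and the `Object` parameter stands in for `JsonNode`: a `byte[]` plays the role of a `BinaryNode` and a `String` plays the role of a `TextNode` holding Base64 text.

```java
import java.util.Base64;

final class Base64NodeSketch {
    // Accepts either raw bytes (BinaryNode-like) or Base64 text (TextNode-like),
    // instead of assuming the binary form is the only possible input.
    static byte[] toBinary(Object node, String about) {
        if (node instanceof byte[]) {
            return (byte[]) node;
        }
        if (node instanceof String) {
            try {
                return Base64.getDecoder().decode((String) node);
            } catch (IllegalArgumentException e) {
                throw new RuntimeException(about + ": expected Base64-encoded binary data.", e);
            }
        }
        throw new RuntimeException(about + ": expected Base64-encoded binary data.");
    }
}
```

After a JSON round trip the bytes come back as Base64 text, so the text branch is the one a deserialized value would hit. Byte arrays should be compared with `Arrays.equals` (or an array-aware assertion) rather than by reference.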
[GitHub] [kafka] chia7712 opened a new pull request, #13395: MINOR: don't disconnect stale controller if the network client is res…
chia7712 opened a new pull request, #13395: URL: https://github.com/apache/kafka/pull/13395 The unsent request stored by `InterBrokerSendThread` still uses the stale controller even though the `NetworkClient` has already been reset. The request gets a `NOT_CONTROLLER` error, and the error handler then tries to disconnect the "new" controller. The new controller is not connected, so the call `networkClient.disconnect(controllerAddress.idString)` causes the following error:
```
java.lang.IllegalStateException: No entry found for connection 1000
    at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:411)
    at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:182)
    at org.apache.kafka.clients.NetworkClient.disconnect(NetworkClient.java:328)
    at kafka.server.BrokerToControllerRequestThread.$anonfun$handleResponse$9(BrokerToControllerChannelManager.scala:405)
    at kafka.server.BrokerToControllerRequestThread.$anonfun$handleResponse$9$adapted(BrokerToControllerChannelManager.scala:402)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.BrokerToControllerRequestThread.handleResponse(BrokerToControllerChannelManager.scala:402)
    at kafka.server.BrokerToControllerRequestThread.$anonfun$generateRequests$1(BrokerToControllerChannelManager.scala:377)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:594)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:586)
    at kafka.common.InterBrokerSendThread.pollOnce(InterBrokerSendThread.scala:78)
    at kafka.server.BrokerToControllerRequestThread.doWork(BrokerToControllerChannelManager.scala:422)
    at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
```
This is not a critical bug since we swallow the exception, but the error looked a bit scary to me when testing the ZK migration :) ### Committer Checklist (excluded from commit message) 
- [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
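The failure mode in PR #13395 (disconnecting a node for which the client has no connection entry) can be illustrated with a toy model. This is only one possible guard, written under our own assumptions; it is not taken from the PR, and `DisconnectGuardSketch` and its methods are hypothetical rather than part of `NetworkClient`.

```java
import java.util.HashSet;
import java.util.Set;

final class DisconnectGuardSketch {
    // Ids of nodes this toy client currently tracks a connection for.
    private final Set<String> knownConnections = new HashSet<>();

    void connect(String id) {
        knownConnections.add(id);
    }

    // Mirrors the failure in the stack trace: disconnecting an unknown node throws.
    void disconnect(String id) {
        if (!knownConnections.remove(id)) {
            throw new IllegalStateException("No entry found for connection " + id);
        }
    }

    // Guarded variant: skip the disconnect when the node was never connected.
    boolean disconnectIfKnown(String id) {
        if (!knownConnections.contains(id)) {
            return false;
        }
        disconnect(id);
        return true;
    }
}
```

Another option would be to compare the node that answered against the controller the request was originally built for, and skip the disconnect when they differ.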
[GitHub] [kafka] C0urante commented on pull request #13379: KAFKA-14799: Ignore source task requests to abort empty transactions
C0urante commented on PR #13379: URL: https://github.com/apache/kafka/pull/13379#issuecomment-1468683227 Test failures are unrelated; merging. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante merged pull request #13379: KAFKA-14799: Ignore source task requests to abort empty transactions
C0urante merged PR #13379: URL: https://github.com/apache/kafka/pull/13379 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on pull request #13390: MINOR: Standardize KRaft logging, thread names, and terminology
mumrah commented on PR #13390: URL: https://github.com/apache/kafka/pull/13390#issuecomment-1468662689 @cmccabe, I see your point about the node ID when debugging tests -- it can be annoying to not know which broker instance a thread belongs to. Your kebab-case examples look good to me. Did you find where the lone `EventHandler` is coming from?
[GitHub] [kafka] C0urante commented on a diff in pull request #13386: Fix logging conditional
C0urante commented on code in PR #13386: URL: https://github.com/apache/kafka/pull/13386#discussion_r1136051929 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java: ## @@ -216,7 +216,7 @@ public boolean commitOffsets() { this.committableOffsets = CommittableOffsets.EMPTY; } -if (committableOffsets.isEmpty()) { +if (offsetsToCommit.isEmpty()) { Review Comment: Haha, no worries! Considering I wrote this buggy logic in the first place I'm at least equally embarrassed here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
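A stripped-down sketch of why the original condition was always true: the field is reset to an empty value just before it is checked, whereas the local snapshot still holds what is about to be committed. The class and method names below are hypothetical, and a plain list stands in for `CommittableOffsets`.

```java
import java.util.ArrayList;
import java.util.List;

final class CommitLogSketch {
    private List<Long> committableOffsets = new ArrayList<>();

    void record(long offset) {
        committableOffsets.add(offset);
    }

    // Buggy shape: the field has just been reset, so this is always true.
    boolean buggyNothingToCommit() {
        List<Long> offsetsToCommit = committableOffsets; // snapshot, never consulted
        committableOffsets = new ArrayList<>();
        return committableOffsets.isEmpty();
    }

    // Fixed shape from the diff: check the snapshot taken before the reset.
    boolean fixedNothingToCommit() {
        List<Long> offsetsToCommit = committableOffsets;
        committableOffsets = new ArrayList<>();
        return offsetsToCommit.isEmpty();
    }
}
```

Because the result only drives a log statement, the buggy version commits the right offsets and merely reports that there was nothing to commit.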
[GitHub] [kafka] mumrah commented on a diff in pull request #13384: KAFKA-14801: Handle sensitive configs during ZK migration
mumrah commented on code in PR #13384: URL: https://github.com/apache/kafka/pull/13384#discussion_r1136050391 ## core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala: ## @@ -49,12 +51,25 @@ class ZkMigrationClientTest extends QuorumTestHarness { private var migrationState: ZkMigrationLeadershipState = _ + private val SECRET = "secret" + + private val encoder: PasswordEncoder = { +val encoderProps = new Properties() +encoderProps.put(KafkaConfig.ZkConnectProp, "localhost:1234") // Get around the config validation +encoderProps.put(KafkaConfig.PasswordEncoderSecretProp, SECRET) // Zk secret to encrypt the +val encoderConfig = new KafkaConfig(encoderProps) Review Comment: Oh, I see -- we're getting the defaults for things like the cipher, key length, etc. Let's keep this in, in that case -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a diff in pull request #13368: KAFKA-14796 Migrate ACLs from AclAuthorizor to KRaft
mumrah commented on code in PR #13368: URL: https://github.com/apache/kafka/pull/13368#discussion_r1136047272 ## core/src/main/scala/kafka/zk/ZkMigrationClient.scala: ## @@ -211,12 +214,38 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo } } + def migrateAcls(recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = { +// This is probably fairly inefficient, but it preserves the semantics from AclAuthorizer (which is non-trivial) +var allAcls = new scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new ResourceOrdering) +def updateAcls(resourcePattern: ResourcePattern, versionedAcls: VersionedAcls): Unit = { + allAcls = allAcls.updated(resourcePattern, versionedAcls) +} + +AclAuthorizer.loadAllAcls(zkClient, this, updateAcls) Review Comment: Yes, good call-out -- we will need documentation on this. We will only support migrating ACLs from AclAuthorizer to StandardAuthorizer. Otherwise, users can continue using whatever authorizer they had configured in ZK mode. In fact, AclAuthorizer _could_ still be used in KRaft mode, but we want to discourage that :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #13368: KAFKA-14796 Migrate ACLs from AclAuthorizor to KRaft
rajinisivaram commented on code in PR #13368: URL: https://github.com/apache/kafka/pull/13368#discussion_r1136044199 ## core/src/main/scala/kafka/zk/ZkMigrationClient.scala: ## @@ -211,12 +214,38 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo } } + def migrateAcls(recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = { +// This is probably fairly inefficient, but it preserves the semantics from AclAuthorizer (which is non-trivial) +var allAcls = new scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new ResourceOrdering) +def updateAcls(resourcePattern: ResourcePattern, versionedAcls: VersionedAcls): Unit = { + allAcls = allAcls.updated(resourcePattern, versionedAcls) +} + +AclAuthorizer.loadAllAcls(zkClient, this, updateAcls) Review Comment: Authorizers are configurable, so they could in theory store ACLs in a different format. Are we going to document that we only support migration from ZK for ACLs stored by AclAuthorizer?
[GitHub] [kafka] akhileshchg commented on a diff in pull request #13384: KAFKA-14801: Handle sensitive configs during ZK migration
akhileshchg commented on code in PR #13384: URL: https://github.com/apache/kafka/pull/13384#discussion_r1136044153 ## core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala: ## @@ -49,12 +51,25 @@ class ZkMigrationClientTest extends QuorumTestHarness { private var migrationState: ZkMigrationLeadershipState = _ + private val SECRET = "secret" + + private val encoder: PasswordEncoder = { +val encoderProps = new Properties() +encoderProps.put(KafkaConfig.ZkConnectProp, "localhost:1234") // Get around the config validation +encoderProps.put(KafkaConfig.PasswordEncoderSecretProp, SECRET) // Zk secret to encrypt the +val encoderConfig = new KafkaConfig(encoderProps) Review Comment: We still need the encoder and right configs to actually encode and decode the configs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a diff in pull request #13384: KAFKA-14801: Handle sensitive configs during ZK migration
mumrah commented on code in PR #13384: URL: https://github.com/apache/kafka/pull/13384#discussion_r1136040540 ## core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala: ## @@ -49,12 +51,25 @@ class ZkMigrationClientTest extends QuorumTestHarness { private var migrationState: ZkMigrationLeadershipState = _ + private val SECRET = "secret" + + private val encoder: PasswordEncoder = { +val encoderProps = new Properties() +encoderProps.put(KafkaConfig.ZkConnectProp, "localhost:1234") // Get around the config validation +encoderProps.put(KafkaConfig.PasswordEncoderSecretProp, SECRET) // Zk secret to encrypt the +val encoderConfig = new KafkaConfig(encoderProps) Review Comment: I don't think we need these anymore, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest
CalvinConfluent commented on code in PR #13323: URL: https://github.com/apache/kafka/pull/13323#discussion_r1136021203 ## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ## @@ -1436,6 +1441,38 @@ public void testInvalidFetchRequest() throws Exception { context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId)); } +@ParameterizedTest +@ApiKeyVersionsSource(apiKey = ApiKeys.FETCH) +public void testFetchRequestVersionHandling(short version) throws Exception { Review Comment: Updated the UT: 1. With extra comments. 2. Covered the first tryCompleteFetchRequest in the handleFetchRequest -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 opened a new pull request, #13393: KAFKA-10244 An new java interface to replace 'kafka.common.MessageReader'
chia7712 opened a new pull request, #13393: URL: https://github.com/apache/kafka/pull/13393 related to https://issues.apache.org/jira/browse/KAFKA-10244 `kafka.common.MessageReader` is an input argument of kafka-console-producer, and we expect that users can supply their own custom reader to produce custom records. Hence, MessageReader is a public interface and we should offer a Java version to replace the current Scala code. Also, the new MessageReader should be placed in the clients module (kafka.common.MessageReader is in the core module). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] cmccabe merged pull request #13344: MINOR: Replace BrokerMetadataListener with MetadataLoader
cmccabe merged PR #13344: URL: https://github.com/apache/kafka/pull/13344 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hgeraldino commented on a diff in pull request #13386: Fix logging conditional
hgeraldino commented on code in PR #13386: URL: https://github.com/apache/kafka/pull/13386#discussion_r1135857544 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java: ## @@ -216,7 +216,7 @@ public boolean commitOffsets() { this.committableOffsets = CommittableOffsets.EMPTY; } -if (committableOffsets.isEmpty()) { +if (offsetsToCommit.isEmpty()) { Review Comment: Yeah this is embarrassing. Fixed
[GitHub] [kafka] cmccabe commented on a diff in pull request #13390: MINOR: Standardize KRaft logging, thread names, and terminology
cmccabe commented on code in PR #13390: URL: https://github.com/apache/kafka/pull/13390#discussion_r1135927103 ## core/src/main/scala/kafka/server/ControllerServer.scala: ## @@ -131,11 +131,11 @@ class ControllerServer( if (!maybeChangeStatus(SHUTDOWN, STARTING)) return val startupDeadline = Deadline.fromDelay(time, config.serverMaxStartupTimeMs, TimeUnit.MILLISECONDS) try { + this.logIdent = new LogContext(s"[ControllerServer ${config.nodeId}] ").logPrefix() info("Starting controller") config.dynamicConfig.initialize(zkClientOpt = None) maybeChangeStatus(STARTING, STARTED) - this.logIdent = new LogContext(s"[ControllerServer id=${config.nodeId}] ").logPrefix() Review Comment: Yes, this use-case is a bit of a hack. (A hack that this PR didn't add!) Basically we're forced to use the old Scala `Logging` trait unless we want to rewrite all the BrokerServer stuff. But we want a nice-looking log prefix. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #13390: MINOR: Standardize KRaft logging, thread names, and terminology
cmccabe commented on PR #13390: URL: https://github.com/apache/kafka/pull/13390#issuecomment-1468497417 cc @ijuma @mumrah @hachikuji So about the thread names thing. I'm open to changing the thread names to "kebab case" (i.e. `my-thread-name`). I do think in the context of JUnit we definitely need to have `broker-0-my-thread-name` and `controller-0-my-thread-name`. I find myself looking at JUnit backtraces all too often, and having 6 distinct threads named `my-thread-name` just doesn't work for me. So then the big question becomes whether we would want the prefixes in prod or not. The "pro" case is that it simplifies the code to just unconditionally do that, and avoids cases where someone accidentally forgets to set the prefix. The "con" case is that we should know what node we're on, so the information is redundant. Although I've seen people do weird things like combine several process backtraces into one file or send ZK and Kafka logs all to the same file. So I don't truly believe the "we'll never need it" case. Maybe "we rarely need it" or "we won't need it if people are reasonable".
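The naming scheme discussed above (`broker-0-my-thread-name`, `controller-0-my-thread-name`) could be produced by a small helper like the following. This is an illustrative sketch; `ThreadNameSketch`, `prefixed` and `newThread` are hypothetical names and not code from the PR.

```java
final class ThreadNameSketch {
    // Builds "<role>-<nodeId>-<name>", e.g. "broker-0-my-thread-name".
    static String prefixed(String role, int nodeId, String name) {
        return role + "-" + nodeId + "-" + name;
    }

    // Creates (but does not start) a daemon thread carrying the prefixed name.
    static Thread newThread(String role, int nodeId, String name, Runnable task) {
        Thread thread = new Thread(task, prefixed(role, nodeId, name));
        thread.setDaemon(true);
        return thread;
    }
}
```

Applying the prefix unconditionally in one helper is the "pro" case from the comment: no call site can forget to set it.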
[GitHub] [kafka] gharris1727 commented on pull request #13367: KAFKA-14797: Emit offset sync when offset translation lag would exceed max.offset.lag
gharris1727 commented on PR #13367: URL: https://github.com/apache/kafka/pull/13367#issuecomment-1468475024 > One high-level thought: it seems like we've elected to drop integration testing coverage for offset.lag.max = 0 and replace it with offset.lag.max = 10. Do you think there's any benefit in retaining at least one case where the max lag is 0? Before KAFKA-12468, we did not have offset.lag.max=0 coverage, so this is returning to the coverage we had before that patch. Since offset.lag.max=0 does have some effect, I'll leave it active in the transactional test. I didn't think it was valuable enough to justify a new test suite's runtime, but using it in an existing test shouldn't be too harmful.
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13367: KAFKA-14797: Emit offset sync when offset translation lag would exceed max.offset.lag
gharris1727 commented on code in PR #13367: URL: https://github.com/apache/kafka/pull/13367#discussion_r1135890001 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java: ## @@ -315,18 +314,16 @@ static class PartitionState { // true if we should emit an offset sync boolean update(long upstreamOffset, long downstreamOffset) { -// This value is what OffsetSyncStore::translateOffsets would compute for this offset given the last sync. -// Because this method is called at most once for each upstream offset, simplify upstreamStep to 1. +// Emit an offset sync if any of the following conditions are true +boolean noPreviousSyncThisLifetime = lastSyncDownstreamOffset == -1L; +// the OffsetSync::translateDownstream method will translate this offset 1 past the last sync, so add 1. // TODO: share common implementation to enforce this relationship -long downstreamTargetOffset = lastSyncDownstreamOffset + 1; -if (lastSyncDownstreamOffset == -1L -|| downstreamOffset - downstreamTargetOffset >= maxOffsetLag -|| upstreamOffset - previousUpstreamOffset != 1L Review Comment: Oops this leaked from another fix, reverting. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
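A rough sketch of the three emit conditions visible in the diff above, for readers following the thread. The class name `PartitionStateSketch` and the local variable names other than `noPreviousSyncThisLifetime` are assumptions for illustration; this is not the merged `MirrorSourceTask` code, and the third condition is the one the comment says is being reverted to.

```java
// Illustrative sketch only; field names mirror the diff, the class itself is hypothetical.
final class PartitionStateSketch {
    private long previousUpstreamOffset = -1L;
    private long lastSyncDownstreamOffset = -1L;
    private final long maxOffsetLag;

    PartitionStateSketch(long maxOffsetLag) {
        this.maxOffsetLag = maxOffsetLag;
    }

    // Returns true if an offset sync should be emitted for this record.
    boolean update(long upstreamOffset, long downstreamOffset) {
        boolean noPreviousSyncThisLifetime = lastSyncDownstreamOffset == -1L;
        // Translation lands one past the last sync, so compare against lastSync + 1.
        long downstreamTargetOffset = lastSyncDownstreamOffset + 1;
        boolean lagTooLarge = downstreamOffset - downstreamTargetOffset >= maxOffsetLag;
        boolean upstreamGap = upstreamOffset - previousUpstreamOffset != 1L;
        boolean shouldSync = noPreviousSyncThisLifetime || lagTooLarge || upstreamGap;
        if (shouldSync) {
            lastSyncDownstreamOffset = downstreamOffset;
        }
        previousUpstreamOffset = upstreamOffset;
        return shouldSync;
    }
}
```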
[GitHub] [kafka] dajac commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest
dajac commented on code in PR #13323: URL: https://github.com/apache/kafka/pull/13323#discussion_r1135754940 ## clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java: ## @@ -130,10 +131,33 @@ private static Optional optionalEpoch(int rawEpochValue) { } } +public static class SimpleBuilder extends AbstractRequest.Builder { +private final FetchRequestData fetchRequestData; +public SimpleBuilder(FetchRequestData fetchRequestData) { +super(ApiKeys.FETCH); +this.fetchRequestData = fetchRequestData; +} + +@Override +public FetchRequest build(short version) { +int replicaId = FetchRequest.replicaId(fetchRequestData); +long replicaEpoch = fetchRequestData.replicaState().replicaEpoch(); +if (version < 15) { +fetchRequestData.setReplicaId(replicaId); +fetchRequestData.setReplicaState(new ReplicaState()); +} else { +fetchRequestData.setReplicaState(new ReplicaState().setReplicaId(replicaId).setReplicaEpoch(replicaEpoch)); +fetchRequestData.setReplicaId(-1); +} Review Comment: My understanding is that we always use the new format everywhere so we should only care about downgrading, no? If we get a replica id >= 0, we could even consider throwing an UnsupportedVersionException for instance. ## core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala: ## @@ -44,7 +45,10 @@ object KafkaNetworkChannel { case fetchRequest: FetchRequestData => // Since we already have the request, we go through a simplified builder new AbstractRequest.Builder[FetchRequest](ApiKeys.FETCH) { - override def build(version: Short): FetchRequest = new FetchRequest(fetchRequest, version) + override def build(version: Short): FetchRequest = { +val builder = new SimpleBuilder(fetchRequest) +new FetchRequest(builder.build(version).data(), version) + } override def toString: String = fetchRequest.toString } Review Comment: You can replace all of this by `new FetchRequest.SimpleBuilder(fetchRequest)`. 
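To make the version branch in the `SimpleBuilder` diff easier to follow, here is a self-contained sketch using a hypothetical stand-in class, `FetchDataSketch`, rather than the generated `FetchRequestData`. The `-1` defaults and the `resolveReplicaId` helper are assumptions made for the example.

```java
// Hypothetical stand-in for the generated request data; not the Kafka class.
final class FetchDataSketch {
    int replicaId = -1;            // top-level field, used before version 15
    int stateReplicaId = -1;       // ReplicaState replica id, used from version 15
    long stateReplicaEpoch = -1L;  // ReplicaState replica epoch, used from version 15

    // Assumed helper: take whichever of the two id fields is populated.
    static int resolveReplicaId(FetchDataSketch data) {
        return data.replicaId != -1 ? data.replicaId : data.stateReplicaId;
    }

    // Mirrors the branch in the diff: old layout below v15, new layout otherwise.
    static FetchDataSketch forVersion(FetchDataSketch data, short version) {
        int id = resolveReplicaId(data);
        long epoch = data.stateReplicaEpoch;
        if (version < 15) {
            data.replicaId = id;
            data.stateReplicaId = -1;
            data.stateReplicaEpoch = -1L;
        } else {
            data.stateReplicaId = id;
            data.stateReplicaEpoch = epoch;
            data.replicaId = -1;
        }
        return data;
    }
}
```

If callers always populate the new layout, as the comment suggests, only the `version < 15` branch does real work.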
## clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java: ## @@ -130,10 +131,33 @@ private static Optional optionalEpoch(int rawEpochValue) { } } +public static class SimpleBuilder extends AbstractRequest.Builder { Review Comment: nit: Could we put a comment saying that this is only used by the KafkaRaftClient? ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ## @@ -983,16 +987,16 @@ private CompletableFuture handleFetchRequest( Errors error = Errors.forException(cause); if (error != Errors.REQUEST_TIMED_OUT) { logger.debug("Failed to handle fetch from {} at {} due to {}", -request.replicaId(), fetchPartition.fetchOffset(), error); +FetchRequest.replicaId(request), fetchPartition.fetchOffset(), error); return buildEmptyFetchResponse(error, Optional.empty()); } } // FIXME: `completionTimeMs`, which can be null logger.trace("Completing delayed fetch from {} starting at offset {} at {}", -request.replicaId(), fetchPartition.fetchOffset(), completionTimeMs); +FetchRequest.replicaId(request), fetchPartition.fetchOffset(), completionTimeMs); -return tryCompleteFetchRequest(request.replicaId(), fetchPartition, time.milliseconds()); +return tryCompleteFetchRequest(FetchRequest.replicaId(request), fetchPartition, time.milliseconds()); Review Comment: nit: Would it make sense to pull `FetchRequest.replicaId(request)` into a variable instead of calling it everywhere? 
## core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala: ## @@ -116,4 +119,18 @@ class RemoteLeaderEndPointTest { assertThrows(classOf[UnknownLeaderEpochException], () => endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch + 1)) assertThrows(classOf[UnknownLeaderEpochException], () => endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch + 1)) } + +@Test +def testBrokerEpochSupplier(): Unit = { +val tp = new TopicPartition("topic1", 0) +val topicId1 = Uuid.randomUuid() +val log: UnifiedLog = mock(classOf[UnifiedLog]) +val partitionMap = Map( +tp -> PartitionFetchState(Some(topicId1), 150, None, 0, None, state = Fetching, lastFetchedEpoch = None)) +when(replicaManager.localLogOrException(tp)).thenReturn(log) +when(log.logStartOffset).thenReturn(1) +val ResultWithPartitions(fetchRequestOpt, partitionsWithError) =
[GitHub] [kafka] hgeraldino commented on a diff in pull request #13386: Fix logging conditional
hgeraldino commented on code in PR #13386: URL: https://github.com/apache/kafka/pull/13386#discussion_r1135857544 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java: ## @@ -216,7 +216,7 @@ public boolean commitOffsets() { this.committableOffsets = CommittableOffsets.EMPTY; } -if (committableOffsets.isEmpty()) { +if (offsetsToCommit.isEmpty()) { Review Comment: Yeah this is embarrassing :( Fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13386: Fix logging conditional
C0urante commented on code in PR #13386: URL: https://github.com/apache/kafka/pull/13386#discussion_r1135813644 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java: ## @@ -216,7 +216,7 @@ public boolean commitOffsets() { this.committableOffsets = CommittableOffsets.EMPTY; } -if (committableOffsets.isEmpty()) { +if (offsetsToCommit.isEmpty()) { Review Comment: Don't we need to update the logging in the `else` branch to use `offsetsToCommit` instead of `committableOffsets` as well? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante merged pull request #13392: MINOR: Fix error check in zombie fencing for exactly-once source connectors
C0urante merged PR #13392: URL: https://github.com/apache/kafka/pull/13392 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a diff in pull request #13384: KAFKA-14801 : Decode the sensitive configs and Zk and use encrypted records before writing them to KRaft log
mumrah commented on code in PR #13384: URL: https://github.com/apache/kafka/pull/13384#discussion_r1135792879 ## core/src/main/scala/kafka/zk/ZkMigrationClient.scala: ## @@ -135,20 +138,34 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo } def migrateBrokerConfigs(recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = { -val brokerEntities = zkClient.getAllEntitiesWithConfig(ConfigType.Broker) val batch = new util.ArrayList[ApiMessageAndVersion]() +val zkPasswordEncoder = kafkaConfig.passwordEncoderSecret match { Review Comment: Can we pass in a PasswordEncoder to ZkMigrationClient rather than the full KafkaConfig? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #13379: KAFKA-14799: Ignore source task requests to abort empty transactions
C0urante commented on PR #13379: URL: https://github.com/apache/kafka/pull/13379#issuecomment-1468372731 Thanks @chia7712, good catch with that edge case! You're correct about the behavior in that scenario; the record-based boundary is given precedence over the batch-based one. I've added test cases to cover both instances of this scenario (record-based commit and batch-based abort, and batch-based commit and record-based abort), and I've updated the Javadocs for `TransactionContext` to clarify the behavior in this scenario. I've also fixed a small flaky test issue in the `ExactlyOnceWorkerSourceTaskTest` suite. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a diff in pull request #13379: KAFKA-14799: Ignore source task requests to abort empty transactions
chia7712 commented on code in PR #13379: URL: https://github.com/apache/kafka/pull/13379#discussion_r1135751362 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java: ## @@ -483,15 +483,19 @@ protected boolean shouldCommitTransactionForRecord(SourceRecord record) { if (transactionContext.shouldAbortOn(record)) { log.info("Aborting transaction for record on topic {} as requested by connector", record.topic()); log.trace("Last record in aborted transaction: {}", record); -abortTransaction(); +maybeAbortTransaction(); Review Comment: Thanks for explanation! +1 again -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on a diff in pull request #13373: Kafka-14420 Use incrementalAlterConfigs API for syncing topic configurations (KIP-894)
mimaison commented on code in PR #13373: URL: https://github.com/apache/kafka/pull/13373#discussion_r1135392026 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ## @@ -539,11 +596,20 @@ Map describeTopicConfigs(Set topics) } Config targetConfig(Config sourceConfig) { -List entries = sourceConfig.entries().stream() -.filter(x -> !x.isDefault() && !x.isReadOnly() && !x.isSensitive()) -.filter(x -> x.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG) -.filter(x -> shouldReplicateTopicConfigurationProperty(x.name())) -.collect(Collectors.toList()); +List entries; +if (useIncrementalAlterConfigs == MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG) { Review Comment: We should use `equals()` to compare String objects ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ## @@ -117,6 +122,16 @@ public MirrorSourceConnector() { this.configPropertyFilter = configPropertyFilter; } +// visible for testing the deprecated setting "use.incremental.alter.configs" +// this constructor should be removed when the deprecated setting is removed in Kafka 4.0 +MirrorSourceConnector(SourceAndTarget sourceAndTarget, ReplicationPolicy replicationPolicy, + String useIncrementalAlterConfigs, ConfigPropertyFilter configPropertyFilter, Admin targetAdmin) { +this.sourceAndTarget = sourceAndTarget; +this.replicationPolicy = replicationPolicy; +this.configPropertyFilter = configPropertyFilter; +this.useIncrementalAlterConfigs = useIncrementalAlterConfigs; +this.targetAdminClient = targetAdmin; + Review Comment: We're missing a closing bracket here! 
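For context on the `equals()` remark: `==` on `String` compares object references, so a config value parsed at runtime may fail the comparison even when its contents match. A small sketch, where the class name and the constant value `"never"` are assumptions for illustration:

```java
// Hypothetical example class; the constant value is assumed for illustration.
final class StringCompareSketch {
    static final String NEVER = "never";

    private StringCompareSketch() { }

    // Constant-first equals() compares contents and is also null-safe.
    static boolean isNever(String configured) {
        return NEVER.equals(configured);
    }

    // Reference comparison: true only if both variables point to the same object.
    static boolean isNeverByReference(String configured) {
        return configured == NEVER;
    }
}
```

Calling `isNever(new String("never"))` returns `true`, while `isNeverByReference` returns `false` for the same argument because the two objects are distinct.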
## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java: ## @@ -30,6 +31,9 @@ public class DefaultConfigPropertyFilter implements ConfigPropertyFilter { public static final String CONFIG_PROPERTIES_EXCLUDE_CONFIG = "config.properties.exclude"; public static final String CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG = "config.properties.blacklist"; +public static final String USE_DEFAULTS_FROM = "use.defaults.from";; Review Comment: We can remove the trailing semicolon ## clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java: ## @@ -78,6 +80,19 @@ public static ListTopicsResult listTopicsResult(String topic) { return new ListTopicsResult(future); } +/** + * Helper to create a AlterConfigsResult instance for a given Throwable. + * AlterConfigsResult's constructor is only accessible from within the + * admin package. + */ +public static AlterConfigsResult alterConfigsResult(ConfigResource cr, Throwable t) { +KafkaFutureImpl future = new KafkaFutureImpl<>(); +Map> futures = new HashMap<>(); +futures.put(cr, future); Review Comment: We can use singletonMap here: ```suggestion Map> futures = Collections.singletonMap(cr, future); ``` ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java: ## @@ -30,6 +31,9 @@ public class DefaultConfigPropertyFilter implements ConfigPropertyFilter { public static final String CONFIG_PROPERTIES_EXCLUDE_CONFIG = "config.properties.exclude"; public static final String CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG = "config.properties.blacklist"; +public static final String USE_DEFAULTS_FROM = "use.defaults.from";; +private static final String USE_DEFAULTS_FROM_DOC = "Which cluster's default to use when syncing topic configurations."; Review Comment: Should it be `defaults` instead of `default`? 
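The `singletonMap` suggestion above replaces a `HashMap` plus a single `put` with one call. A short sketch of the trade-off, using a hypothetical generic helper rather than the Kafka test utility: the resulting map is immutable, which is fine when it is only read afterwards.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical helper illustrating Collections.singletonMap; not Kafka code.
final class SingletonMapSketch {
    private SingletonMapSketch() { }

    // Returns an immutable one-entry map; any later put() throws UnsupportedOperationException.
    static <K, V> Map<K, V> single(K key, V value) {
        return Collections.singletonMap(key, value);
    }
}
```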
## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ## @@ -64,6 +67,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; Review Comment: This is already imported just above ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java: ## @@ -40,11 +44,13 @@ public class DefaultConfigPropertyFilter implements ConfigPropertyFilter { + "unclean\\.leader\\.election\\.enable, " + "min\\.insync\\.replicas"; private Pattern excludePattern = MirrorUtils.compilePatternList(CONFIG_PROPERTIES_EXCLUDE_DEFAULT); +private String useDefaultsFrom = USE_DEFAULTS_FROM_DEFAULT; Review Comment: Do we need this field? I think we could remove it and do ``` "source".equals(config.useDefaultsFrom())
[jira] [Updated] (KAFKA-14672) Producer queue time does not reflect batches expired in the accumulator
[ https://issues.apache.org/jira/browse/KAFKA-14672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14672: -- Description: The producer exposes two metrics for the time a record has spent in the accumulator waiting to be drained: * {{record-queue-time-avg}} * {{record-queue-time-max}} The metric is only updated when a batch is ready to send to a broker. It is also possible for a batch to be expired before it can be sent, but in this case, the metric is not updated. This seems surprising and makes the queue time misleading. The only metric I could find that does reflect batch expirations in the accumulator is the generic {{{}record-error-rate{}}}. It would make sense to let the queue-time metrics record the time spent in the queue regardless of the outcome of the record send attempt. was: The producer exposes two metrics for the time a record has spent in the accumulator waiting to be drained: * `record-queue-time-avg` * `record-queue-time-max` The metric is only updated when a batch is ready to send to a broker. It is also possible for a batch to be expired before it can be sent, but in this case, the metric is not updated. This seems surprising and makes the queue time misleading. The only metric I could find that does reflect batch expirations in the accumulator is the generic `record-error-rate`. It would make sense to let the queue-time metrics record the time spent in the queue regardless of the outcome of the record send attempt. 
> Producer queue time does not reflect batches expired in the accumulator > --- > > Key: KAFKA-14672 > URL: https://issues.apache.org/jira/browse/KAFKA-14672 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Kirk True >Priority: Major > > The producer exposes two metrics for the time a record has spent in the > accumulator waiting to be drained: > * {{record-queue-time-avg}} > * {{record-queue-time-max}} > The metric is only updated when a batch is ready to send to a broker. It is > also possible for a batch to be expired before it can be sent, but in this > case, the metric is not updated. This seems surprising and makes the queue > time misleading. The only metric I could find that does reflect batch > expirations in the accumulator is the generic {{{}record-error-rate{}}}. It > would make sense to let the queue-time metrics record the time spent in the > queue regardless of the outcome of the record send attempt. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] ijuma commented on a diff in pull request #13390: MINOR: Standardize KRaft logging, thread names, and terminology
ijuma commented on code in PR #13390: URL: https://github.com/apache/kafka/pull/13390#discussion_r1135739560 ## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ## @@ -182,7 +182,7 @@ class BrokerLifecycleManager( */ private[server] val eventQueue = new KafkaEventQueue(time, logContext, -threadNamePrefix.getOrElse(""), +threadNamePrefix.getOrElse("") + "BrokerLifecycleManager" + nodeId, Review Comment: Check the replica fetcher threads for an example where we do include source id in the thread name: > val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}-${fetcherPool.name}" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #13390: MINOR: Standardize KRaft logging, thread names, and terminology
ijuma commented on code in PR #13390: URL: https://github.com/apache/kafka/pull/13390#discussion_r1135739560 ## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ## @@ -182,7 +182,7 @@ class BrokerLifecycleManager( */ private[server] val eventQueue = new KafkaEventQueue(time, logContext, -threadNamePrefix.getOrElse(""), +threadNamePrefix.getOrElse("") + "BrokerLifecycleManager" + nodeId, Review Comment: Check the replica fetcher threads for an example where we do include source/target ids in the thread name: > val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}-${fetcherPool.name}" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14750) Sink connector fails if a topic matching its topics.regex gets deleted
[ https://issues.apache.org/jira/browse/KAFKA-14750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17700263#comment-17700263 ] Sagar Rao commented on KAFKA-14750: --- Hmm, I tried the approach from above, and it turns out it's not foolproof. IMO what seems to be happening is that `partitionsFor`, when invoked, can serve the metadata from the local cache if it's already there. So, even though the topic has already been deleted, by the time the task makes the call in the test case above, it still fetches the metadata from the local cache. > Sink connector fails if a topic matching its topics.regex gets deleted > -- > > Key: KAFKA-14750 > URL: https://issues.apache.org/jira/browse/KAFKA-14750 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.3.1 >Reporter: Sergei Morozov >Assignee: Sagar Rao >Priority: Major > > Steps to reproduce: > # In {{{}config/connect-standalone.properties{}}}, set: > {code:bash} > plugin.path=libs/connect-file-3.3.1.jar > {code} > # In {{{}config/connect-file-sink.properties{}}}, remove the {{topics=}} line > and add this one: > {code:bash} > topics.regex=connect-test-.* > {code} > # Start zookeeper: > {code:bash} > bin/zookeeper-server-start.sh config/zookeeper.properties > {code} > # Start the brokers: > {code:bash} > bin/kafka-server-start.sh config/server.properties > {code} > # Start the file sink connector: > {code:bash} > bin/connect-standalone.sh config/connect-standalone.properties > config/connect-file-sink.properties > {code} > # Create topics for the sink connector to subscribe to: > {code:bash} > for i in {0..2}; do > for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do > bin/kafka-topics.sh \ > --bootstrap-server localhost:9092 \ > --create \ > --topic connect-test-$j > done & > done > wait > {code} > # Wait until all the created topics are assigned to the connector. 
Check the > number of partitions to be > 0 in the output of: > {code:bash} > bin/kafka-consumer-groups.sh \ > --bootstrap-server localhost:9092 \ > --group connect-local-file-sink \ > --describe --members > {code} > # Delete the created topics: > {code:bash} > for i in {0..2}; do > for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do > bin/kafka-topics.sh \ > --bootstrap-server localhost:9092 \ > --delete \ > --topic connect-test-$j > echo Deleted topic connect-test-$j. > done & > done > wait > {code} > # Observe the connector fail with the following error: > {quote}org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms > expired before the position for partition connect-test-211-0 could be > determined > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] ijuma commented on a diff in pull request #13390: MINOR: Standardize KRaft logging, thread names, and terminology
ijuma commented on code in PR #13390: URL: https://github.com/apache/kafka/pull/13390#discussion_r1135736972 ## core/src/main/scala/kafka/server/ControllerServer.scala: ## @@ -131,11 +131,11 @@ class ControllerServer( if (!maybeChangeStatus(SHUTDOWN, STARTING)) return val startupDeadline = Deadline.fromDelay(time, config.serverMaxStartupTimeMs, TimeUnit.MILLISECONDS) try { + this.logIdent = new LogContext(s"[ControllerServer ${config.nodeId}] ").logPrefix() info("Starting controller") config.dynamicConfig.initialize(zkClientOpt = None) maybeChangeStatus(STARTING, STARTED) - this.logIdent = new LogContext(s"[ControllerServer id=${config.nodeId}] ").logPrefix() Review Comment: I think the idea of the `LogContext` stuff was to use key/value pairs so it's easy to enrich the log context. cc @jason for additional thoughts. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-10228) producer: NETWORK_EXCEPTION is thrown instead of a request timeout
[ https://issues.apache.org/jira/browse/KAFKA-10228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True resolved KAFKA-10228. --- Fix Version/s: 3.5.0 Resolution: Duplicate > producer: NETWORK_EXCEPTION is thrown instead of a request timeout > -- > > Key: KAFKA-10228 > URL: https://issues.apache.org/jira/browse/KAFKA-10228 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.3.1 >Reporter: Christian Becker >Assignee: Kirk True >Priority: Major > Fix For: 3.5.0 > > > We're currently seeing an issue with the java client (producer), when message > producing runs into a timeout. Namely a NETWORK_EXCEPTION is thrown instead > of a timeout exception. > *Situation and relevant code:* > Config > {code:java} > request.timeout.ms: 200 > retries: 3 > acks: all{code} > {code:java} > for (UnpublishedEvent event : unpublishedEvents) { > ListenableFuture> future; > future = kafkaTemplate.send(new ProducerRecord<>(event.getTopic(), > event.getKafkaKey(), event.getPayload())); > futures.add(future.completable()); > } > CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new)).join();{code} > We're using the KafkaTemplate from SpringBoot here, but it shouldn't matter, > as it's merely a wrapper. There we put in batches of messages to be sent. > 200ms later, we can see the following in the logs: (not sure about the order, > they've arrived in the same ms, so our logging system might not display them > in the right order) > {code:java} > [Producer clientId=producer-1] Received invalid metadata error in produce > request on partition events-6 due to > org.apache.kafka.common.errors.NetworkException: The server disconnected > before a response was received.. Going to request metadata update now > [Producer clientId=producer-1] Got error produce response with correlation id > 3094 on topic-partition events-6, retrying (2 attempts left). 
Error: > NETWORK_EXCEPTION {code} > There is also a corresponding error on the broker (within a few ms): > {code:java} > Attempting to send response via channel for which there is no open > connection, connection id XXX (kafka.network.Processor) {code} > This was somewhat unexpected and sent us for a hunt across the infrastructure > for possible connection issues, but we've found none. > Side note: In some cases the retries worked and the messages were > successfully produced. > Only after many hours of heavy debugging, we've noticed, that the error might > be related to the low timeout setting. We've removed that setting now, as it > was a remnant from the past and no longer valid for our use-case. However in > order to avoid other people having that issue again and to simplify future > debugging, some form of timeout exception should be thrown. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] C0urante commented on a diff in pull request #13379: KAFKA-14799: Ignore source task requests to abort empty transactions
C0urante commented on code in PR #13379: URL: https://github.com/apache/kafka/pull/13379#discussion_r1135720026 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java: ## @@ -483,15 +483,19 @@ protected boolean shouldCommitTransactionForRecord(SourceRecord record) { if (transactionContext.shouldAbortOn(record)) { log.info("Aborting transaction for record on topic {} as requested by connector", record.topic()); log.trace("Last record in aborted transaction: {}", record); -abortTransaction(); +maybeAbortTransaction(); Review Comment: I don't think so; the only time we invoke `Producer::beginTransaction` is [here](https://github.com/apache/kafka/blob/ccfc389a638a126c3769bdd72725bae532ca4d01/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L248-L249), after which we immediately set `transactionOpen` to `true`, and the only time we set `transactionOpen` to `false` is after either aborting a transaction (which takes place in this method), or committing the transaction (which takes place [here](https://github.com/apache/kafka/blob/ccfc389a638a126c3769bdd72725bae532ca4d01/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L311)). In either case, if we fail the operation (aborting/committing the transaction), the task fails, and we [skip the end-of-life offset commit](https://github.com/apache/kafka/blob/ccfc389a638a126c3769bdd72725bae532ca4d01/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L219-L222).
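The invariant described in this comment (the flag is set right after `beginTransaction` and cleared only after a successful abort or commit) can be sketched as below. `TransactionGuardSketch` and its members are hypothetical names, not the Connect runtime's code; the real task calls into a Kafka producer, which is replaced here by a simple counter.

```java
// Hypothetical sketch of guarding abort calls with a transactionOpen flag.
final class TransactionGuardSketch {
    private boolean transactionOpen = false;
    private int abortCount = 0;

    // Stands in for producer.beginTransaction(); the flag is set immediately afterwards.
    void beginTransaction() {
        transactionOpen = true;
    }

    // Stands in for producer.commitTransaction(); the flag is cleared only on success.
    void commitTransaction() {
        transactionOpen = false;
    }

    // Aborts only when a transaction is actually open; returns whether an abort happened.
    boolean maybeAbortTransaction() {
        if (!transactionOpen) {
            return false; // empty transaction: ignore the abort request
        }
        abortCount++;     // stands in for producer.abortTransaction()
        transactionOpen = false;
        return true;
    }

    int abortCount() {
        return abortCount;
    }
}
```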
[GitHub] [kafka] Hangleton commented on pull request #13378: KAFKA-14793: Propagate Topic Ids to the Group Coordinator during Offsets Commit
Hangleton commented on PR #13378: URL: https://github.com/apache/kafka/pull/13378#issuecomment-1468290773 Thanks David @mumrah for the follow-up! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a diff in pull request #13372: MINOR: Improved error handling in ZK migration
mumrah commented on code in PR #13372: URL: https://github.com/apache/kafka/pull/13372#discussion_r1135709978 ## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ## @@ -132,26 +132,45 @@ private boolean isControllerQuorumReadyForMigration() { return true; } +private boolean imageDoesNotContainAllBrokers(MetadataImage image, Set brokerIds) { +for (BrokerRegistration broker : image.cluster().brokers().values()) { +if (broker.isMigratingZkBroker()) { +brokerIds.remove(broker.id()); +} +} +return !brokerIds.isEmpty(); +} + private boolean areZkBrokersReadyForMigration() { if (image == MetadataImage.EMPTY) { // TODO maybe add WAIT_FOR_INITIAL_METADATA_PUBLISH state to avoid this kind of check? log.info("Waiting for initial metadata publish before checking if Zk brokers are registered."); return false; } -Set zkRegisteredZkBrokers = zkMigrationClient.readBrokerIdsFromTopicAssignments(); -for (BrokerRegistration broker : image.cluster().brokers().values()) { -if (broker.isMigratingZkBroker()) { -zkRegisteredZkBrokers.remove(broker.id()); -} + +// First check the brokers registered in ZK +Set zkBrokerRegistrations = new HashSet<>(zkMigrationClient.readBrokerIds()); Review Comment: Ah, yea this is needed because the Set returned by the client was an immutable Scala thing originally. I'll change the client to return a mutable set -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
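As background for the `new HashSet<>(...)` wrapper discussed above: removing elements from an immutable set throws, so a defensive mutable copy is one way to run the "remove every registered broker, then check what is left" logic. A sketch with hypothetical names (`BrokerSetSketch`, `missingBrokers`), not the migration driver's code:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper; shows copying a possibly immutable set before mutating it.
final class BrokerSetSketch {
    private BrokerSetSketch() { }

    // Returns the expected broker ids that are not present in the registered set.
    static Set<Integer> missingBrokers(Set<Integer> expected, Set<Integer> registered) {
        Set<Integer> remaining = new HashSet<>(expected); // mutable copy; input is untouched
        remaining.removeAll(registered);
        return remaining;
    }
}
```

The copy also keeps the helper from mutating its argument, which the `imageDoesNotContainAllBrokers` method in the diff does to the set it is given.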
[GitHub] [kafka] C0urante commented on a diff in pull request #11565: KAFKA-13504: Retry connect internal topics' creation in case of InvalidReplicationFactorException
C0urante commented on code in PR #11565: URL: https://github.com/apache/kafka/pull/11565#discussion_r1135694291 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaTopicBasedBackingStore.java: ## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.TopicAdmin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Supplier; + +public abstract class KafkaTopicBasedBackingStore { Review Comment: Is inheritance necessary here? 
It seems like we might make this a standalone class that can be composed into the various `Kafka*BackingStore` classes (and then possibly others) instead of only making it available through subclassing. (Not a blocker) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on pull request #13378: KAFKA-14793: Propagate Topic Ids to the Group Coordinator during Offsets Commit
mumrah commented on PR #13378: URL: https://github.com/apache/kafka/pull/13378#issuecomment-1468256274 @Hangleton, yes we should expect some improvement to ZkMigrationIntegrationTest with that PR. There are some cases where we lose the ZK session during the tests which causes a spurious failure. Yesterday, we identified another source of flakiness in that test which turns out to be a real bug https://issues.apache.org/jira/browse/KAFKA-14805
[GitHub] [kafka] mumrah commented on a diff in pull request #13390: MINOR: Standardize KRaft logging, thread names, and terminology
mumrah commented on code in PR #13390: URL: https://github.com/apache/kafka/pull/13390#discussion_r1135643036 ## core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala: ## @@ -238,7 +238,7 @@ class BrokerToControllerChannelManagerImpl( } val threadName = threadNamePrefix match { case None => s"BrokerToControllerChannelManager broker=${config.brokerId} name=$channelName" - case Some(name) => s"$name:BrokerToControllerChannelManager broker=${config.brokerId} name=$channelName" + case Some(name) => s"${name}ToControllerChannelManager broker=${config.brokerId} name=$channelName" Review Comment: What is `name` here and where is it supplied? ## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ## @@ -182,7 +182,7 @@ class BrokerLifecycleManager( */ private[server] val eventQueue = new KafkaEventQueue(time, logContext, -threadNamePrefix.getOrElse(""), +threadNamePrefix.getOrElse("") + "BrokerLifecycleManager" + nodeId, Review Comment: I fired up this branch and yea `BrokerLifecycleManager1EventHandler` is not pretty :) I'm not sure we need node ID in the thread name since we will presumably know which node a thread dump came from. It might actually be confusing since the convention we have is `{thread name}-{thread number in pool}` like "metrics-meter-tick-thread-1", "metrics-meter-tick-thread-2", "data-plane-kafka-request-handler-0", "data-plane-kafka-request-handler-1", etc. What about something like `broker-lifecycle-manager-event-handler` or `BrokerLifecycleManager-EventHandler` I also noticed somewhere we are not prefixing the event queue thread name, there is just an `EventHandler` thread. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
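The `{thread name}-{thread number in pool}` convention described in the comment above can be sketched like this. The helper name `poolThreadName` and its parameters are assumptions made for illustration; this is not how Kafka builds thread names internally.

```java
import java.util.Optional;

final class ThreadNameSketch {
    // Hypothetical helper: optional prefix, then the base name, then a dash and the pool index.
    static String poolThreadName(Optional<String> prefix, String baseName, int threadNumber) {
        return prefix.orElse("") + baseName + "-" + threadNumber;
    }

    public static void main(String[] args) {
        System.out.println(poolThreadName(Optional.empty(), "metrics-meter-tick-thread", 1));
        System.out.println(poolThreadName(Optional.of("data-plane-"), "kafka-request-handler", 0));
    }
}
```

Under this convention a trailing number reads as a pool index, which is why appending a node ID directly to the name could be misread.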
[GitHub] [kafka] C0urante commented on a diff in pull request #13367: KAFKA-14797: Emit offset sync when offset translation lag would exceed max.offset.lag
C0urante commented on code in PR #13367: URL: https://github.com/apache/kafka/pull/13367#discussion_r1135661761 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java: ## @@ -315,18 +314,16 @@ static class PartitionState { // true if we should emit an offset sync boolean update(long upstreamOffset, long downstreamOffset) { -// This value is what OffsetSyncStore::translateOffsets would compute for this offset given the last sync. -// Because this method is called at most once for each upstream offset, simplify upstreamStep to 1. +// Emit an offset sync if any of the following conditions are true +boolean noPreviousSyncThisLifetime = lastSyncDownstreamOffset == -1L; +// the OffsetSync::translateDownstream method will translate this offset 1 past the last sync, so add 1. // TODO: share common implementation to enforce this relationship -long downstreamTargetOffset = lastSyncDownstreamOffset + 1; -if (lastSyncDownstreamOffset == -1L -|| downstreamOffset - downstreamTargetOffset >= maxOffsetLag -|| upstreamOffset - previousUpstreamOffset != 1L Review Comment: Why are we removing this condition? ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java: ## @@ -315,18 +314,16 @@ static class PartitionState { // true if we should emit an offset sync boolean update(long upstreamOffset, long downstreamOffset) { -// This value is what OffsetSyncStore::translateOffsets would compute for this offset given the last sync. -// Because this method is called at most once for each upstream offset, simplify upstreamStep to 1. +// Emit an offset sync if any of the following conditions are true +boolean noPreviousSyncThisLifetime = lastSyncDownstreamOffset == -1L; +// the OffsetSync::translateDownstream method will translate this offset 1 past the last sync, so add 1. 
// TODO: share common implementation to enforce this relationship -long downstreamTargetOffset = lastSyncDownstreamOffset + 1; -if (lastSyncDownstreamOffset == -1L -|| downstreamOffset - downstreamTargetOffset >= maxOffsetLag -|| upstreamOffset - previousUpstreamOffset != 1L Review Comment: Why are we removing this condition? Is that necessary for this fix? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
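For readers following the question above, here is a simplified sketch of the pre-change decision in `PartitionState.update`, built only from the three conditions visible in the removed lines of the diff. The class name `PartitionStateSketch` is hypothetical, and the real method may track more state than is shown here.

```java
final class PartitionStateSketch {
    private final long maxOffsetLag;
    private long previousUpstreamOffset = -1L;
    private long lastSyncDownstreamOffset = -1L;

    PartitionStateSketch(long maxOffsetLag) {
        this.maxOffsetLag = maxOffsetLag;
    }

    // Returns true if an offset sync should be emitted for this record.
    boolean update(long upstreamOffset, long downstreamOffset) {
        // Offset that translation would produce one step past the last sync.
        long downstreamTargetOffset = lastSyncDownstreamOffset + 1;
        boolean noPreviousSync = lastSyncDownstreamOffset == -1L;
        boolean lagTooLarge = downstreamOffset - downstreamTargetOffset >= maxOffsetLag;
        // The condition under discussion: upstream offsets did not advance by exactly one.
        boolean upstreamGap = upstreamOffset - previousUpstreamOffset != 1L;
        boolean shouldSync = noPreviousSync || lagTooLarge || upstreamGap;
        if (shouldSync) {
            lastSyncDownstreamOffset = downstreamOffset;
        }
        previousUpstreamOffset = upstreamOffset;
        return shouldSync;
    }
}
```

With `maxOffsetLag = 100`, a record at upstream offset 5 that follows upstream offset 1 triggers a sync through the gap condition alone, which is the behaviour that would be lost if that condition were dropped.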
[GitHub] [kafka] ijuma commented on a diff in pull request #13390: MINOR: Standardize KRaft logging, thread names, and terminology
ijuma commented on code in PR #13390: URL: https://github.com/apache/kafka/pull/13390#discussion_r1135641916 ## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ## @@ -182,7 +182,7 @@ class BrokerLifecycleManager( */ private[server] val eventQueue = new KafkaEventQueue(time, logContext, -threadNamePrefix.getOrElse(""), +threadNamePrefix.getOrElse("") + "BrokerLifecycleManager" + nodeId, Review Comment: We typically separate the node id via a dash. Have you tried to be consistent with what we do outside of kraft? That would help when debugging the system. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #11565: KAFKA-13504: Retry connect internal topics' creation in case of InvalidReplicationFactorException
C0urante commented on code in PR #11565: URL: https://github.com/apache/kafka/pull/11565#discussion_r1135639682 ## connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java: ## @@ -70,6 +74,9 @@ public class TopicAdmin implements AutoCloseable { public static final TopicCreationResponse EMPTY_CREATION = new TopicCreationResponse(Collections.emptySet(), Collections.emptySet()); +private static final List> CAUSES_TO_RETRY_TOPIC_CREATION = Arrays.asList( +InvalidReplicationFactorException.class, +TimeoutException.class); Review Comment: IMO this would be a bit too drastic; the fallout of a change like that scales linearly with the retry duration that the user has configured and may delay surfacing valid issues for a bit too long. It could be especially frustrating in quickstart scenarios where someone defines a replication factor that exceeds the number of local brokers and has to wait an entire minute to find out what the problem is. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14803) topic deletion bug
[ https://issues.apache.org/jira/browse/KAFKA-14803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17700217#comment-17700217 ] Chris Egerton commented on KAFKA-14803: --- [~bwei9awf] It looks like this is a duplicate of KAFKA-14802 so I've closed it; if there are any updates you'd like to make for this issue, would you mind making them on that ticket? Thanks! > topic deletion bug > -- > > Key: KAFKA-14803 > URL: https://issues.apache.org/jira/browse/KAFKA-14803 > Project: Kafka > Issue Type: Bug > Components: controller, replication >Affects Versions: 3.3.2 > Environment: AWS m5.xlarge EC2 instance >Reporter: Behavox >Priority: Major > Attachments: server.properties > > > topic deletion doesn't work as expected when attempting to delete topic(s), > after successful deletion topic is recreated in a multi-controller > environment with 3 controllers and ReplicationFactor: 2 > How to reproduce - attempt to delete topic. Topic is removed successfully and > recreated right after removal. Example below shows a single topic named > example-topic. We have a total count of 17000 topics in the affected cluster. > > Our config is attached. > Run 1 > [2023-03-10 16:16:45,625] INFO [Controller 1] Removed topic example-topic > with ID fh_aQcc3Sf2yVBTMrltBlQ. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:19:04,722] INFO [Controller 1] Created topic example-topic > with topic ID a-7OZG_XQhiCatOBft-9-g. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:16:45,730] INFO [Controller 2] Removed topic example-topic > with ID fh_aQcc3Sf2yVBTMrltBlQ. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:19:04,851] INFO [Controller 2] Created topic example-topic > with topic ID a-7OZG_XQhiCatOBft-9-g. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:16:45,837] INFO [Controller 3] Removed topic example-topic > with ID fh_aQcc3Sf2yVBTMrltBlQ. 
> (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:19:04,833] INFO [Controller 3] Created topic example-topic > with topic ID a-7OZG_XQhiCatOBft-9-g. > (org.apache.kafka.controller.ReplicationControlManager) > Run 2 > [2023-03-10 16:20:22,469] INFO [Controller 1] Removed topic example-topic > with ID a-7OZG_XQhiCatOBft-9-g. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:22:19,711] INFO [Controller 1] Created topic example-topic > with topic ID xxlJlIe_SvqQHtfgbX2eLA. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:20:22,674] INFO [Controller 2] Removed topic example-topic > with ID a-7OZG_XQhiCatOBft-9-g. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:22:20,022] INFO [Controller 2] Created topic example-topic > with topic ID xxlJlIe_SvqQHtfgbX2eLA. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:20:22,674] INFO [Controller 3] Removed topic example-topic > with ID a-7OZG_XQhiCatOBft-9-g. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:22:20,020] INFO [Controller 3] Created topic example-topic > with topic ID xxlJlIe_SvqQHtfgbX2eLA. > (org.apache.kafka.controller.ReplicationControlManager) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14802) topic deletion bug
[ https://issues.apache.org/jira/browse/KAFKA-14802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14802: -- Environment: AWS m5.xlarge EC2 instance > topic deletion bug > -- > > Key: KAFKA-14802 > URL: https://issues.apache.org/jira/browse/KAFKA-14802 > Project: Kafka > Issue Type: Bug > Components: controller, replication >Affects Versions: 3.3.2 > Environment: AWS m5.xlarge EC2 instance >Reporter: Behavox >Priority: Major > Attachments: server.properties > > > topic deletion doesn't work as expected when attempting to delete topic(s), > after successful deletion topic is recreated in a multi-controller > environment with 3 controllers and ReplicationFactor: 2 > How to reproduce - attempt to delete topic. Topic is removed successfully and > recreated right after removal. Example below shows a single topic named > example-topic. We have a total count of 17000 topics in the affected cluster. > > Our config is attached. > Run 1 > [2023-03-10 16:16:45,625] INFO [Controller 1] Removed topic example-topic > with ID fh_aQcc3Sf2yVBTMrltBlQ. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:19:04,722] INFO [Controller 1] Created topic example-topic > with topic ID a-7OZG_XQhiCatOBft-9-g. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:16:45,730] INFO [Controller 2] Removed topic example-topic > with ID fh_aQcc3Sf2yVBTMrltBlQ. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:19:04,851] INFO [Controller 2] Created topic example-topic > with topic ID a-7OZG_XQhiCatOBft-9-g. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:16:45,837] INFO [Controller 3] Removed topic example-topic > with ID fh_aQcc3Sf2yVBTMrltBlQ. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:19:04,833] INFO [Controller 3] Created topic example-topic > with topic ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager) > Run 2 > [2023-03-10 16:20:22,469] INFO [Controller 1] Removed topic example-topic > with ID a-7OZG_XQhiCatOBft-9-g. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:22:19,711] INFO [Controller 1] Created topic example-topic > with topic ID xxlJlIe_SvqQHtfgbX2eLA. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:20:22,674] INFO [Controller 2] Removed topic example-topic > with ID a-7OZG_XQhiCatOBft-9-g. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:22:20,022] INFO [Controller 2] Created topic example-topic > with topic ID xxlJlIe_SvqQHtfgbX2eLA. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:20:22,674] INFO [Controller 3] Removed topic example-topic > with ID a-7OZG_XQhiCatOBft-9-g. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:22:20,020] INFO [Controller 3] Created topic example-topic > with topic ID xxlJlIe_SvqQHtfgbX2eLA. > (org.apache.kafka.controller.ReplicationControlManager) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14803) topic deletion bug
[ https://issues.apache.org/jira/browse/KAFKA-14803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton resolved KAFKA-14803. --- Resolution: Duplicate > topic deletion bug > -- > > Key: KAFKA-14803 > URL: https://issues.apache.org/jira/browse/KAFKA-14803 > Project: Kafka > Issue Type: Bug > Components: controller, replication >Affects Versions: 3.3.2 > Environment: AWS m5.xlarge EC2 instance >Reporter: Behavox >Priority: Major > Attachments: server.properties > > > topic deletion doesn't work as expected when attempting to delete topic(s), > after successful deletion topic is recreated in a multi-controller > environment with 3 controllers and ReplicationFactor: 2 > How to reproduce - attempt to delete topic. Topic is removed successfully and > recreated right after removal. Example below shows a single topic named > example-topic. We have a total count of 17000 topics in the affected cluster. > > Our config is attached. > Run 1 > [2023-03-10 16:16:45,625] INFO [Controller 1] Removed topic example-topic > with ID fh_aQcc3Sf2yVBTMrltBlQ. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:19:04,722] INFO [Controller 1] Created topic example-topic > with topic ID a-7OZG_XQhiCatOBft-9-g. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:16:45,730] INFO [Controller 2] Removed topic example-topic > with ID fh_aQcc3Sf2yVBTMrltBlQ. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:19:04,851] INFO [Controller 2] Created topic example-topic > with topic ID a-7OZG_XQhiCatOBft-9-g. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:16:45,837] INFO [Controller 3] Removed topic example-topic > with ID fh_aQcc3Sf2yVBTMrltBlQ. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:19:04,833] INFO [Controller 3] Created topic example-topic > with topic ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager) > Run 2 > [2023-03-10 16:20:22,469] INFO [Controller 1] Removed topic example-topic > with ID a-7OZG_XQhiCatOBft-9-g. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:22:19,711] INFO [Controller 1] Created topic example-topic > with topic ID xxlJlIe_SvqQHtfgbX2eLA. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:20:22,674] INFO [Controller 2] Removed topic example-topic > with ID a-7OZG_XQhiCatOBft-9-g. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:22:20,022] INFO [Controller 2] Created topic example-topic > with topic ID xxlJlIe_SvqQHtfgbX2eLA. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:20:22,674] INFO [Controller 3] Removed topic example-topic > with ID a-7OZG_XQhiCatOBft-9-g. > (org.apache.kafka.controller.ReplicationControlManager) > [2023-03-10 16:22:20,020] INFO [Controller 3] Created topic example-topic > with topic ID xxlJlIe_SvqQHtfgbX2eLA. > (org.apache.kafka.controller.ReplicationControlManager) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14808) Partition becomes leaderless when new partition reassignment removes the adding replica
[ https://issues.apache.org/jira/browse/KAFKA-14808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shenglong Zhang reassigned KAFKA-14808: --- Assignee: Shenglong Zhang > Partition becomes leaderless when new partition reassignment removes the > adding replica > --- > > Key: KAFKA-14808 > URL: https://issues.apache.org/jira/browse/KAFKA-14808 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 3.4.0 >Reporter: Shenglong Zhang >Assignee: Shenglong Zhang >Priority: Major > > If there is an ongoing partition reassignment and any adding replica has been > elected as leader (due to preferred leader election or another reason), the > partition will immediately become leaderless on receiving a new partition > reassignment that removes that adding replica. > 1) partition-0 has replicas [0, 2] > 2) partition-0 is being reassigned to [1, 0, 2], and somehow this > reassignment is stuck (e.g. broker 2 is down). > 3) Preferred leader election is triggered, and broker 1 is elected as leader. > 4) When a new partition reassignment to [2, 0, 3] is submitted, which removes > broker 1 and adds broker 3, the partition becomes leaderless. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14808) Partition becomes leaderless when new partition reassignment removes the adding replica
Shenglong Zhang created KAFKA-14808: --- Summary: Partition becomes leaderless when new partition reassignment removes the adding replica Key: KAFKA-14808 URL: https://issues.apache.org/jira/browse/KAFKA-14808 Project: Kafka Issue Type: Bug Components: controller Affects Versions: 3.4.0 Reporter: Shenglong Zhang If there is an ongoing partition reassignment and any adding replica has been elected as leader (due to preferred leader election or another reason), the partition will immediately become leaderless on receiving a new partition reassignment that removes that adding replica. 1) partition-0 has replicas [0, 2] 2) partition-0 is being reassigned to [1, 0, 2], and somehow this reassignment is stuck (e.g. broker 2 is down). 3) Preferred leader election is triggered, and broker 1 is elected as leader. 4) When a new partition reassignment to [2, 0, 3] is submitted, which removes broker 1 and adds broker 3, the partition becomes leaderless. -- This message was sent by Atlassian Jira (v8.20.10#820010)
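The four steps in the report above can be traced with a toy model. This is only a sketch of the scenario as data, assuming that a leader must remain in the target replica set to survive a reassignment; `ReassignmentSketch` and `leaderSurvives` are hypothetical names and do not reflect the controller's actual logic.

```java
import java.util.List;

final class ReassignmentSketch {
    // Toy check: is the current leader still a replica under the new target assignment?
    static boolean leaderSurvives(int currentLeader, List<Integer> targetReplicas) {
        return targetReplicas.contains(Integer.valueOf(currentLeader));
    }

    public static void main(String[] args) {
        List<Integer> firstTarget = List.of(1, 0, 2);  // step 2: reassignment in progress, stuck
        int leader = 1;                                // step 3: the adding replica is elected leader
        List<Integer> secondTarget = List.of(2, 0, 3); // step 4: new reassignment drops broker 1

        System.out.println(leaderSurvives(leader, firstTarget));  // prints true
        System.out.println(leaderSurvives(leader, secondTarget)); // prints false
    }
}
```

The second call returns false because broker 1 is absent from the new target, which matches the point in the report where the partition is left without a leader.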
[GitHub] [kafka] viktorsomogyi commented on a diff in pull request #11565: KAFKA-13504: Retry connect internal topics' creation in case of InvalidReplicationFactorException
viktorsomogyi commented on code in PR #11565: URL: https://github.com/apache/kafka/pull/11565#discussion_r1135376859 ## connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java: ## @@ -328,6 +335,48 @@ public Set createTopics(NewTopic... topics) { return createOrFindTopics(topics).createdTopics(); } +/** + * Implements a retry logic around creating topic(s) in case it'd fail due to InvalidReplicationFactorException Review Comment: nit: add "or TimeoutException" ## connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java: ## @@ -70,6 +74,9 @@ public class TopicAdmin implements AutoCloseable { public static final TopicCreationResponse EMPTY_CREATION = new TopicCreationResponse(Collections.emptySet(), Collections.emptySet()); +private static final List> CAUSES_TO_RETRY_TOPIC_CREATION = Arrays.asList( +InvalidReplicationFactorException.class, +TimeoutException.class); Review Comment: Since TimeoutException is a RetriableException I was wondering whether we could refactor InvalidReplicationFactorException because in some circumstances it can be retriable and I think even if there won't be more brokers, it doesn't hurt too much to retry a few times with backoff. What do you think @mimaison, would this be considered an API change? ## connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java: ## @@ -328,6 +335,48 @@ public Set createTopics(NewTopic... topics) { return createOrFindTopics(topics).createdTopics(); } +/** + * Implements a retry logic around creating topic(s) in case it'd fail due to InvalidReplicationFactorException + * + * @param topicDescription + * @param timeoutMs + * @param backOffMs + * @param time + * @return the same as {@link TopicAdmin#createTopics(NewTopic...)} Review Comment: nit: please fill out parameters and return value if you add javadocs. ## connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java: ## @@ -328,6 +335,48 @@ public Set createTopics(NewTopic... 
topics) { return createOrFindTopics(topics).createdTopics(); } +/** + * Implements a retry logic around creating topic(s) in case it'd fail due to InvalidReplicationFactorException + * + * @param topicDescription + * @param timeoutMs + * @param backOffMs + * @param time + * @return the same as {@link TopicAdmin#createTopics(NewTopic...)} + */ +public Set createTopicsWithRetry(NewTopic topicDescription, long timeoutMs, long backOffMs, Time time) { Review Comment: I think it would be a bit more robust to add an overload to {{org.apache.kafka.connect.util.RetryUtil#retryUntilTimeout}} that specifies a set of exceptions or a condition to retry on and use that here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
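The suggestion above, a retry helper that takes the set of exceptions to retry on, could look roughly like the following. This is a sketch under stated assumptions: `RetryOnSketch.retryUntilTimeout` is a hypothetical helper, not the existing `RetryUtil#retryUntilTimeout` signature, and it matches on the thrown exception itself rather than on its cause.

```java
import java.util.List;
import java.util.function.Supplier;

final class RetryOnSketch {
    // Hypothetical helper: retries action while it throws one of the listed
    // exception types and there is still time for another backoff.
    static <T> T retryUntilTimeout(Supplier<T> action,
                                   List<Class<? extends RuntimeException>> retriable,
                                   long timeoutMs,
                                   long backoffMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                boolean canRetry = retriable.stream().anyMatch(type -> type.isInstance(e));
                boolean timeLeft = System.currentTimeMillis() + backoffMs <= deadline;
                if (!canRetry || !timeLeft) {
                    throw e; // surface non-retriable errors, or the last error on timeout
                }
                sleep(backoffMs);
            }
        }
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while backing off", ie);
        }
    }
}
```

Keeping the retriable types as an explicit argument means a caller such as topic creation can opt in to retrying on specific exceptions without changing how those exceptions are classified elsewhere.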
[jira] [Resolved] (KAFKA-14804) Connect docs fail to build with Gradle Swagger plugin 2.2.8
[ https://issues.apache.org/jira/browse/KAFKA-14804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison resolved KAFKA-14804. Fix Version/s: 3.5.0 Resolution: Fixed > Connect docs fail to build with Gradle Swagger plugin 2.2.8 > --- > > Key: KAFKA-14804 > URL: https://issues.apache.org/jira/browse/KAFKA-14804 > Project: Kafka > Issue Type: Bug >Reporter: David Arthur >Assignee: Mickael Maison >Priority: Minor > Fix For: 3.5.0 > > > There is an incompatibility somewhere between versions 2.2.0 and 2.2.8 that > cause the following error when building the connect docs: > {code} > Caused by: org.gradle.api.GradleException: > io.swagger.v3.jaxrs2.integration.SwaggerLoader.setOpenAPI31(java.lang.Boolean) > at > io.swagger.v3.plugins.gradle.tasks.ResolveTask.resolve(ResolveTask.java:458) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at org.gradle.internal.reflect.JavaMethod.invoke(JavaMethod.java:125) > at > org.gradle.api.internal.project.taskfactory.StandardTaskAction.doExecute(StandardTaskAction.java:58) > at > org.gradle.api.internal.project.taskfactory.StandardTaskAction.execute(StandardTaskAction.java:51) > at > org.gradle.api.internal.project.taskfactory.StandardTaskAction.execute(StandardTaskAction.java:29) > at > org.gradle.api.internal.tasks.execution.TaskExecution$3.run(TaskExecution.java:242) > at > org.gradle.internal.operations.DefaultBuildOperationRunner$1.execute(DefaultBuildOperationRunner.java:29) > at > org.gradle.internal.operations.DefaultBuildOperationRunner$1.execute(DefaultBuildOperationRunner.java:26) > at > org.gradle.internal.operations.DefaultBuildOperationRunner$2.execute(DefaultBuildOperationRunner.java:66) > at > 
org.gradle.internal.operations.DefaultBuildOperationRunner$2.execute(DefaultBuildOperationRunner.java:59) > at > org.gradle.internal.operations.DefaultBuildOperationRunner.execute(DefaultBuildOperationRunner.java:157) > at > org.gradle.internal.operations.DefaultBuildOperationRunner.execute(DefaultBuildOperationRunner.java:59) > at > org.gradle.internal.operations.DefaultBuildOperationRunner.run(DefaultBuildOperationRunner.java:47) > at > org.gradle.internal.operations.DefaultBuildOperationExecutor.run(DefaultBuildOperationExecutor.java:68) > at > org.gradle.api.internal.tasks.execution.TaskExecution.executeAction(TaskExecution.java:227) > at > org.gradle.api.internal.tasks.execution.TaskExecution.executeActions(TaskExecution.java:210) > at > org.gradle.api.internal.tasks.execution.TaskExecution.executeWithPreviousOutputFiles(TaskExecution.java:193) > at > org.gradle.api.internal.tasks.execution.TaskExecution.execute(TaskExecution.java:166) > at > org.gradle.internal.execution.steps.ExecuteStep.executeInternal(ExecuteStep.java:93) > at > org.gradle.internal.execution.steps.ExecuteStep.access$000(ExecuteStep.java:44) > at > org.gradle.internal.execution.steps.ExecuteStep$1.call(ExecuteStep.java:57) > at > org.gradle.internal.execution.steps.ExecuteStep$1.call(ExecuteStep.java:54) > at > org.gradle.internal.operations.DefaultBuildOperationRunner$CallableBuildOperationWorker.execute(DefaultBuildOperationRunner.java:204) > at > org.gradle.internal.operations.DefaultBuildOperationRunner$CallableBuildOperationWorker.execute(DefaultBuildOperationRunner.java:199) > at > org.gradle.internal.operations.DefaultBuildOperationRunner$2.execute(DefaultBuildOperationRunner.java:66) > at > org.gradle.internal.operations.DefaultBuildOperationRunner$2.execute(DefaultBuildOperationRunner.java:59) > at > org.gradle.internal.operations.DefaultBuildOperationRunner.execute(DefaultBuildOperationRunner.java:157) > at > 
org.gradle.internal.operations.DefaultBuildOperationRunner.execute(DefaultBuildOperationRunner.java:59) > at > org.gradle.internal.operations.DefaultBuildOperationRunner.call(DefaultBuildOperationRunner.java:53) > at > org.gradle.internal.operations.DefaultBuildOperationExecutor.call(DefaultBuildOperationExecutor.java:73) > at > org.gradle.internal.execution.steps.ExecuteStep.execute(ExecuteStep.java:54) > at > org.gradle.internal.execution.steps.ExecuteStep.execute(ExecuteStep.java:44) > at >
[GitHub] [kafka] mimaison merged pull request #13388: MINOR: Align swagger dependencies with gradle plugin
mimaison merged PR #13388: URL: https://github.com/apache/kafka/pull/13388
[GitHub] [kafka] mimaison commented on pull request #13388: MINOR: Align swagger dependencies with gradle plugin
mimaison commented on PR #13388: URL: https://github.com/apache/kafka/pull/13388#issuecomment-1467878813 Build failures are not related, merging
[jira] [Updated] (KAFKA-14807) MirrorMaker2 config source.consumer.auto.offset.reset=latest leading to the pause of replication of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-14807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhaoli updated KAFKA-14807:
---
Description:
We use MirrorMaker2 to replicate messages and consumer group offsets from Kafka cluster `source` to cluster `target`.
In order to reduce the load on the source cluster, we added this configuration to MM2 to avoid replicating the entire message history:
{code:java}
source.consumer.auto.offset.reset=latest
{code}
After that, we found that some of the consumer group offsets had stopped replicating.
The common characteristic of these consumer groups is their EMPTY status, which means they had no active members at that moment. Offset replication for all the active consumer groups works as normal.
After reading the source code, we found that this happens because the configuration above also affects consumption of the topic `mm2-offset-syncs`, so the map `offsetSyncs` does not hold all of the topicPartitions:
{code:java}
private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
{code}
The missing topicPartitions cause replication to pause for the EMPTY consumer groups, which is not expected.
{code:java}
OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstreamOffset) {
    Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition);
    if (offsetSync.isPresent()) {
        if (offsetSync.get().upstreamOffset() > upstreamOffset) {
            // Offset is too far in the past to translate accurately
            return OptionalLong.of(-1L);
        }
        long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset();
        return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);
    } else {
        return OptionalLong.empty();
    }
}
{code}

was:
We use MirrorMaker2 to replicate messages and consumer group offsets from Kafka cluster `source` to cluster `target`.
In order to reduce the load on the source cluster, we added this configuration to MM2 to avoid replicating the entire message history:
{code:java}
source.consumer.auto.offset.reset=latest
{code}
After that, we found that some of the consumer group offsets had stopped replicating.
The common characteristic of these consumer groups is their EMPTY status, which means they had no active members at that moment. Offset replication for all the active consumer groups works as normal.
After reading the source code, we found that this happens because the configuration above also affects consumption of the topic `mm2-offset-syncs`, so the map `offsetSyncs` does not hold all of the topicPartitions:
{code:java}
private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
{code}
The missing topicPartitions cause replication to pause for the EMPTY consumer groups, which is not expected.

> MirrorMaker2 config source.consumer.auto.offset.reset=latest leading to the pause of replication of consumer groups
> ---
>
> Key: KAFKA-14807
> URL: https://issues.apache.org/jira/browse/KAFKA-14807
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Affects Versions: 3.4.0, 3.3.1, 3.3.2
> Environment: centos7
> Reporter: Zhaoli
> Priority: Major
>
> We use MirrorMaker2 to replicate messages and consumer group offsets from Kafka cluster `source` to cluster `target`.
> In order to reduce the load on the source cluster, we added this configuration to MM2 to avoid replicating the entire message history:
> {code:java}
> source.consumer.auto.offset.reset=latest
> {code}
> After that, we found that some of the consumer group offsets had stopped replicating.
> The common characteristic of these consumer groups is their EMPTY status, which means they had no active members at that moment. Offset replication for all the active consumer groups works as normal.
> After reading the source code, we found that this happens because the configuration above also affects consumption of the topic `mm2-offset-syncs`, so the map `offsetSyncs` does not hold all of the topicPartitions:
> {code:java}
> private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
> {code}
> The missing topicPartitions cause replication to pause for the EMPTY consumer groups, which is not expected.
> {code:java}
> OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstreamOffset) {
>     Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition);
>     if (offsetSync.isPresent()) {
>         if (offsetSync.get().upstreamOffset() > upstreamOffset) {
>             // Offset is too far in the past to translate accurately
>             return OptionalLong.of(-1L);
>         }
>         long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset();
>         return OptionalLong.of(offsetSync.get().downstreamOffset() +
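The behaviour described in the report can be illustrated with a minimal, self-contained sketch of the translation logic. This is not MirrorMaker's code: the class `OffsetTranslationSketch`, the nested `Sync` record, the `recordSync` method and the string partition keys are hypothetical stand-ins, assumed here only so the example runs without Kafka on the classpath. It shows the three outcomes of the lookup, including the empty result when no sync record was ever read for a partition, which is the case the report associates with starting the offset-syncs consumer at `latest`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;

public class OffsetTranslationSketch {

    /** Hypothetical stand-in for one record read from the offset-syncs topic. */
    static final class Sync {
        final long upstreamOffset;
        final long downstreamOffset;

        Sync(long upstreamOffset, long downstreamOffset) {
            this.upstreamOffset = upstreamOffset;
            this.downstreamOffset = downstreamOffset;
        }
    }

    // Assumption: partitions are keyed by a plain "topic-partition" string here,
    // whereas the quoted code keys the map by TopicPartition.
    private final Map<String, Sync> offsetSyncs = new HashMap<>();

    /** Stores the latest known sync for a partition (hypothetical helper). */
    public void recordSync(String topicPartition, long upstreamOffset, long downstreamOffset) {
        offsetSyncs.put(topicPartition, new Sync(upstreamOffset, downstreamOffset));
    }

    /** Mirrors the branch structure of the translateDownstream method quoted in the report. */
    public OptionalLong translateDownstream(String topicPartition, long upstreamOffset) {
        Optional<Sync> sync = Optional.ofNullable(offsetSyncs.get(topicPartition));
        if (!sync.isPresent()) {
            // No sync was ever seen for this partition, so nothing can be translated.
            return OptionalLong.empty();
        }
        if (sync.get().upstreamOffset > upstreamOffset) {
            // Offset is too far in the past to translate accurately.
            return OptionalLong.of(-1L);
        }
        long upstreamStep = upstreamOffset - sync.get().upstreamOffset;
        return OptionalLong.of(sync.get().downstreamOffset + upstreamStep);
    }

    public static void main(String[] args) {
        OffsetTranslationSketch store = new OffsetTranslationSketch();
        store.recordSync("orders-0", 100L, 40L);

        // Known partition, offset at or after the sync point: translated.
        System.out.println(store.translateDownstream("orders-0", 110L));
        // Known partition, offset before the sync point: -1.
        System.out.println(store.translateDownstream("orders-0", 90L));
        // Partition whose sync record was never read: empty.
        System.out.println(store.translateDownstream("orders-1", 110L));
    }
}
```

In this sketch a group whose committed offsets sit on `orders-1` can never be translated until a sync for that partition arrives, which matches the stalled replication the report observes for EMPTY groups.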
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest
divijvaidya commented on code in PR #13284: URL: https://github.com/apache/kafka/pull/13284#discussion_r1135316350 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/DedicatedMirrorIntegrationTest.java: ## @@ -62,8 +60,10 @@ public void setup() { @AfterEach public void teardown() throws Throwable { AtomicReference shutdownFailure = new AtomicReference<>(); -mirrorMakers.forEach((name, mirrorMaker) -> -Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + name + "'", shutdownFailure) +mirrorMakers.forEach((name, mirrorMaker) -> { +Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + name + "'", shutdownFailure); +mirrorMaker.awaitStop(); +} Review Comment: done in latest commit. ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/DedicatedMirrorIntegrationTest.java: ## @@ -62,8 +60,10 @@ public void setup() { @AfterEach public void teardown() throws Throwable { AtomicReference shutdownFailure = new AtomicReference<>(); -mirrorMakers.forEach((name, mirrorMaker) -> -Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + name + "'", shutdownFailure) +mirrorMakers.forEach((name, mirrorMaker) -> { +Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + name + "'", shutdownFailure); +mirrorMaker.awaitStop(); +} Review Comment: good idea. done in latest commit.