[GitHub] [kafka] philipnee commented on a diff in pull request #13380: KAFKA-14468: Committed API

2023-03-14 Thread via GitHub


philipnee commented on code in PR #13380:
URL: https://github.com/apache/kafka/pull/13380#discussion_r1136454999


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -437,29 +456,24 @@ public void wakeup() {
      */
     @Override
     public void commitSync(final Duration timeout) {
-        final CommitApplicationEvent commitEvent = new CommitApplicationEvent(subscriptions.allConsumed());
-        eventHandler.add(commitEvent);
-
-        final CompletableFuture<Void> commitFuture = commitEvent.future();
-        try {
-            commitFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
-        } catch (final TimeoutException e) {
-            throw new org.apache.kafka.common.errors.TimeoutException("timeout");
-        } catch (final Exception e) {
-            // handle exception here
-            throw new RuntimeException(e);
-        }
+        commitSync(subscriptions.allConsumed(), timeout);
     }
 
     @Override
     public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
-        throw new KafkaException("method not implemented");
+        commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     @Override
     public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
-        throw new KafkaException("method not implemented");
+        CompletableFuture<Void> commitFuture = commit(offsets);
+        try {
+            commitFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+        } catch (final TimeoutException e) {

Review Comment:
   Ditto above: map InterruptedException to Kafka's InterruptException, TimeoutException to Kafka's TimeoutException, and ExecutionException to KafkaException.
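
   For concreteness, a minimal sketch of the mapping being requested, assuming the method shape from the diff above; the abstract commit(...) is a stand-in, and this is illustrative only, not the merged implementation:

   ```java
   import java.time.Duration;
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.TimeUnit;
   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
   import org.apache.kafka.common.KafkaException;
   import org.apache.kafka.common.TopicPartition;
   import org.apache.kafka.common.errors.InterruptException;

   abstract class CommitSyncSketch {
       // stand-in for the commit(offsets) call from the diff above
       abstract CompletableFuture<Void> commit(Map<TopicPartition, OffsetAndMetadata> offsets);

       public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
           final CompletableFuture<Void> commitFuture = commit(offsets);
           try {
               commitFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
           } catch (final java.util.concurrent.TimeoutException e) {
               // TimeoutException -> Kafka's TimeoutException
               throw new org.apache.kafka.common.errors.TimeoutException(e);
           } catch (final InterruptedException e) {
               // InterruptedException -> Kafka's InterruptException
               throw new InterruptException(e);
           } catch (final ExecutionException e) {
               // ExecutionException -> KafkaException, unwrapping the cause
               throw new KafkaException(e.getCause());
           }
       }
   }
   ```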






[GitHub] [kafka] philipnee commented on a diff in pull request #13380: KAFKA-14468: Committed API

2023-03-14 Thread via GitHub


philipnee commented on code in PR #13380:
URL: https://github.com/apache/kafka/pull/13380#discussion_r1136454609


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -235,4 +373,105 @@ public void ack(final long currentTimeMs) {
         this.timer.update(currentTimeMs);
     }
 }
+
+private class FetchCommittedOffsetResponseHandler {

Review Comment:
   It's 80% copy-paste; one thing changed here: `retry(currentTimeMs);` is invoked upon RetriableExceptions. In the original code, the exception was raised and then retried until timeout. Here is the reference: https://github.com/apache/kafka/blob/d23ce20bdfbe5a9598523961cb7cf747ce4f52ef/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1016
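
   A self-contained sketch of the control-flow difference described here, with hypothetical names (the PR's actual handler differs):

   ```java
   import org.apache.kafka.common.errors.RetriableException;

   // Retriable errors are re-enqueued with backoff inside the handler,
   // instead of being raised to the caller as in the old ConsumerCoordinator.
   class RetryOnRetriableSketch {
       private final long retryBackoffMs = 100L;

       void onResponse(final long currentTimeMs, final Throwable error) {
           if (error instanceof RetriableException) {
               retry(currentTimeMs); // retried internally rather than surfaced
           } else if (error != null) {
               throw new RuntimeException(error); // fatal: surface to the caller
           }
       }

       private void retry(final long currentTimeMs) {
           // re-enqueue the request to be sent again once
           // currentTimeMs + retryBackoffMs has elapsed
       }
   }
   ```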






[GitHub] [kafka] philipnee commented on a diff in pull request #13380: KAFKA-14468: Committed API

2023-03-14 Thread via GitHub


philipnee commented on code in PR #13380:
URL: https://github.com/apache/kafka/pull/13380#discussion_r1136449438


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -306,13 +310,28 @@ public OffsetAndMetadata committed(TopicPartition partition, Duration timeout) {
     }
 
     @Override
-    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) {
-        throw new KafkaException("method not implemented");
+    public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
+        return committed(partitions, Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     @Override
-    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions, Duration timeout) {
-        throw new KafkaException("method not implemented");
+    public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions,
+                                                            final Duration timeout) {
+        maybeThrowInvalidGroupIdException();
+        final OffsetFetchApplicationEvent event = new OffsetFetchApplicationEvent(partitions);
+        eventHandler.add(event);
+        try {
+            return event.complete(Duration.ofMillis(100));
+        } catch (ExecutionException | InterruptedException | TimeoutException e) {
+            throw new KafkaException(e);

Review Comment:
   InterruptedException, TimeoutException -> we can map them to the existing Kafka exceptions. ExecutionException -> KafkaException.
   There are a few blocking operations like committed(); I think we need to support wakeup here. I don't know what the suggestion is here, but I'm thinking of running a timer and checking wakeup in that loop.
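
   One way to realize that timer-loop idea, as a hedged sketch: wait on the future in short slices and re-check a wakeup flag between slices. The flag, the slice size, and the class names are assumptions for illustration only:

   ```java
   import java.time.Duration;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   import java.util.concurrent.atomic.AtomicBoolean;
   import org.apache.kafka.common.errors.WakeupException;

   class WakeupAwareWait {
       static <T> T await(final CompletableFuture<T> future,
                          final Duration timeout,
                          final AtomicBoolean wakeupRequested)
               throws ExecutionException, InterruptedException, TimeoutException {
           final long deadlineNs = System.nanoTime() + timeout.toNanos();
           while (System.nanoTime() < deadlineNs) {
               // check wakeup once per timer slice, as the comment suggests
               if (wakeupRequested.getAndSet(false)) {
                   throw new WakeupException();
               }
               try {
                   // wait in short slices so wakeup is honored promptly
                   return future.get(100, TimeUnit.MILLISECONDS);
               } catch (final TimeoutException e) {
                   // slice expired: loop to re-check wakeup and the overall deadline
               }
           }
           throw new TimeoutException("committed() did not complete within the timeout");
       }
   }
   ```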






[jira] [Assigned] (KAFKA-14206) Upgrade zookeeper to 3.7.1 to address security vulnerabilities

2023-03-14 Thread Luke Chen (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-14206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen reassigned KAFKA-14206:
---------------------------------

Assignee: (was: Luke Chen)

> Upgrade zookeeper to 3.7.1 to address security vulnerabilities
> --
>
> Key: KAFKA-14206
> URL: https://issues.apache.org/jira/browse/KAFKA-14206
> Project: Kafka
>  Issue Type: Improvement
>  Components: packaging
>Affects Versions: 3.2.1
>Reporter: Valeriy Kassenbayev
>Priority: Blocker
>
> Kafka 3.2.1 is using ZooKeeper, which is affected by [CVE-2021-37136|https://security.snyk.io/vuln/SNYK-JAVA-IONETTY-1584064] and [CVE-2021-37137|https://www.cve.org/CVERecord?id=CVE-2021-37137]:
> {code:java}
>   ✗ Denial of Service (DoS) [High Severity][https://security.snyk.io/vuln/SNYK-JAVA-IONETTY-1584063] in io.netty:netty-codec@4.1.63.Final
>     introduced by org.apache.kafka:kafka_2.13@3.2.1 > org.apache.zookeeper:zookeeper@3.6.3 > io.netty:netty-handler@4.1.63.Final > io.netty:netty-codec@4.1.63.Final
>   This issue was fixed in versions: 4.1.68.Final
>   ✗ Denial of Service (DoS) [High Severity][https://security.snyk.io/vuln/SNYK-JAVA-IONETTY-1584064] in io.netty:netty-codec@4.1.63.Final
>     introduced by org.apache.kafka:kafka_2.13@3.2.1 > org.apache.zookeeper:zookeeper@3.6.3 > io.netty:netty-handler@4.1.63.Final > io.netty:netty-codec@4.1.63.Final
>   This issue was fixed in versions: 4.1.68.Final {code}
> The issues were fixed in the next versions of ZooKeeper (starting from 3.6.4). ZooKeeper 3.7.1 is the next stable [release|https://zookeeper.apache.org/releases.html] at the moment.





[GitHub] [kafka] philipnee commented on a diff in pull request #13380: KAFKA-14468: Committed API

2023-03-14 Thread via GitHub


philipnee commented on code in PR #13380:
URL: https://github.com/apache/kafka/pull/13380#discussion_r1136434874


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -207,6 +266,85 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
     }
 }
 
+static class UnsentOffsetFetchRequestState extends RequestState {
+    public final Set<TopicPartition> requestedPartitions;
+    public final GroupState.Generation requestedGeneration;
+    public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+
+    public UnsentOffsetFetchRequestState(final Set<TopicPartition> partitions,
+                                         final GroupState.Generation generation,
+                                         final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future,
+                                         final long retryBackoffMs) {
+        super(retryBackoffMs);
+        this.requestedPartitions = partitions;
+        this.requestedGeneration = generation;
+        this.future = future;
+    }
+
+    public boolean sameRequest(final UnsentOffsetFetchRequestState request) {
+        return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions);
+    }
+}
+
+/**
+ * This is used to support the committed() API. Here we use a Java Collection, {@code unsentRequests}, to
+ * track the OffsetFetchRequests that haven't been sent, to prevent sending the same requests in the same batch.
+ *
+ * If the request is new, it will be enqueued to the {@code unsentRequests}, and will be sent upon the next
+ * poll.
+ *
+ * If the same request has been sent, the request's {@code CompletableFuture} will be completed upon the
+ * completion of the existing one.
+ *
+ * TODO: There is an optimization to prevent duplication of the sent but incomplete requests. I'm not sure if we
+ * need that.
+ */
+class UnsentOffsetFetchRequests {

Review Comment:
   Hi! That was my original attempt :) , but I abandoned it because I was wondering whether we actually need this optimization.
   
   I think if we call committed() several times consecutively within a single event loop, it makes sense to coalesce them into a single request. Once we enqueue a request, though, I wonder how often we invoke another committed() before it gets sent out. Is it a use case from the Streams side?






[jira] [Created] (KAFKA-14810) Refactor FileRawSnapshotWriter to not reference ReplicatedLog

2023-03-14 Thread Jira
José Armando García Sancio created KAFKA-14810:
--

 Summary: Refactor FileRawSnapshotWriter to not reference ReplicatedLog
 Key: KAFKA-14810
 URL: https://issues.apache.org/jira/browse/KAFKA-14810
 Project: Kafka
  Issue Type: Task
Reporter: José Armando García Sancio
Assignee: José Armando García Sancio


The current implementation of FileRawSnapshotWriter uses an Optional<ReplicatedLog> to propagate when a new snapshot has been created by the state machine. This abstraction is too strict when writing tests and should be changed to something like Consumer.





[GitHub] [kafka] pprovenzano commented on a diff in pull request #13374: KRAFT-14765 and KRAFT-14776: Support for SCRAM at bootstrap with integration tests

2023-03-14 Thread via GitHub


pprovenzano commented on code in PR #13374:
URL: https://github.com/apache/kafka/pull/13374#discussion_r1136329445


##
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala:
##
@@ -221,4 +224,78 @@ Found problem:
 
     assertThrows(classOf[IllegalArgumentException], () => parseMetadataVersion("--release-version", "0.0"))
   }
+
+  @Test
+  def testAddScram(): Unit = {
+    def parseAddScram(strings: String*): Option[ArrayBuffer[UserScramCredentialRecord]] = {
+      var args = mutable.Seq("format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ")
+      args ++= strings
+      val namespace = StorageTool.parseArguments(args.toArray)
+      StorageTool.getUserScramCredentialRecords(namespace)
+    }
+
+    var scramRecords = parseAddScram()
+    assertEquals(None, scramRecords)
+
+    // Validate we can add multiple SCRAM creds.
+    scramRecords = parseAddScram("-S",
+      "SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\",iterations=8192]",
+      "-S",
+      "SCRAM-SHA-256=[name=george,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\",iterations=8192]")
+
+    assertEquals(2, scramRecords.get.size)
+
+    // Require name subfield.
+    try assertEquals(1, parseAddScram("-S",
+      "SCRAM-SHA-256=[salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\",iterations=8192]"))
+    catch {
+      case e: TerseFailure => assertEquals(s"You must supply 'name' to add-scram", e.getMessage)
+    }
+
+    // Require password xor saltedpassword
+    try assertEquals(1, parseAddScram("-S",
+      "SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",password=alice,saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\",iterations=8192]"))
+    catch {
+      case e: TerseFailure => assertEquals(s"You must only supply one of 'password' or 'saltedpassword' to add-scram", e.getMessage)
+    }
+
+    try assertEquals(1, parseAddScram("-S",
+      "SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",iterations=8192]"))
+    catch {
+      case e: TerseFailure => assertEquals(s"You must supply one of 'password' or 'saltedpassword' to add-scram", e.getMessage)
+    }
+
+    // Validate salt is required with saltedpassword
+    try assertEquals(1, parseAddScram("-S",
+      "SCRAM-SHA-256=[name=alice,saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\",iterations=8192]"))
+    catch {
+      case e: TerseFailure => assertEquals(s"You must supply 'salt' with 'saltedpassword' to add-scram", e.getMessage)
+    }
+
+    // Validate salt is optional with password
+    assertEquals(1, parseAddScram("-S", "SCRAM-SHA-256=[name=alice,password=alice,iterations=4096]").get.size)
+
+    // Require 4096 <= iterations <= 16384
+    try assertEquals(1, parseAddScram("-S",
+      "SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",password=alice,iterations=16385]"))
+    catch {
+      case e: TerseFailure => assertEquals(s"The 'iterations' value must be <= 16384 for add-scram", e.getMessage)
+    }
+
+    assertEquals(1, parseAddScram("-S",
+      "SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",password=alice,iterations=16384]")
+      .get.size)
+
+    try assertEquals(1, parseAddScram("-S",
+      "SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",password=alice,iterations=4095]"))
+    catch {
+      case e: TerseFailure => assertEquals(s"The 'iterations' value must be >= 4096 for add-scram", e.getMessage)
+    }
+
+    assertEquals(1, parseAddScram("-S",
+      "SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",password=alice,iterations=4096]")
+      .get.size)
+
+    // Validate iterations is optional
+    assertEquals(1, parseAddScram("-S", "SCRAM-SHA-256=[name=alice,password=alice]").get.size)

Review Comment:
   There is a check for metadata version in ScramImage and a test to validate 
that check. I have manually tested that the formatter produces an error if the 
IBP version is set to an earlier version and SCRAM records are added. Testing 
this in a unit test is a lot of work and not worth it as the cluster will just 
fail to come up if they bypass it.






[GitHub] [kafka] pprovenzano commented on a diff in pull request #13374: KRAFT-14765 and KRAFT-14776: Support for SCRAM at bootstrap with integration tests

2023-03-14 Thread via GitHub


pprovenzano commented on code in PR #13374:
URL: https://github.com/apache/kafka/pull/13374#discussion_r1136323073


##
core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala:
##
@@ -46,8 +47,8 @@ class ScramServerStartupTest extends IntegrationTestHarness with SaslSetup {
   override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
   override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
 
-  override def configureSecurityBeforeServersStart(): Unit = {
-    super.configureSecurityBeforeServersStart()
+  override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = {
+    super.configureSecurityBeforeServersStart(testInfo)
 
     zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
     // Create credentials before starting brokers
     createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword)

Review Comment:
   Yes, but converting this test was a pain, so I deleted this file and moved the test to SaslScramSslEndToEndAuthorizationTest. The test now runs in both ZK and KRaft mode from there.






[GitHub] [kafka] pprovenzano commented on a diff in pull request #13374: KRAFT-14765 and KRAFT-14776: Support for SCRAM at bootstrap with integration tests

2023-03-14 Thread via GitHub


pprovenzano commented on code in PR #13374:
URL: https://github.com/apache/kafka/pull/13374#discussion_r1136321668


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -96,6 +115,9 @@ object StorageTool extends Logging {
       action(store()).
       required(true).
       help("The cluster ID to use.")
+    formatParser.addArgument("--add-scram", "-S").
+      action(append()).
+      help("Additional Bootstrap Metadata to add to the cluster.")

Review Comment:
   Yes, I've added the description.



##
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala:
##
@@ -161,20 +163,21 @@ Found problem:
   val metaProperties = MetaProperties(
     clusterId = "XcZZOzUqS4yHOjhMQB6JLQ", nodeId = 2)
   val stream = new ByteArrayOutputStream()
+  val bootstrapMetadata = StorageTool.buildBootstrapMetadata(MetadataVersion.latest(), None, "test foramt command")

Review Comment:
   Fixed






[GitHub] [kafka] pprovenzano commented on a diff in pull request #13374: KRAFT-14765 and KRAFT-14776: Support for SCRAM at bootstrap with integration tests

2023-03-14 Thread via GitHub


pprovenzano commented on code in PR #13374:
URL: https://github.com/apache/kafka/pull/13374#discussion_r1136321240


##
clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramMechanism.java:
##
@@ -40,11 +42,13 @@ public enum ScramMechanism {
         MECHANISMS_MAP = Collections.unmodifiableMap(map);
     }
 
-    ScramMechanism(String hashAlgorithm, String macAlgorithm, int minIterations) {
+    ScramMechanism(byte type, String hashAlgorithm, String macAlgorithm, int minIterations, int maxIterations) {
+        this.type = type;
         this.mechanismName = "SCRAM-" + hashAlgorithm;
         this.hashAlgorithm = hashAlgorithm;
-        this.macAlgorithm = macAlgorithm;
+        this.macAlgorithm  = macAlgorithm;

Review Comment:
   Fixed






[GitHub] [kafka] guozhangwang commented on a diff in pull request #13380: KAFKA-14468: Committed API

2023-03-14 Thread via GitHub


guozhangwang commented on code in PR #13380:
URL: https://github.com/apache/kafka/pull/13380#discussion_r1136315295


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -207,6 +266,85 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
     }
 }
 
+static class UnsentOffsetFetchRequestState extends RequestState {
+    public final Set<TopicPartition> requestedPartitions;
+    public final GroupState.Generation requestedGeneration;
+    public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+
+    public UnsentOffsetFetchRequestState(final Set<TopicPartition> partitions,
+                                         final GroupState.Generation generation,
+                                         final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future,
+                                         final long retryBackoffMs) {
+        super(retryBackoffMs);
+        this.requestedPartitions = partitions;
+        this.requestedGeneration = generation;
+        this.future = future;
+    }
+
+    public boolean sameRequest(final UnsentOffsetFetchRequestState request) {
+        return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions);
+    }
+}
+
+/**
+ * This is used to support the committed() API. Here we use a Java Collection, {@code unsentRequests}, to
+ * track the OffsetFetchRequests that haven't been sent, to prevent sending the same requests in the same batch.
+ *
+ * If the request is new, it will be enqueued to the {@code unsentRequests}, and will be sent upon the next
+ * poll.
+ *
+ * If the same request has been sent, the request's {@code CompletableFuture} will be completed upon the
+ * completion of the existing one.
+ *
+ * TODO: There is an optimization to prevent duplication of the sent but incomplete requests. I'm not sure if we
+ * need that.
+ */
+class UnsentOffsetFetchRequests {

Review Comment:
   Just to think a bit further here: inside the `unsentOffsetFetchRequests` we'd need to keep two collections: 1) the unsent requests, and 2) the sent-but-not-yet-responded requests. Upon getting a new event from the queue, we would check against both collections. Then, when we drain the first collection and write its requests to the network client, we move them to the second collection, and we only drop requests from the second collection after a response is received and the registered handlers have run (it's possible to have multiple events' handlers registered for the same request).
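
   A hedged sketch of that two-collection bookkeeping (names and structure are illustrative, not the PR's code):

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;
   import java.util.concurrent.CompletableFuture;
   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
   import org.apache.kafka.common.TopicPartition;

   class OffsetFetchDedupSketch {
       static class PendingFetch {
           final Set<TopicPartition> partitions; // identity of the request
           final List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = new ArrayList<>();

           PendingFetch(final Set<TopicPartition> partitions) {
               this.partitions = partitions;
           }
       }

       private final List<PendingFetch> unsent = new ArrayList<>();   // collection 1
       private final List<PendingFetch> inflight = new ArrayList<>(); // collection 2

       // A new committed() event is checked against both collections and
       // coalesced onto a matching pending request if one exists.
       CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> enqueue(final Set<TopicPartition> partitions) {
           final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = new CompletableFuture<>();
           PendingFetch existing = find(unsent, partitions);
           if (existing == null)
               existing = find(inflight, partitions);
           if (existing != null) {
               existing.futures.add(future);
               return future;
           }
           final PendingFetch fresh = new PendingFetch(partitions);
           fresh.futures.add(future);
           unsent.add(fresh);
           return future;
       }

       private static PendingFetch find(final List<PendingFetch> collection, final Set<TopicPartition> partitions) {
           for (final PendingFetch pending : collection) {
               if (pending.partitions.equals(partitions))
                   return pending;
           }
           return null;
       }

       // Draining moves requests from unsent to in-flight as they are handed
       // to the network client.
       List<PendingFetch> drain() {
           final List<PendingFetch> toSend = new ArrayList<>(unsent);
           inflight.addAll(toSend);
           unsent.clear();
           return toSend;
       }

       // A request is dropped from the in-flight set only after its response
       // has completed every future registered on it.
       void complete(final PendingFetch pending, final Map<TopicPartition, OffsetAndMetadata> offsets) {
           pending.futures.forEach(f -> f.complete(offsets));
           inflight.remove(pending);
       }
   }
   ```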






[GitHub] [kafka] vcrfxia commented on a diff in pull request #13365: KAFKA-14491: [17/N] Refactor segments cleanup logic

2023-03-14 Thread via GitHub


vcrfxia commented on code in PR #13365:
URL: https://github.com/apache/kafka/pull/13365#discussion_r1136263824


##
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java:
##
@@ -57,6 +57,11 @@ public void openExisting(final ProcessorContext context, final long streamTime) {
         physicalStore.openDB(context.appConfigs(), context.stateDir());
     }
 
+    @Override
+    public void cleanupExpiredSegments(final long streamTime) {
+        super.cleanupExpiredSegments(streamTime);

Review Comment:
   The method from AbstractSegments is protected, but LogicalKeyValueSegments needs to expose it publicly so that it can be called from the versioned store implementation.
   
   Admittedly it looks odd at first glance. I can add a comment into the code if you think it'd be useful?
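
   A minimal illustration of the visibility-widening pattern described here, with assumed class names; Java allows an override to broaden a protected method to public:

   ```java
   abstract class AbstractSegmentsSketch {
       protected void cleanupExpiredSegments(final long streamTime) {
           // drop segments that have fallen out of retention relative to streamTime
       }
   }

   class LogicalKeyValueSegmentsSketch extends AbstractSegmentsSketch {
       @Override
       public void cleanupExpiredSegments(final long streamTime) { // widened to public
           super.cleanupExpiredSegments(streamTime);
       }
   }
   ```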






[GitHub] [kafka] vcrfxia commented on a diff in pull request #13340: KAFKA-14491: [15/N] Add integration tests for versioned stores

2023-03-14 Thread via GitHub


vcrfxia commented on code in PR #13340:
URL: https://github.com/apache/kafka/pull/13340#discussion_r1136261694


##
streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java:
##
@@ -302,7 +319,54 @@ public void shouldAllowCustomIQv2ForCustomStoreImplementations() {
             .withPartitions(Collections.singleton(0));
         final StateQueryResult result =
             IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
-        assertThat("success", equalTo(result.getOnlyPartitionResult().getResult()));
+        assertThat(result.getOnlyPartitionResult().getResult(), equalTo("success"));
+    }
+
+    @Test
+    public void shouldCreateGlobalTable() throws Exception {
+        // produce data to global store topic and track in-memory for processor to verify
+        final DataTracker data = new DataTracker();
+        produceDataToTopic(globalTableTopic, data, baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null));
+        produceDataToTopic(globalTableTopic, data, baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5"));
+        produceDataToTopic(globalTableTopic, data, baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data
+
+        // build topology and start app
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .globalTable(
+                globalTableTopic,
+                Consumed.with(Serdes.Integer(), Serdes.String()),
+                Materialized
+                    .as(new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION))
+                    .withKeySerde(Serdes.Integer())
+                    .withValueSerde(Serdes.String())
+            );
+        streamsBuilder
+            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .process(() -> new VersionedStoreContentCheckerProcessor(false, data))
+            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce source data to trigger store verifications in processor
+        int numRecordsProduced = produceDataToTopic(inputStream, baseTimestamp + 8, KeyValue.pair(1, "a8"), KeyValue.pair(2, "b8"), KeyValue.pair(3, "c8"));
+
+        // wait for output and verify
+        final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            numRecordsProduced);
+
+        for (final KeyValue<Integer, Integer> receivedRecord : receivedRecords) {
+            // verify zero failed checks for each record
+            assertThat(receivedRecord.value, equalTo(0));

Review Comment:
   > In your original comment, you say we cannot get the data from 
global-ktable because we cannot inject a Processor
   
   Hm, not sure which comment this refers to. Maybe the comment I made earlier 
was about not being able to issue IQ against versioned stores? Ideally the way 
these tests would be written would be to use IQ to check the contents of the 
stores directly, but because versioned stores don't support that (yet) that's 
why the tests inspect the contents of the stores with a processor. The 
processor writes the number of errors to an output stream and the test 
validates that the output stream contains only zeros (indicating no errors).
   
   > we could use addGlobalStore instead of globalTable to add a Processor.
   
   This test already has a processor which inspects/validates the contents of 
the global store. Have I misunderstood?






[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest

2023-03-14 Thread via GitHub


CalvinConfluent commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1136255311


##
clients/src/main/resources/common/message/FetchRequest.json:
##
@@ -50,14 +50,23 @@
   // Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code.
   //
  // Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException(KIP-405)
-  "validVersions": "0-14",
+  //
+  // Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also,
+  // deprecate the old ReplicaId field and set its default value to -1. (KIP-903)
+  "validVersions": "0-15",
   "flexibleVersions": "12+",
   "fields": [
     { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null",
       "taggedVersions": "12+", "tag": 0, "ignorable": true,
       "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." },
-    { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId",
+    { "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", "entityType": "brokerId",
       "about": "The broker ID of the follower, of -1 if this request is from a consumer." },
+    { "name": "ReplicaState", "type": "ReplicaState", "taggedVersions": "15+", "tag": 1, "fields": [

Review Comment:
   The tagged field magic happens in the lower layer, in the auto-generated FetchRequestData.java. As long as the final FetchRequestData object has a default-valued ReplicaState, it will be omitted from the serialized message.






[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest

2023-03-14 Thread via GitHub


CalvinConfluent commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1136238469


##
clients/src/main/resources/common/message/FetchRequest.json:
##
@@ -50,14 +50,23 @@
   // Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code.
   //
   // Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException(KIP-405)
-  "validVersions": "0-14",
+  //
+  // Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also,
+  // deprecate the old ReplicaId field and set its default value to -1. (KIP-903)
+  "validVersions": "0-15",
   "flexibleVersions": "12+",
   "fields": [
     { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null",
       "taggedVersions": "12+", "tag": 0, "ignorable": true,
       "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." },
-    { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId",
+    { "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", "entityType": "brokerId",
       "about": "The broker ID of the follower, of -1 if this request is from a consumer." },
+    { "name": "ReplicaState", "type": "ReplicaState", "taggedVersions": "15+", "tag": 1, "fields": [

Review Comment:
   > So we still rely on IBP to know the version?
   
   Yes.
   
   > does the builder still populate the field?
   
   Yes, when constructing an instance it will do `this.replicaState = new ReplicaState()` (this relates to FetchRequestData.java).
   
   > Does it automatically remove it if we keep the defaults
   
   When serializing the request in FetchRequestData.java:
   ```
   if (!this.replicaState.equals(new ReplicaState())) {
       _writable.writeUnsignedVarint(1);
       _writable.writeUnsignedVarint(this.replicaState.size(_cache, _version));
       replicaState.write(_writable, _cache, _version);
   }
   ```






[GitHub] [kafka] vcrfxia commented on a diff in pull request #13340: KAFKA-14491: [15/N] Add integration tests for versioned stores

2023-03-14 Thread via GitHub


vcrfxia commented on code in PR #13340:
URL: https://github.com/apache/kafka/pull/13340#discussion_r1136258765


##
streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java:
##
@@ -334,18 +399,65 @@ private final int produceSourceData(final long timestamp,
     }
 
     /**
-     * Test-only processor for inserting records into a versioned store while also tracking
-     * them separately in-memory, and performing checks to validate expected store contents.
-     * Forwards the number of failed checks downstream for consumption.
+     * @param topic       topic to produce to
+     * @param dataTracker map of key -> timestamp -> value for tracking data which is produced to
+     *                    the topic. This method will add the produced data into this in-memory
+     *                    tracker in addition to producing to the topic, in order to keep the two
+     *                    in sync.
+     * @param timestamp   timestamp to produce with
+     * @param keyValues   key-value pairs to produce
+     *
+     * @return number of records produced
+     */
+    @SuppressWarnings("varargs")
+    @SafeVarargs
+    private final int produceDataToTopic(final String topic,
+                                         final DataTracker dataTracker,
+                                         final long timestamp,
+                                         final KeyValue<Integer, String>... keyValues) {
+        produceDataToTopic(topic, timestamp, keyValues);
+
+        for (final KeyValue<Integer, String> keyValue : keyValues) {
+            dataTracker.add(keyValue.key, timestamp, keyValue.value);
+        }
+
+        return keyValues.length;
+    }
+
+    /**
+     * Test-only processor for validating expected contents of a versioned store, and forwards
+     * the number of failed checks downstream for consumption. Callers specify whether the
+     * processor should also be responsible for inserting records into the store (while also
+     * tracking them separately in-memory for use in validation).
      */
     private static class VersionedStoreContentCheckerProcessor implements Processor<Integer, String, Integer, Integer> {
 
         private ProcessorContext<Integer, Integer> context;
         private VersionedKeyValueStore<Integer, String> store;
 
+        // whether or not the processor should write records to the store as they arrive.
+        // must be false for global stores.

Review Comment:
   When a processor accesses a global store, the store is read-only: 
https://github.com/apache/kafka/blob/404a833df7de9a7d5efe35beb5bafb7c6972601e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L167-L170






[GitHub] [kafka] vcrfxia commented on pull request #13292: KAFKA-14491: [14/N] Set changelog topic configs for versioned stores

2023-03-14 Thread via GitHub


vcrfxia commented on PR #13292:
URL: https://github.com/apache/kafka/pull/13292#issuecomment-1468887946

   Thanks @mjsax for your review! I made the refactor you suggested (including 
to the existing WindowedChangelogTopicConfig) and also pushed updates to 
InternalTopicManager. InternalTopicManager is currently unused code but I 
figured it'd be good to update it for the new type of internal topic anyway for 
completeness. 





[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest

2023-03-14 Thread via GitHub


CalvinConfluent commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1136249875


##
clients/src/main/resources/common/message/FetchRequest.json:
##
@@ -50,14 +50,23 @@
   // Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code.
   //
   // Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException(KIP-405)
-  "validVersions": "0-14",
+  //
+  // Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also,
+  // deprecate the old ReplicaId field and set its default value to -1. (KIP-903)
+  "validVersions": "0-15",
   "flexibleVersions": "12+",
   "fields": [
     { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null",
       "taggedVersions": "12+", "tag": 0, "ignorable": true,
       "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." },
-    { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId",
+    { "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", "entityType": "brokerId",
       "about": "The broker ID of the follower, of -1 if this request is from a consumer." },
+    { "name": "ReplicaState", "type": "ReplicaState", "taggedVersions": "15+", "tag": 1, "fields": [

Review Comment:
   Correct.






[GitHub] [kafka] vcrfxia commented on a diff in pull request #13292: KAFKA-14491: [14/N] Set changelog topic configs for versioned stores

2023-03-14 Thread via GitHub


vcrfxia commented on code in PR #13292:
URL: https://github.com/apache/kafka/pull/13292#discussion_r1136249663


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##
@@ -1293,12 +1306,16 @@ private void setRegexMatchedTopicToStateStore() {
 
     private <S extends StateStore> InternalTopicConfig createChangelogTopicConfig(final StateStoreFactory<S> factory,
                                                                                   final String name) {
-        if (factory.isWindowStore()) {
+        if (factory.isVersionedStore()) {
+            final VersionedChangelogTopicConfig config = new VersionedChangelogTopicConfig(name, factory.logConfig());
+            config.setMinCompactionLagMs(factory.historyRetention());

Review Comment:
   > Yes, I think it would be cleaner this way.
   
   OK, made the updates.
   
   > `delete.retention.ms` is not for retention-based topics, but for compacted topics
   
   Ah, I see. My Scala's not the best, but it looks like `min.compaction.lag.ms` guarantees that any record within `min.compaction.lag.ms` of "now" will not be compacted, regardless of `delete.retention.ms`, which is the important point: we need to guarantee that older record versions are not prematurely compacted.
   
   Interestingly, we might have a case for setting `delete.retention.ms = 0` (rather than using the default of 24 hours) since we know that we no longer need the older tombstones once `min.compaction.lag.ms` has expired. It's not strictly necessary, though. Do you think we should set it for completeness? I'm fine either way.
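
   As a hedged sketch, the changelog topic configs this discussion converges on might look as follows; the config keys are real Kafka topic configs, but the wiring and values are assumptions for illustration, not the PR's code:

   ```java
   import java.util.HashMap;
   import java.util.Map;

   class VersionedChangelogConfigSketch {
       static Map<String, String> topicConfigs(final long historyRetentionMs) {
           final Map<String, String> configs = new HashMap<>();
           // versioned changelogs are compacted, and history retention feeds
           // min.compaction.lag.ms so older record versions survive compaction
           // at least that long
           configs.put("cleanup.policy", "compact");
           configs.put("min.compaction.lag.ms", String.valueOf(historyRetentionMs));
           // the optional extra discussed above: tombstones are no longer
           // needed once the compaction lag has elapsed
           configs.put("delete.retention.ms", "0");
           return configs;
       }
   }
   ```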






[GitHub] [kafka] jolshan commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest

2023-03-14 Thread via GitHub


jolshan commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1136245326


##
clients/src/main/resources/common/message/FetchRequest.json:
##
@@ -50,14 +50,23 @@
   // Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code.
   //
   // Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException(KIP-405)
-  "validVersions": "0-14",
+  //
+  // Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also,
+  // deprecate the old ReplicaId field and set its default value to -1. (KIP-903)
+  "validVersions": "0-15",
   "flexibleVersions": "12+",
   "fields": [
     { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null",
       "taggedVersions": "12+", "tag": 0, "ignorable": true,
       "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." },
-    { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId",
+    { "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", "entityType": "brokerId",
       "about": "The broker ID of the follower, of -1 if this request is from a consumer." },
+    { "name": "ReplicaState", "type": "ReplicaState", "taggedVersions": "15+", "tag": 1, "fields": [

Review Comment:
   > Yes, when constructing an instance, it will do this.replicaState = new ReplicaState()
   
   The builder is still going to write to these fields as far as I can see in the code (FetchRequest line 266). It's just the case that these happen to be the defaults and so the state is equal, right?






[GitHub] [kafka] jolshan commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest

2023-03-14 Thread via GitHub


jolshan commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1136221498


##
clients/src/main/resources/common/message/FetchRequest.json:
##
@@ -50,14 +50,23 @@
   // Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code.
   //
   // Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException(KIP-405)
-  "validVersions": "0-14",
+  //
+  // Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also,
+  // deprecate the old ReplicaId field and set its default value to -1. (KIP-903)
+  "validVersions": "0-15",
   "flexibleVersions": "12+",
   "fields": [
     { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null",
       "taggedVersions": "12+", "tag": 0, "ignorable": true,
       "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." },
-    { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId",
+    { "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", "entityType": "brokerId",
       "about": "The broker ID of the follower, of -1 if this request is from a consumer." },
+    { "name": "ReplicaState", "type": "ReplicaState", "taggedVersions": "15+", "tag": 1, "fields": [

Review Comment:
   Also, does the builder still populate the field? Does it automatically 
remove it if we keep the defaults?






[GitHub] [kafka] jolshan commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest

2023-03-14 Thread via GitHub


jolshan commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1136220531


##
clients/src/main/resources/common/message/FetchRequest.json:
##
@@ -50,14 +50,23 @@
   // Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code.
   //
   // Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException(KIP-405)
-  "validVersions": "0-14",
+  //
+  // Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also,
+  // deprecate the old ReplicaId field and set its default value to -1. (KIP-903)
+  "validVersions": "0-15",
   "flexibleVersions": "12+",
   "fields": [
     { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null",
       "taggedVersions": "12+", "tag": 0, "ignorable": true,
       "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." },
-    { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId",
+    { "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", "entityType": "brokerId",
       "about": "The broker ID of the follower, of -1 if this request is from a consumer." },
+    { "name": "ReplicaState", "type": "ReplicaState", "taggedVersions": "15+", "tag": 1, "fields": [

Review Comment:
   I see. So we still rely on IBP to know the version?






[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest

2023-03-14 Thread via GitHub


CalvinConfluent commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1136218503


##
clients/src/main/resources/common/message/FetchRequest.json:
##
@@ -50,14 +50,23 @@
   // Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code.
   //
   // Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException(KIP-405)
-  "validVersions": "0-14",
+  //
+  // Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also,
+  // deprecate the old ReplicaId field and set its default value to -1. (KIP-903)
+  "validVersions": "0-15",
   "flexibleVersions": "12+",
   "fields": [
     { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null",
       "taggedVersions": "12+", "tag": 0, "ignorable": true,
       "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." },
-    { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId",
+    { "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", "entityType": "brokerId",
       "about": "The broker ID of the follower, of -1 if this request is from a consumer." },
+    { "name": "ReplicaState", "type": "ReplicaState", "taggedVersions": "15+", "tag": 1, "fields": [

Review Comment:
   It is to reduce the message size for consumer cases, where the replicaId and replicaEpoch are both -1. When the fields hold their default values (both -1), the replicaState is not included in the serialized message.






[GitHub] [kafka] jsancio commented on a diff in pull request #13396: KAFKA-13884; Only voters flush on Fetch response

2023-03-14 Thread via GitHub


jsancio commented on code in PR #13396:
URL: https://github.com/apache/kafka/pull/13396#discussion_r1136211424


##
raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java:
##
@@ -975,6 +980,18 @@ void assertFetchRequestData(
         assertEquals(fetchOffset, fetchPartition.fetchOffset());
         assertEquals(lastFetchedEpoch, fetchPartition.lastFetchedEpoch());
         assertEquals(localId.orElse(-1), request.replicaId());
+
+        // Assert that voters have flushed up to the fetch offset
+        if (localId.isPresent() && voters.contains(localId.getAsInt())) {

Review Comment:
   Okay. Added a check for observers.






[GitHub] [kafka] hachikuji commented on a diff in pull request #13396: KAFKA-13884; Only voters flush on Fetch response

2023-03-14 Thread via GitHub


hachikuji commented on code in PR #13396:
URL: https://github.com/apache/kafka/pull/13396#discussion_r1136194254


##
raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java:
##
@@ -975,6 +980,18 @@ void assertFetchRequestData(
         assertEquals(fetchOffset, fetchPartition.fetchOffset());
        assertEquals(lastFetchedEpoch, fetchPartition.lastFetchedEpoch());
         assertEquals(localId.orElse(-1), request.replicaId());
+
+        // Assert that voters have flushed up to the fetch offset
+        if (localId.isPresent() && voters.contains(localId.getAsInt())) {

Review Comment:
   If someone removed the optimization, it might be a notable regression in 
performance, but no tests would fail. I think that makes it worthwhile.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on a diff in pull request #13396: KAFKA-13884; Only voters flush on Fetch response

2023-03-14 Thread via GitHub


jsancio commented on code in PR #13396:
URL: https://github.com/apache/kafka/pull/13396#discussion_r1136182878


##
raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java:
##
@@ -975,6 +980,18 @@ void assertFetchRequestData(
 assertEquals(fetchOffset, fetchPartition.fetchOffset());
 assertEquals(lastFetchedEpoch, fetchPartition.lastFetchedEpoch());
 assertEquals(localId.orElse(-1), request.replicaId());
+
+// Assert that voters have flushed up to the fetch offset
+if (localId.isPresent() && voters.contains(localId.getAsInt())) {

Review Comment:
   Yeah, I am not sure we should check for "performance" in the 
correctness tests.
   
   I am also not sure what we would check for. For example, I could add a 
boolean to `MockLog` that gets set to `true` on flush and set to `false` when 
`RaftClientTestContext` checks it.
   
   What do you think?
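   
   For concreteness, a minimal sketch of that idea (hypothetical names, not code 
from this PR):
   ```
   // In MockLog: remember whether flush was called since the last check.
   private boolean flushedSinceLastChecked = false;

   @Override
   public void flush(boolean forceFlushActiveSegment) {
       flushedSinceLastChecked = true;
       // ... existing flush bookkeeping ...
   }

   // Read-and-reset accessor for RaftClientTestContext to assert against.
   public boolean flushedSinceLastChecked() {
       boolean result = flushedSinceLastChecked;
       flushedSinceLastChecked = false;
       return result;
   }
   ```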



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14809) Connect incorrectly logs that no records were produced by source tasks

2023-03-14 Thread Hector Geraldino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17700372#comment-17700372
 ] 

Hector Geraldino commented on KAFKA-14809:
--

No that's perfect. Thanks [~ChrisEgerton]!

> Connect incorrectly logs that no records were produced by source tasks
> --
>
> Key: KAFKA-14809
> URL: https://issues.apache.org/jira/browse/KAFKA-14809
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Hector Geraldino
>Assignee: Hector Geraldino
>Priority: Minor
>
> There's an *{{if}}* condition when [committing 
> offsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L219]
>  that is referencing the wrong variable, so the statement always evaluates to 
> {*}true{*}.
> This causes log statements like the following to be spuriously emitted:
> {quote}[2023-03-14 16:18:04,675] DEBUG WorkerSourceTask\{id=job-0} Either no 
> records were produced by the task since the last offset commit, or every 
> record has been filtered out by a transformation or dropped due to 
> transformation or conversion errors. 
> (org.apache.kafka.connect.runtime.WorkerSourceTask:220)
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest

2023-03-14 Thread via GitHub


jolshan commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1136148988


##
clients/src/main/resources/common/message/FetchRequest.json:
##
@@ -50,14 +50,23 @@
   // Version 13 replaces topic names with topic IDs (KIP-516). May return 
UNKNOWN_TOPIC_ID error code.
   //
   // Version 14 is the same as version 13 but it also receives a new error 
called OffsetMovedToTieredStorageException(KIP-405)
-  "validVersions": "0-14",
+  //
+  // Version 15 adds the ReplicaState, which includes the new fields ReplicaEpoch
+  // and ReplicaId. It also deprecates the old ReplicaId field and sets its default
+  // value to -1. (KIP-903)
+  "validVersions": "0-15",
   "flexibleVersions": "12+",
   "fields": [
 { "name": "ClusterId", "type": "string", "versions": "12+", 
"nullableVersions": "12+", "default": "null",
   "taggedVersions": "12+", "tag": 0, "ignorable": true,
   "about": "The clusterId if known. This is used to validate metadata 
fetches prior to broker registration." },
-{ "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
+{ "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": 
"-1", "entityType": "brokerId",
   "about": "The broker ID of the follower, of -1 if this request is from a 
consumer." },
+{ "name": "ReplicaState", "type": "ReplicaState", "taggedVersions":"15+", 
"tag": 1, "fields": [

Review Comment:
   Sorry if I missed this conversation, but why is this tagged? Is it to avoid 
the IBP bump? We would still need it if we no longer set the replica ID, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14809) Connect incorrectly logs that no records were produced by source tasks

2023-03-14 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17700360#comment-17700360
 ] 

Chris Egerton commented on KAFKA-14809:
---

[~hgeraldino] I wanted a Jira ticket for this so that users could easily find 
the cause of the problem if they were confused by the incorrect log messages, 
so I've taken a stab at changing the title and description to describe not just 
the specific bug in the code, but the user-facing effects that it has. Feel 
free to alter anything you'd like; I just want this to be discoverable by users 
who may be searching for, e.g., log messages to understand what's going wrong.

> Connect incorrectly logs that no records were produced by source tasks
> --
>
> Key: KAFKA-14809
> URL: https://issues.apache.org/jira/browse/KAFKA-14809
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Hector Geraldino
>Assignee: Hector Geraldino
>Priority: Minor
>
> There's an *{{if}}* condition when [committing 
> offsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L219]
>  that is referencing the wrong variable, so the statement always evaluates to 
> {*}true{*}.
> This causes log statements like the following to be spuriously emitted:
> {quote}[2023-03-14 16:18:04,675] DEBUG WorkerSourceTask\{id=job-0} Either no 
> records were produced by the task since the last offset commit, or every 
> record has been filtered out by a transformation or dropped due to 
> transformation or conversion errors. 
> (org.apache.kafka.connect.runtime.WorkerSourceTask:220)
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14809) Connect incorrectly logs that no records were produced by source tasks

2023-03-14 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14809:
--
Description: 
There's an *{{if}}* condition when [committing 
offsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L219]
 that is referencing the wrong variable, so the statement always evaluates to 
{*}true{*}.

This causes log statements like the following to be spuriously emitted:
{quote}[2023-03-14 16:18:04,675] DEBUG WorkerSourceTask\{id=job-0} Either no 
records were produced by the task since the last offset commit, or every record 
has been filtered out by a transformation or dropped due to transformation or 
conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:220)
{quote}

  was:
There's an *{{if}}* condition when [committing 
offsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L219]
 that is referencing the wrong variable, so the statement always evaluates to 
{*}true{*}.

This causes log statements like the following to be spuriously emitted:
{quote} 
{quote}


> Connect incorrectly logs that no records were produced by source tasks
> --
>
> Key: KAFKA-14809
> URL: https://issues.apache.org/jira/browse/KAFKA-14809
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Hector Geraldino
>Assignee: Hector Geraldino
>Priority: Minor
>
> There's an *{{if}}* condition when [committing 
> offsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L219]
>  that is referencing the wrong variable, so the statement always evaluates to 
> {*}true{*}.
> This causes log statements like the following to be spuriously emitted:
> {quote}[2023-03-14 16:18:04,675] DEBUG WorkerSourceTask\{id=job-0} Either no 
> records were produced by the task since the last offset commit, or every 
> record has been filtered out by a transformation or dropped due to 
> transformation or conversion errors. 
> (org.apache.kafka.connect.runtime.WorkerSourceTask:220)
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest

2023-03-14 Thread via GitHub


jolshan commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1136146308


##
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##
@@ -302,6 +335,10 @@ public String toString() {
 }
 }
 
+public static int replicaId(FetchRequestData fetchRequestData) {
+return fetchRequestData.replicaId() != -1 ? 
fetchRequestData.replicaId() : fetchRequestData.replicaState().replicaId();

Review Comment:
   In the case of a consumer, we always read replica state (the second part of 
the statement).
   I don't think there are any bugs here, but it might be good to keep in mind.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14809) Connect incorrectly logs that no records were produced by source task

2023-03-14 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14809:
--
Summary: Connect incorrectly logs that no records were produced by source 
task  (was: Kafka Connect incorrectly logs that no records were produced by 
source task)

> Connect incorrectly logs that no records were produced by source task
> -
>
> Key: KAFKA-14809
> URL: https://issues.apache.org/jira/browse/KAFKA-14809
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Hector Geraldino
>Assignee: Hector Geraldino
>Priority: Minor
>
> There's an *{{if}}* condition when [committing 
> offsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L219]
>  that is referencing the wrong variable, so the statement always evaluates to 
> {*}true{*}.
> This causes log statements like the following to be spuriously emitted:
> {quote} 
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14809) Connect incorrectly logs that no records were produced by source tasks

2023-03-14 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14809:
--
Summary: Connect incorrectly logs that no records were produced by source 
tasks  (was: Connect incorrectly logs that no records were produced by source 
task)

> Connect incorrectly logs that no records were produced by source tasks
> --
>
> Key: KAFKA-14809
> URL: https://issues.apache.org/jira/browse/KAFKA-14809
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Hector Geraldino
>Assignee: Hector Geraldino
>Priority: Minor
>
> There's an *{{if}}* condition when [committing 
> offsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L219]
>  that is referencing the wrong variable, so the statement always evaluates to 
> {*}true{*}.
> This causes log statements like the following to be spuriously emitted:
> {quote} 
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14809) Kafka Connect incorrectly logs that no records were produced by source task

2023-03-14 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14809:
--
Description: 
There's an *{{if}}* condition when [committing 
offsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L219]
 that is referencing the wrong variable, so the statement always evaluates to 
{*}true{*}.

This causes log statements like the following to be spuriously emitted:
{quote} 
{quote}

  was:
There's an *{{if}}* condition when [committing 
offsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L219]
 that is referencing the wrong variable, so the statement always evaluates to 
{*}true{*}.

It's a subtle bug, which probably went undetected because it's only used to log 
information about pending committable offsets.


> Kafka Connect incorrectly logs that no records were produced by source task
> ---
>
> Key: KAFKA-14809
> URL: https://issues.apache.org/jira/browse/KAFKA-14809
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Hector Geraldino
>Assignee: Hector Geraldino
>Priority: Minor
>
> There's an *{{if}}* condition when [committing 
> offsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L219]
>  that is referencing the wrong variable, so the statement always evaluates to 
> {*}true{*}.
> This causes log statements like the following to be spuriously emitted:
> {quote} 
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14809) Kafka Connect incorrectly logs that no records were produced by source task

2023-03-14 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14809:
--
Summary: Kafka Connect incorrectly logs that no records were produced by 
source task  (was: Fix logging conditional on WorkerSourceTask)

> Kafka Connect incorrectly logs that no records were produced by source task
> ---
>
> Key: KAFKA-14809
> URL: https://issues.apache.org/jira/browse/KAFKA-14809
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Hector Geraldino
>Assignee: Hector Geraldino
>Priority: Minor
>
> There's an *{{if}}* condition when [committing 
> offsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L219]
>  that is referencing the wrong variable, so the statement always evaluates to 
> {*}true{*}.
> It's a subtle bug, which probably went undetected because it's only used to 
> log information about pending committable offsets.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14809) Fix logging conditional on WorkerSourceTask

2023-03-14 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14809:
--
Priority: Minor  (was: Trivial)

> Fix logging conditional on WorkerSourceTask
> ---
>
> Key: KAFKA-14809
> URL: https://issues.apache.org/jira/browse/KAFKA-14809
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Hector Geraldino
>Assignee: Hector Geraldino
>Priority: Minor
>
> There's an *{{if}}* condition when [committing 
> offsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L219]
>  that is referencing the wrong variable, so the statement always evaluates to 
> {*}true{*}.
> It's a subtle bug, which went undetected probably because its only used to 
> log information about pending committable offsets.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hgeraldino commented on pull request #13386: KAFKA-14809 Fix logging conditional on WorkerSourceTask

2023-03-14 Thread via GitHub


hgeraldino commented on PR #13386:
URL: https://github.com/apache/kafka/pull/13386#issuecomment-1468755316

   > LGTM, thanks @hgeraldino!
   > 
   > It would be nice to have a Jira for this so that others who notice the 
logging issue on older versions can know 1) that it is a bug and 2) which 
versions it is fixed on. Would you mind filing a ticket and linking it to this 
PR in the title? We can merge after that.
   
   Done


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14809) Fix logging conditional on WorkerSourceTask

2023-03-14 Thread Hector Geraldino (Jira)
Hector Geraldino created KAFKA-14809:


 Summary: Fix logging conditional on WorkerSourceTask
 Key: KAFKA-14809
 URL: https://issues.apache.org/jira/browse/KAFKA-14809
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Hector Geraldino
Assignee: Hector Geraldino


There's an *{{if}}* condition when [committing 
offsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L219]
 that is referencing the wrong variable, so the statement always evaluates to 
{*}true{*}.

It's a subtle bug, which probably went undetected because it's only used to log 
information about pending committable offsets.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hachikuji commented on a diff in pull request #13396: KAFKA-13884; Only voters flush on Fetch response

2023-03-14 Thread via GitHub


hachikuji commented on code in PR #13396:
URL: https://github.com/apache/kafka/pull/13396#discussion_r1136130884


##
raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java:
##
@@ -975,6 +980,18 @@ void assertFetchRequestData(
 assertEquals(fetchOffset, fetchPartition.fetchOffset());
 assertEquals(lastFetchedEpoch, fetchPartition.lastFetchedEpoch());
 assertEquals(localId.orElse(-1), request.replicaId());
+
+// Assert that voters have flushed up to the fetch offset
+if (localId.isPresent() && voters.contains(localId.getAsInt())) {

Review Comment:
   Is there anything we can assert in the converse case to verify that the log 
is not flushed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13380: KAFKA-14468: Committed API

2023-03-14 Thread via GitHub


philipnee commented on code in PR #13380:
URL: https://github.com/apache/kafka/pull/13380#discussion_r1136104123


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -149,6 +185,29 @@ Queue stagedCommits() {
 return this.stagedCommits;
 }
 
+/**
+ * Get all the sendable requests, and create a list of UnsentRequest.
+ */
+private List 
sendOffsetFetchRequests(final long currentTimeMs) {
+List requests = 
unsentOffsetFetchRequests.sendableRequests(currentTimeMs);
+List pollResults = new 
ArrayList<>();
+requests.forEach(req -> {
+OffsetFetchRequest.Builder builder = new 
OffsetFetchRequest.Builder(
+groupState.groupId, true,
+new ArrayList<>(req.requestedPartitions),
+throwOnFetchStableOffsetUnsupported);
+NetworkClientDelegate.UnsentRequest unsentRequest = new 
NetworkClientDelegate.UnsentRequest(
+builder,
+coordinatorRequestManager.coordinator());
+FetchCommittedOffsetResponseHandler cb = new 
FetchCommittedOffsetResponseHandler(req);
+unsentRequest.future().whenComplete((r, t) -> {

Review Comment:
   When we first made the FutureCompletionHandler, we explicitly handled the 
responses in the onComplete method here:
   ```
   if (response.authenticationException() != null) {
       onFailure(response.authenticationException());
   } else if (response.wasDisconnected()) {
       onFailure(DisconnectException.INSTANCE);
   } else if (response.versionMismatch() != null) {
       onFailure(response.versionMismatch());
   } else {
       future.complete(response);
   }
   ```
   
   I was thinking about your suggestion, but since you mentioned it again, let 
me try to refactor this a bit to make it more pluggable.
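   
   As a rough sketch, the pluggable direction could look something like this 
(the interface and method names are hypothetical, not the actual Kafka classes):
   ```
   interface ResponseHandler {
       void onResponse(ClientResponse response);
       void onFailure(RuntimeException e);
   }

   // One shared dispatch point; each request type plugs in its own handler.
   static void dispatch(ClientResponse response, ResponseHandler handler) {
       if (response.authenticationException() != null) {
           handler.onFailure(response.authenticationException());
       } else if (response.wasDisconnected()) {
           handler.onFailure(DisconnectException.INSTANCE);
       } else if (response.versionMismatch() != null) {
           handler.onFailure(response.versionMismatch());
       } else {
           handler.onResponse(response);
       }
   }
   ```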



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13380: KAFKA-14468: Committed API

2023-03-14 Thread via GitHub


philipnee commented on code in PR #13380:
URL: https://github.com/apache/kafka/pull/13380#discussion_r1136096815


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -80,16 +97,18 @@ public CommitRequestManager(
  */
 @Override
 public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
-maybeAutoCommit(currentTimeMs);
+maybeAutoCommit();
+List unsentRequests = new 
ArrayList<>();
 
-if (stagedCommits.isEmpty()) {
-return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new 
ArrayList<>());
+if (!stagedCommits.isEmpty()) {

Review Comment:
   Makes sense. I think the reason I did that initially was that I wanted to 
send out all of the commits first before sending the fetchOffsetRequest.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio opened a new pull request, #13396: KAFKA-13884; Only voters flush on Fetch response

2023-03-14 Thread via GitHub


jsancio opened a new pull request, #13396:
URL: https://github.com/apache/kafka/pull/13396

   The leader only requires that voters have flushed their log up to the fetch 
offset before sending a fetch request.
   
   This change flushes the log when handling the fetch response only if the 
follower is a voter. This should improve disk performance on observers 
(brokers).
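   
   In rough terms, the follower's fetch-response path becomes (a hedged sketch, 
not the patch itself; `quorum.isVoter()` and the flush signature are illustrative):
   ```
   private void appendFromFetchResponse(Records records) {
       log.appendAsFollower(records);
       if (quorum.isVoter()) {
           // A voter's next fetch offset tells the leader what it has durably
           // stored, so it must flush before fetching further.
           log.flush(false);
       }
       // Observers (brokers) skip the flush and rely on the OS page cache.
   }
   ```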
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14794) Unable to deserialize base64 JSON strings

2023-03-14 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio resolved KAFKA-14794.

Fix Version/s: 3.5.0
   Resolution: Fixed

> Unable to deserialize base64 JSON strings 
> --
>
> Key: KAFKA-14794
> URL: https://issues.apache.org/jira/browse/KAFKA-14794
> Project: Kafka
>  Issue Type: Bug
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.5.0
>
>
> h1. Problem
> The following test fails:
> {code:java}
> @Test
> public void testBinaryNode() throws IOException {
>     byte[] expected = new byte[] {5, 2, 9, 4, 1, 8, 7, 0, 3, 6};
>     StringWriter writer = new StringWriter();
>     ObjectMapper mapper = new ObjectMapper();
>     mapper.writeTree(mapper.createGenerator(writer), new 
> BinaryNode(expected));
>     JsonNode binaryNode = mapper.readTree(writer.toString());
>     assertTrue(binaryNode.isTextual(), binaryNode.toString());
>     byte[] actual = MessageUtil.jsonNodeToBinary(binaryNode, "Test base64 
> JSON string");
>     assertEquals(expected, actual);
> }
> {code}
> with the following error:
> {code:java}
>  Gradle Test Run :clients:test > Gradle Test Executor 20 > MessageUtilTest > 
> testBinaryNode() FAILED
>     java.lang.RuntimeException: Test base64 JSON string: expected 
> Base64-encoded binary data.
>         at 
> org.apache.kafka.common.protocol.MessageUtil.jsonNodeToBinary(MessageUtil.java:165)
>         at 
> org.apache.kafka.common.protocol.MessageUtilTest.testBinaryNode(MessageUtilTest.java:102)
> {code}
> The reason for the failure is that FasterXML Jackson deserializes base64 
> JSON strings to a TextNode, not to a BinaryNode.
> h1. Solution
> The method {{MessageUtil::jsonNodeToBinary}} should not assume that the input 
> {{JsonNode}} is always a {{{}BinaryNode{}}}. It should also support decoding 
> {{{}TextNode{}}}.
> {{JsonNode::binaryValue}} is supported by both {{BinaryNode}} and 
> {{{}TextNode{}}}.
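
For reference, a sketch of the fix described above (hedged; it simply leans on 
{{JsonNode::binaryValue}}, which decodes base64 for a TextNode and returns the 
raw bytes of a BinaryNode):
{code:java}
import java.io.IOException;
import com.fasterxml.jackson.databind.JsonNode;

static byte[] jsonNodeToBinary(JsonNode node, String about) {
    try {
        byte[] value = node.binaryValue();
        if (value == null) {
            // Non-binary, non-text nodes cannot carry binary data.
            throw new RuntimeException(about + ": expected Base64-encoded binary data.");
        }
        return value;
    } catch (IOException e) {
        throw new RuntimeException(about + ": invalid Base64 string.", e);
    }
}
{code}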



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] chia7712 opened a new pull request, #13395: MINOR: don't disconnect stale controller if the network client is res…

2023-03-14 Thread via GitHub


chia7712 opened a new pull request, #13395:
URL: https://github.com/apache/kafka/pull/13395

   The unsent request stored by `InterBrokerSendThread` uses the stale controller 
even though the `NetworkClient` has already been reset. The request gets a 
`NOT_CONTROLLER` error, and then the error handler tries to disconnect the "new" 
controller. The new controller is not connected, so the call 
`networkClient.disconnect(controllerAddress.idString)` causes the following 
error.
   ```
   java.lang.IllegalStateException: No entry found for connection 1000
at 
org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:411)
at 
org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:182)
at 
org.apache.kafka.clients.NetworkClient.disconnect(NetworkClient.java:328)
at 
kafka.server.BrokerToControllerRequestThread.$anonfun$handleResponse$9(BrokerToControllerChannelManager.scala:405)
at 
kafka.server.BrokerToControllerRequestThread.$anonfun$handleResponse$9$adapted(BrokerToControllerChannelManager.scala:402)
at scala.Option.foreach(Option.scala:437)
at 
kafka.server.BrokerToControllerRequestThread.handleResponse(BrokerToControllerChannelManager.scala:402)
at 
kafka.server.BrokerToControllerRequestThread.$anonfun$generateRequests$1(BrokerToControllerChannelManager.scala:377)
at 
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:594)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:586)
at 
kafka.common.InterBrokerSendThread.pollOnce(InterBrokerSendThread.scala:78)
at 
kafka.server.BrokerToControllerRequestThread.doWork(BrokerToControllerChannelManager.scala:422)
at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
   ```
   
   This is not a critical bug since we swallow the exception, but the error is 
a bit scary when testing ZK migration :)
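   
   A hedged sketch of the kind of guard that avoids this (the real code is Scala 
in `BrokerToControllerRequestThread`; all names below are illustrative):
   ```
   // Only disconnect when the errored response came from the node we still
   // track as the active controller; a reset NetworkClient no longer knows the
   // old connection, so blindly disconnecting throws IllegalStateException.
   void handleNotController(ClientResponse response) {
       activeControllerAddress().ifPresent(controller -> {
           if (response.destination().equals(controller.idString())) {
               networkClient.disconnect(controller.idString());
               updateControllerAddress(null); // force rediscovery
           }
       });
   }
   ```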
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #13379: KAFKA-14799: Ignore source task requests to abort empty transactions

2023-03-14 Thread via GitHub


C0urante commented on PR #13379:
URL: https://github.com/apache/kafka/pull/13379#issuecomment-1468683227

   Test failures are unrelated; merging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante merged pull request #13379: KAFKA-14799: Ignore source task requests to abort empty transactions

2023-03-14 Thread via GitHub


C0urante merged PR #13379:
URL: https://github.com/apache/kafka/pull/13379


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on pull request #13390: MINOR: Standardize KRaft logging, thread names, and terminology

2023-03-14 Thread via GitHub


mumrah commented on PR #13390:
URL: https://github.com/apache/kafka/pull/13390#issuecomment-1468662689

   @cmccabe, I see your point about the node ID when debugging tests -- it can 
be annoying to not know which broker instance a thread belongs to. Your 
kebab-case examples look good to me.
   
   Did you find where the lone `EventHandler` is coming from?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13386: Fix logging conditional

2023-03-14 Thread via GitHub


C0urante commented on code in PR #13386:
URL: https://github.com/apache/kafka/pull/13386#discussion_r1136051929


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java:
##
@@ -216,7 +216,7 @@ public boolean commitOffsets() {
 this.committableOffsets = CommittableOffsets.EMPTY;
 }
 
-if (committableOffsets.isEmpty()) {
+if (offsetsToCommit.isEmpty()) {

Review Comment:
   Haha, no worries! Considering I wrote this buggy logic in the first place, 
I'm at least equally embarrassed here.
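   
   For readers skimming the thread, the surrounding method (reconstructed and 
abbreviated from the diff above) makes the bug clear:
   ```
   CommittableOffsets offsetsToCommit;
   synchronized (this) {
       offsetsToCommit = this.committableOffsets;
       this.committableOffsets = CommittableOffsets.EMPTY; // field is now always empty
   }

   if (offsetsToCommit.isEmpty()) { // the fix: test the snapshot, not the cleared field
       log.debug("{} Either no records were produced by the task since the last offset "
               + "commit, or every record has been filtered out by a transformation "
               + "or dropped due to transformation or conversion errors.", this);
   }
   ```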



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #13384: KAFKA-14801: Handle sensitive configs during ZK migration

2023-03-14 Thread via GitHub


mumrah commented on code in PR #13384:
URL: https://github.com/apache/kafka/pull/13384#discussion_r1136050391


##
core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala:
##
@@ -49,12 +51,25 @@ class ZkMigrationClientTest extends QuorumTestHarness {
 
   private var migrationState: ZkMigrationLeadershipState = _
 
+  private val SECRET = "secret"
+
+  private val encoder: PasswordEncoder = {
+val encoderProps = new Properties()
+encoderProps.put(KafkaConfig.ZkConnectProp, "localhost:1234") // Get 
around the config validation
+encoderProps.put(KafkaConfig.PasswordEncoderSecretProp, SECRET) // Zk 
secret to encrypt the
+val encoderConfig = new KafkaConfig(encoderProps)

Review Comment:
   Oh, I see -- we're getting the defaults for things like the cipher, key 
length, etc. Let's keep this in, in that case



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #13368: KAFKA-14796 Migrate ACLs from AclAuthorizor to KRaft

2023-03-14 Thread via GitHub


mumrah commented on code in PR #13368:
URL: https://github.com/apache/kafka/pull/13368#discussion_r1136047272


##
core/src/main/scala/kafka/zk/ZkMigrationClient.scala:
##
@@ -211,12 +214,38 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends 
MigrationClient with Lo
 }
   }
 
+  def migrateAcls(recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): 
Unit = {
+// This is probably fairly inefficient, but it preserves the semantics 
from AclAuthorizer (which is non-trivial)
+var allAcls = new scala.collection.immutable.TreeMap[ResourcePattern, 
VersionedAcls]()(new ResourceOrdering)
+def updateAcls(resourcePattern: ResourcePattern, versionedAcls: 
VersionedAcls): Unit = {
+  allAcls = allAcls.updated(resourcePattern, versionedAcls)
+}
+
+AclAuthorizer.loadAllAcls(zkClient, this, updateAcls)

Review Comment:
   Yes, good call-out -- we will need documentation on this. We will only 
support migrating ACLs from AclAuthorizer to StandardAuthorizer. Otherwise, 
users can continue using whatever authorizer they had configured in ZK mode. In 
fact, AclAuthorizer _could_ still be used in KRaft mode, but we want to 
discourage that :) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on a diff in pull request #13368: KAFKA-14796 Migrate ACLs from AclAuthorizor to KRaft

2023-03-14 Thread via GitHub


rajinisivaram commented on code in PR #13368:
URL: https://github.com/apache/kafka/pull/13368#discussion_r1136044199


##
core/src/main/scala/kafka/zk/ZkMigrationClient.scala:
##
@@ -211,12 +214,38 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends 
MigrationClient with Lo
 }
   }
 
+  def migrateAcls(recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): 
Unit = {
+// This is probably fairly inefficient, but it preserves the semantics 
from AclAuthorizer (which is non-trivial)
+var allAcls = new scala.collection.immutable.TreeMap[ResourcePattern, 
VersionedAcls]()(new ResourceOrdering)
+def updateAcls(resourcePattern: ResourcePattern, versionedAcls: 
VersionedAcls): Unit = {
+  allAcls = allAcls.updated(resourcePattern, versionedAcls)
+}
+
+AclAuthorizer.loadAllAcls(zkClient, this, updateAcls)

Review Comment:
   Authorizers are configurable, so they could in theory store ACLs in a 
different format. Are we going to document that we only support migration from 
ZK for ACLs stored by AclAuthorizer?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] akhileshchg commented on a diff in pull request #13384: KAFKA-14801: Handle sensitive configs during ZK migration

2023-03-14 Thread via GitHub


akhileshchg commented on code in PR #13384:
URL: https://github.com/apache/kafka/pull/13384#discussion_r1136044153


##
core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala:
##
@@ -49,12 +51,25 @@ class ZkMigrationClientTest extends QuorumTestHarness {
 
   private var migrationState: ZkMigrationLeadershipState = _
 
+  private val SECRET = "secret"
+
+  private val encoder: PasswordEncoder = {
+val encoderProps = new Properties()
+encoderProps.put(KafkaConfig.ZkConnectProp, "localhost:1234") // Get 
around the config validation
+encoderProps.put(KafkaConfig.PasswordEncoderSecretProp, SECRET) // Zk 
secret to encrypt the
+val encoderConfig = new KafkaConfig(encoderProps)

Review Comment:
   We still need the encoder and the right configs to actually encode and decode 
the configs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #13384: KAFKA-14801: Handle sensitive configs during ZK migration

2023-03-14 Thread via GitHub


mumrah commented on code in PR #13384:
URL: https://github.com/apache/kafka/pull/13384#discussion_r1136040540


##
core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala:
##
@@ -49,12 +51,25 @@ class ZkMigrationClientTest extends QuorumTestHarness {
 
   private var migrationState: ZkMigrationLeadershipState = _
 
+  private val SECRET = "secret"
+
+  private val encoder: PasswordEncoder = {
+val encoderProps = new Properties()
+encoderProps.put(KafkaConfig.ZkConnectProp, "localhost:1234") // Get 
around the config validation
+encoderProps.put(KafkaConfig.PasswordEncoderSecretProp, SECRET) // Zk 
secret to encrypt the
+val encoderConfig = new KafkaConfig(encoderProps)

Review Comment:
   I don't think we need these anymore, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest

2023-03-14 Thread via GitHub


CalvinConfluent commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1136021203


##
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##
@@ -1436,6 +1441,38 @@ public void testInvalidFetchRequest() throws Exception {
 context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, 
epoch, OptionalInt.of(localId));
 }
 
+@ParameterizedTest
+@ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+public void testFetchRequestVersionHandling(short version) throws 
Exception {

Review Comment:
   Updated the UT:
   1. Added extra comments.
   2. Covered the first tryCompleteFetchRequest call in handleFetchRequest.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] chia7712 opened a new pull request, #13393: KAFKA-10244 An new java interface to replace 'kafka.common.MessageReader'

2023-03-14 Thread via GitHub


chia7712 opened a new pull request, #13393:
URL: https://github.com/apache/kafka/pull/13393

   related to https://issues.apache.org/jira/browse/KAFKA-10244
   
   `kafka.common.MessageReader` is an input argument of kafka-console-producer, 
and we expect users to be able to plug in their own custom reader to produce 
custom records. Hence, MessageReader is a public interface and we should offer 
a Java version to replace the current Scala code. Also, the new MessageReader 
should be placed in the clients module (kafka.common.MessageReader is in the 
core module). A possible shape is sketched below.
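   
   For reference, one possible shape for the relocated interface (a sketch only; 
the method names are assumptions, not the final API):
   ```
   import java.io.Closeable;
   import java.io.InputStream;
   import java.util.Map;
   import org.apache.kafka.clients.producer.ProducerRecord;

   // Hypothetical Java replacement living in the clients module.
   public interface MessageReader extends Closeable {
       // Called once with the console producer's input stream and properties.
       void init(InputStream inputStream, Map<String, ?> configs);

       // Returns the next record to send, or null when the input is exhausted.
       ProducerRecord<byte[], byte[]> readMessage();
   }
   ```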
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe merged pull request #13344: MINOR: Replace BrokerMetadataListener with MetadataLoader

2023-03-14 Thread via GitHub


cmccabe merged PR #13344:
URL: https://github.com/apache/kafka/pull/13344


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hgeraldino commented on a diff in pull request #13386: Fix logging conditional

2023-03-14 Thread via GitHub


hgeraldino commented on code in PR #13386:
URL: https://github.com/apache/kafka/pull/13386#discussion_r1135857544


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java:
##
@@ -216,7 +216,7 @@ public boolean commitOffsets() {
 this.committableOffsets = CommittableOffsets.EMPTY;
 }
 
-if (committableOffsets.isEmpty()) {
+if (offsetsToCommit.isEmpty()) {

Review Comment:
   Yeah this is embarrassing 🤦
   
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13390: MINOR: Standardize KRaft logging, thread names, and terminology

2023-03-14 Thread via GitHub


cmccabe commented on code in PR #13390:
URL: https://github.com/apache/kafka/pull/13390#discussion_r1135927103


##
core/src/main/scala/kafka/server/ControllerServer.scala:
##
@@ -131,11 +131,11 @@ class ControllerServer(
 if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
 val startupDeadline = Deadline.fromDelay(time, 
config.serverMaxStartupTimeMs, TimeUnit.MILLISECONDS)
 try {
+  this.logIdent = new LogContext(s"[ControllerServer ${config.nodeId}] 
").logPrefix()
   info("Starting controller")
   config.dynamicConfig.initialize(zkClientOpt = None)
 
   maybeChangeStatus(STARTING, STARTED)
-  this.logIdent = new LogContext(s"[ControllerServer id=${config.nodeId}] 
").logPrefix()

Review Comment:
   Yes, this use-case is a bit of a hack. (A hack that this PR didn't add!)
   
   Basically we're forced to use the old Scala `Logging` trait unless we want 
to rewrite all the BrokerServer stuff. But we want a nice-looking log prefix.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #13390: MINOR: Standardize KRaft logging, thread names, and terminology

2023-03-14 Thread via GitHub


cmccabe commented on PR #13390:
URL: https://github.com/apache/kafka/pull/13390#issuecomment-1468497417

   cc @ijuma @mumrah @hachikuji 
   
   So about the thread names thing. I’m open to changing the thread names to 
“kebab case” (i.e. `my-thread-name`)
   
   I do think in the context of JUnit we definitely need to have 
`broker-0-my-thread-name` and `controller-0-my-thread-name`. I find myself 
looking at JUnit backtraces all too often, and having 6 distinct threads named 
`my-thread-name` just doesn't work for me.
   
   So then the big question becomes whether we would want the prefixes in prod 
or not. The "pro" case is that it simplifies the code to just unconditionally 
do that and avoids cases where someone accidentally forgets to set the prefix. 
The con case is that we should know what node we’re on, so the information is 
redundant.
   
   Although I’ve seen people do weird things like combine several process 
backtraces into one file or send ZK and Kafka logs all to the same file. So I 
don’“t truly believe the “we’ll never need it” case. Maybe “we rarely need it” 
or “we won’t need it if people are reasonable”


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #13367: KAFKA-14797: Emit offset sync when offset translation lag would exceed max.offset.lag

2023-03-14 Thread via GitHub


gharris1727 commented on PR #13367:
URL: https://github.com/apache/kafka/pull/13367#issuecomment-1468475024

   > One high-level thought: it seems like we've elected to drop integration 
testing coverage for offset.lag.max = 0 and replace it with offset.lag.max = 
10. Do you think there's any benefit in retaining at least one case where the 
max lag is 0?
   
   Before KAFKA-12468, we did not have offset.lag.max=0 coverage, so this is 
returning to the coverage we had before that patch. Since offset.lag.max=0 does 
have some effect, I'll leave it active in the transactional test. I didn't 
think it was valuable enough to justify a new test suite's runtime, but using 
it in an existing test shouldn't be too harmful.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13367: KAFKA-14797: Emit offset sync when offset translation lag would exceed max.offset.lag

2023-03-14 Thread via GitHub


gharris1727 commented on code in PR #13367:
URL: https://github.com/apache/kafka/pull/13367#discussion_r1135890001


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##
@@ -315,18 +314,16 @@ static class PartitionState {
 
 // true if we should emit an offset sync
 boolean update(long upstreamOffset, long downstreamOffset) {
-// This value is what OffsetSyncStore::translateOffsets would 
compute for this offset given the last sync.
-// Because this method is called at most once for each upstream 
offset, simplify upstreamStep to 1.
+// Emit an offset sync if any of the following conditions are true
+boolean noPreviousSyncThisLifetime = lastSyncDownstreamOffset == 
-1L;
+// the OffsetSync::translateDownstream method will translate this 
offset 1 past the last sync, so add 1.
 // TODO: share common implementation to enforce this relationship
-long downstreamTargetOffset = lastSyncDownstreamOffset + 1;
-if (lastSyncDownstreamOffset == -1L
-|| downstreamOffset - downstreamTargetOffset >= 
maxOffsetLag
-|| upstreamOffset - previousUpstreamOffset != 1L

Review Comment:
   Oops this leaked from another fix, reverting.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest

2023-03-14 Thread via GitHub


dajac commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1135754940


##
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##
@@ -130,10 +131,33 @@ private static Optional optionalEpoch(int 
rawEpochValue) {
 }
 }
 
+public static class SimpleBuilder extends 
AbstractRequest.Builder<FetchRequest> {
+private final FetchRequestData fetchRequestData;
+public SimpleBuilder(FetchRequestData fetchRequestData) {
+super(ApiKeys.FETCH);
+this.fetchRequestData = fetchRequestData;
+}
+
+@Override
+public FetchRequest build(short version) {
+int replicaId = FetchRequest.replicaId(fetchRequestData);
+long replicaEpoch = fetchRequestData.replicaState().replicaEpoch();
+if (version < 15) {
+fetchRequestData.setReplicaId(replicaId);
+fetchRequestData.setReplicaState(new ReplicaState());
+} else {
+fetchRequestData.setReplicaState(new 
ReplicaState().setReplicaId(replicaId).setReplicaEpoch(replicaEpoch));
+fetchRequestData.setReplicaId(-1);
+}

Review Comment:
   My understanding is that we always use the new format everywhere so we 
should only care about downgrading, no? If we get a replica id >= 0, we could 
even consider throwing an UnsupportedVersionException for instance.
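   
   A sketch of that suggestion inside `SimpleBuilder.build(short)` (hedged; the 
exact validation is up for discussion):
   ```
   // If callers always populate ReplicaState, a leftover top-level replica id
   // signals misuse rather than a legitimate downgrade path.
   if (fetchRequestData.replicaId() >= 0) {
       throw new UnsupportedVersionException(
           "FetchRequest builders must set ReplicaState instead of the deprecated ReplicaId field");
   }
   ```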



##
core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala:
##
@@ -44,7 +45,10 @@ object KafkaNetworkChannel {
   case fetchRequest: FetchRequestData =>
 // Since we already have the request, we go through a simplified 
builder
 new AbstractRequest.Builder[FetchRequest](ApiKeys.FETCH) {
-  override def build(version: Short): FetchRequest = new 
FetchRequest(fetchRequest, version)
+  override def build(version: Short): FetchRequest = {
+val builder = new SimpleBuilder(fetchRequest)
+new FetchRequest(builder.build(version).data(), version)
+  }
   override def toString: String = fetchRequest.toString
 }

Review Comment:
   You can replace all of this by `new 
FetchRequest.SimpleBuilder(fetchRequest)`.



##
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##
@@ -130,10 +131,33 @@ private static Optional optionalEpoch(int 
rawEpochValue) {
 }
 }
 
+public static class SimpleBuilder extends 
AbstractRequest.Builder<FetchRequest> {

Review Comment:
   nit: Could we put a comment saying that this is only used by the 
KafkaRaftClient?



##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -983,16 +987,16 @@ private CompletableFuture 
handleFetchRequest(
 Errors error = Errors.forException(cause);
 if (error != Errors.REQUEST_TIMED_OUT) {
 logger.debug("Failed to handle fetch from {} at {} due to 
{}",
-request.replicaId(), fetchPartition.fetchOffset(), 
error);
+FetchRequest.replicaId(request), 
fetchPartition.fetchOffset(), error);
 return buildEmptyFetchResponse(error, Optional.empty());
 }
 }
 
 // FIXME: `completionTimeMs`, which can be null
 logger.trace("Completing delayed fetch from {} starting at offset 
{} at {}",
-request.replicaId(), fetchPartition.fetchOffset(), 
completionTimeMs);
+FetchRequest.replicaId(request), fetchPartition.fetchOffset(), 
completionTimeMs);
 
-return tryCompleteFetchRequest(request.replicaId(), 
fetchPartition, time.milliseconds());
+return tryCompleteFetchRequest(FetchRequest.replicaId(request), 
fetchPartition, time.milliseconds());

Review Comment:
   nit: Would it make sense to pull `FetchRequest.replicaId(request)` into a 
variable instead of calling it everywhere?



##
core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala:
##
@@ -116,4 +119,18 @@ class RemoteLeaderEndPointTest {
 assertThrows(classOf[UnknownLeaderEpochException], () => 
endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch + 1))
 assertThrows(classOf[UnknownLeaderEpochException], () => 
endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch + 1))
 }
+
+@Test
+def testBrokerEpochSupplier(): Unit = {
+val tp = new TopicPartition("topic1", 0)
+val topicId1 = Uuid.randomUuid()
+val log: UnifiedLog = mock(classOf[UnifiedLog])
+val partitionMap = Map(
+tp -> PartitionFetchState(Some(topicId1), 150, None, 0, None, 
state = Fetching, lastFetchedEpoch = None))
+when(replicaManager.localLogOrException(tp)).thenReturn(log)
+when(log.logStartOffset).thenReturn(1)
+val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = 


[GitHub] [kafka] C0urante commented on a diff in pull request #13386: Fix logging conditional

2023-03-14 Thread via GitHub


C0urante commented on code in PR #13386:
URL: https://github.com/apache/kafka/pull/13386#discussion_r1135813644


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java:
##
@@ -216,7 +216,7 @@ public boolean commitOffsets() {
 this.committableOffsets = CommittableOffsets.EMPTY;
 }
 
-if (committableOffsets.isEmpty()) {
+if (offsetsToCommit.isEmpty()) {

Review Comment:
   Don't we need to update the logging in the `else` branch to use 
`offsetsToCommit` instead of `committableOffsets` as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante merged pull request #13392: MINOR: Fix error check in zombie fencing for exactly-once source connectors

2023-03-14 Thread via GitHub


C0urante merged PR #13392:
URL: https://github.com/apache/kafka/pull/13392


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #13384: KAFKA-14801 : Decode the sensitive configs and Zk and use encrypted records before writing them to KRaft log

2023-03-14 Thread via GitHub


mumrah commented on code in PR #13384:
URL: https://github.com/apache/kafka/pull/13384#discussion_r1135792879


##
core/src/main/scala/kafka/zk/ZkMigrationClient.scala:
##
@@ -135,20 +138,34 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Logging {
   }
 
   def migrateBrokerConfigs(recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
-val brokerEntities = zkClient.getAllEntitiesWithConfig(ConfigType.Broker)
 val batch = new util.ArrayList[ApiMessageAndVersion]()
+val zkPasswordEncoder = kafkaConfig.passwordEncoderSecret match {

Review Comment:
   Can we pass in a PasswordEncoder to ZkMigrationClient rather than the full 
KafkaConfig? 
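
   For illustration, the narrower dependency might look like this (a sketch in Java for brevity; the real class is Scala, and this constructor shape is hypothetical):

   ```java
   public class ZkMigrationClient {
       private final KafkaZkClient zkClient;
       private final PasswordEncoder passwordEncoder; // only what we need, not all of KafkaConfig

       public ZkMigrationClient(KafkaZkClient zkClient, PasswordEncoder passwordEncoder) {
           this.zkClient = zkClient;
           this.passwordEncoder = passwordEncoder;
       }
   }
   ```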






[GitHub] [kafka] C0urante commented on pull request #13379: KAFKA-14799: Ignore source task requests to abort empty transactions

2023-03-14 Thread via GitHub


C0urante commented on PR #13379:
URL: https://github.com/apache/kafka/pull/13379#issuecomment-1468372731

   Thanks @chia7712, good catch with that edge case! You're correct about the 
behavior in that scenario; the record-based boundary is given precedence over 
the batch-based one.
   
   I've added test cases to cover both instances of this scenario (record-based 
commit and batch-based abort, and batch-based commit and record-based abort), 
and I've updated the Javadocs for `TransactionContext` to clarify the behavior 
in this scenario.
   
   I've also fixed a small flaky test issue in the 
`ExactlyOnceWorkerSourceTaskTest` suite.
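
   For readers following along, the precedence rule can be sketched like this (hypothetical helper names, not the PR's literal code):

   ```java
   // A record-based boundary, when present, wins over a batch-based one
   boolean recordCommit = transactionContext.shouldCommitOn(record);
   boolean recordAbort = transactionContext.shouldAbortOn(record);
   if (recordCommit || recordAbort) {
       handleRecordBasedBoundary(recordCommit, recordAbort); // takes precedence
   } else if (batchCommitRequested || batchAbortRequested) {
       handleBatchBasedBoundary(batchCommitRequested, batchAbortRequested);
   }
   ```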





[GitHub] [kafka] chia7712 commented on a diff in pull request #13379: KAFKA-14799: Ignore source task requests to abort empty transactions

2023-03-14 Thread via GitHub


chia7712 commented on code in PR #13379:
URL: https://github.com/apache/kafka/pull/13379#discussion_r1135751362


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##
@@ -483,15 +483,19 @@ protected boolean shouldCommitTransactionForRecord(SourceRecord record) {
 if (transactionContext.shouldAbortOn(record)) {
 log.info("Aborting transaction for record on topic {} as requested by connector", record.topic());
 log.trace("Last record in aborted transaction: {}", record);
-abortTransaction();
+maybeAbortTransaction();

Review Comment:
   Thanks for explanation!
   
   +1 again






[GitHub] [kafka] mimaison commented on a diff in pull request #13373: Kafka-14420 Use incrementalAlterConfigs API for syncing topic configurations (KIP-894)

2023-03-14 Thread via GitHub


mimaison commented on code in PR #13373:
URL: https://github.com/apache/kafka/pull/13373#discussion_r1135392026


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -539,11 +596,20 @@ Map<String, Config> describeTopicConfigs(Set<String> topics)
 }
 
 Config targetConfig(Config sourceConfig) {
-List<ConfigEntry> entries = sourceConfig.entries().stream()
-.filter(x -> !x.isDefault() && !x.isReadOnly() && !x.isSensitive())
-.filter(x -> x.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)
-.filter(x -> shouldReplicateTopicConfigurationProperty(x.name()))
-.collect(Collectors.toList());
+List<ConfigEntry> entries;
+if (useIncrementalAlterConfigs == MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG) {

Review Comment:
   We should use `equals()` to compare String objects
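
   A quick illustration of why `==` is unsafe for Strings in Java:

   ```java
   String a = new String("never");
   System.out.println(a == "never");      // false: compares references
   System.out.println(a.equals("never")); // true: compares contents
   ```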



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -117,6 +122,16 @@ public MirrorSourceConnector() {
 this.configPropertyFilter = configPropertyFilter;
 }
 
+// visible for testing the deprecated setting "use.incremental.alter.configs"
+// this constructor should be removed when the deprecated setting is removed in Kafka 4.0
+MirrorSourceConnector(SourceAndTarget sourceAndTarget, ReplicationPolicy replicationPolicy,
+  String useIncrementalAlterConfigs, ConfigPropertyFilter configPropertyFilter, Admin targetAdmin) {
+this.sourceAndTarget = sourceAndTarget;
+this.replicationPolicy = replicationPolicy;
+this.configPropertyFilter = configPropertyFilter;
+this.useIncrementalAlterConfigs = useIncrementalAlterConfigs;
+this.targetAdminClient = targetAdmin;
+

Review Comment:
   We're missing a closing bracket here!



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java:
##
@@ -30,6 +31,9 @@ public class DefaultConfigPropertyFilter implements ConfigPropertyFilter {
 
 public static final String CONFIG_PROPERTIES_EXCLUDE_CONFIG = "config.properties.exclude";
 public static final String CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG = "config.properties.blacklist";
+public static final String USE_DEFAULTS_FROM = "use.defaults.from";;

Review Comment:
   We can remove the trailing semicolon



##
clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java:
##
@@ -78,6 +80,19 @@ public static ListTopicsResult listTopicsResult(String topic) {
 return new ListTopicsResult(future);
 }
 
+/**
+ * Helper to create an AlterConfigsResult instance for a given Throwable.
+ * AlterConfigsResult's constructor is only accessible from within the
+ * admin package.
+ */
+public static AlterConfigsResult alterConfigsResult(ConfigResource cr, Throwable t) {
+KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+Map<ConfigResource, KafkaFutureImpl<Void>> futures = new HashMap<>();
+futures.put(cr, future);

Review Comment:
   We can use singletonMap here:
   ```suggestion
   Map<ConfigResource, KafkaFutureImpl<Void>> futures = Collections.singletonMap(cr, future);
   ```



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java:
##
@@ -30,6 +31,9 @@ public class DefaultConfigPropertyFilter implements 
ConfigPropertyFilter {
 
 public static final String CONFIG_PROPERTIES_EXCLUDE_CONFIG = 
"config.properties.exclude";
 public static final String CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG = 
"config.properties.blacklist";
+public static final String USE_DEFAULTS_FROM = "use.defaults.from";;
+private static final String USE_DEFAULTS_FROM_DOC = "Which cluster's 
default to use when syncing topic configurations.";

Review Comment:
   Should it be `defaults` instead of `default`?



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -64,6 +67,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;

Review Comment:
   This is already imported just above



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java:
##
@@ -40,11 +44,13 @@ public class DefaultConfigPropertyFilter implements ConfigPropertyFilter {
+ "unclean\\.leader\\.election\\.enable, "
+ "min\\.insync\\.replicas";
 private Pattern excludePattern = MirrorUtils.compilePatternList(CONFIG_PROPERTIES_EXCLUDE_DEFAULT);
+private String useDefaultsFrom = USE_DEFAULTS_FROM_DEFAULT;

Review Comment:
   Do we need this field? I think we could remove it and do
   ```
   "source".equals(config.useDefaultsFrom())
   ```

[jira] [Updated] (KAFKA-14672) Producer queue time does not reflect batches expired in the accumulator

2023-03-14 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14672:
--
Description: 
The producer exposes two metrics for the time a record has spent in the 
accumulator waiting to be drained:
 * {{record-queue-time-avg}}
 * {{record-queue-time-max}}

The metric is only updated when a batch is ready to send to a broker. It is 
also possible for a batch to be expired before it can be sent, but in this 
case, the metric is not updated. This seems surprising and makes the queue time 
misleading. The only metric I could find that does reflect batch expirations in 
the accumulator is the generic {{{}record-error-rate{}}}. It would make sense 
to let the queue-time metrics record the time spent in the queue regardless of 
the outcome of the record send attempt.

  was:
The producer exposes two metrics for the time a record has spent in the 
accumulator waiting to be drained:
 * `record-queue-time-avg`
 * `record-queue-time-max`

The metric is only updated when a batch is ready to send to a broker. It is 
also possible for a batch to be expired before it can be sent, but in this 
case, the metric is not updated. This seems surprising and makes the queue time 
misleading. The only metric I could find that does reflect batch expirations in 
the accumulator is the generic `record-error-rate`. It would make sense to let 
the queue-time metrics record the time spent in the queue regardless of the 
outcome of the record send attempt.


> Producer queue time does not reflect batches expired in the accumulator
> ---
>
> Key: KAFKA-14672
> URL: https://issues.apache.org/jira/browse/KAFKA-14672
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Kirk True
>Priority: Major
>
> The producer exposes two metrics for the time a record has spent in the 
> accumulator waiting to be drained:
>  * {{record-queue-time-avg}}
>  * {{record-queue-time-max}}
> The metric is only updated when a batch is ready to send to a broker. It is 
> also possible for a batch to be expired before it can be sent, but in this 
> case, the metric is not updated. This seems surprising and makes the queue 
> time misleading. The only metric I could find that does reflect batch 
> expirations in the accumulator is the generic {{{}record-error-rate{}}}. It 
> would make sense to let the queue-time metrics record the time spent in the 
> queue regardless of the outcome of the record send attempt.



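
A minimal sketch of the proposed behavior, assuming a Sensor wired to the record-queue-time metrics and hypothetical accessor/helper names:
{code:java}
// Also record queue time when a batch expires in the accumulator,
// not only when it is drained to a broker.
for (ProducerBatch batch : accumulator.expiredBatches(now)) {
    long queueTimeMs = now - batch.createdMs;       // time spent waiting in the accumulator
    recordQueueTimeSensor.record(queueTimeMs, now); // feeds record-queue-time-avg/max
    failBatch(batch);                               // existing expiration handling, unchanged
}
{code}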



[GitHub] [kafka] ijuma commented on a diff in pull request #13390: MINOR: Standardize KRaft logging, thread names, and terminology

2023-03-14 Thread via GitHub


ijuma commented on code in PR #13390:
URL: https://github.com/apache/kafka/pull/13390#discussion_r1135739560


##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -182,7 +182,7 @@ class BrokerLifecycleManager(
*/
   private[server] val eventQueue = new KafkaEventQueue(time,
 logContext,
-threadNamePrefix.getOrElse(""),
+threadNamePrefix.getOrElse("") + "BrokerLifecycleManager" + nodeId,

Review Comment:
   Check the replica fetcher threads for an example where we do include 
source/target ids in the thread name:
   
   > val threadName = 
s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}-${fetcherPool.name}"






[jira] [Commented] (KAFKA-14750) Sink connector fails if a topic matching its topics.regex gets deleted

2023-03-14 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17700263#comment-17700263
 ] 

Sagar Rao commented on KAFKA-14750:
---

Hmm, I tried the approach from above and it turns out it's not foolproof. IMO 
what seems to be happening is that `partitionsFor` can serve metadata from the 
local cache if it's already there. So even though the topic has already been 
deleted, by the time the task makes the call in the test case above, it still 
fetches from the local cache. 
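
The race can be sketched like this (a hypothetical test flow, not the actual test code):
{code:java}
// Delete the topic, then immediately ask the consumer for its partitions.
adminClient.deleteTopics(Collections.singleton("connect-test-0")).all().get();
List<PartitionInfo> partitions = consumer.partitionsFor("connect-test-0");
// partitions may still be non-empty here if the consumer's cached metadata
// predates the deletion, so the test cannot rely on an immediate failure.
{code}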

> Sink connector fails if a topic matching its topics.regex gets deleted
> --
>
> Key: KAFKA-14750
> URL: https://issues.apache.org/jira/browse/KAFKA-14750
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.3.1
>Reporter: Sergei Morozov
>Assignee: Sagar Rao
>Priority: Major
>
> Steps to reproduce:
> # In {{{}config/connect-standalone.properties{}}}, set:
> {code:bash}
> plugin.path=libs/connect-file-3.3.1.jar
> {code}
> # In {{{}config/connect-file-sink.properties{}}}, remove the {{topics=}} line 
> and add this one:
> {code:bash}
> topics.regex=connect-test-.*
> {code}
> # Start zookeeper:
> {code:bash}
> bin/zookeeper-server-start.sh config/zookeeper.properties
> {code}
> # Start the brokers:
> {code:bash}
> bin/kafka-server-start.sh config/server.properties
> {code}
> # Start the file sink connector:
> {code:bash}
> bin/connect-standalone.sh config/connect-standalone.properties 
> config/connect-file-sink.properties
> {code}
> # Create topics for the sink connector to subscribe to:
> {code:bash}
> for i in {0..2}; do
>   for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do
> bin/kafka-topics.sh \
>         --bootstrap-server localhost:9092 \
>         --create \
>         --topic connect-test-$j
>   done &
> done
> wait
> {code}
> # Wait until all the created topics are assigned to the connector. Check the 
> number of partitions to be > 0 in the output of:
> {code:bash}
> bin/kafka-consumer-groups.sh \
>     --bootstrap-server localhost:9092 \
>     --group connect-local-file-sink \
>     --describe --members
> {code}
> # Delete the created topics:
> {code:bash}
> for i in {0..2}; do
>   for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do
>     bin/kafka-topics.sh \
>         --bootstrap-server localhost:9092 \
>         --delete \
>         --topic connect-test-$j
>     echo Deleted topic connect-test-$j.
>   done &
> done
> wait
> {code}
> # Observe the connector fail with the following error:
> {quote}org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms 
> expired before the position for partition connect-test-211-0 could be 
> determined
> {quote}





[GitHub] [kafka] ijuma commented on a diff in pull request #13390: MINOR: Standardize KRaft logging, thread names, and terminology

2023-03-14 Thread via GitHub


ijuma commented on code in PR #13390:
URL: https://github.com/apache/kafka/pull/13390#discussion_r1135736972


##
core/src/main/scala/kafka/server/ControllerServer.scala:
##
@@ -131,11 +131,11 @@ class ControllerServer(
 if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
 val startupDeadline = Deadline.fromDelay(time, config.serverMaxStartupTimeMs, TimeUnit.MILLISECONDS)
 try {
+  this.logIdent = new LogContext(s"[ControllerServer ${config.nodeId}] ").logPrefix()
   info("Starting controller")
   config.dynamicConfig.initialize(zkClientOpt = None)
 
   maybeChangeStatus(STARTING, STARTED)
-  this.logIdent = new LogContext(s"[ControllerServer id=${config.nodeId}] ").logPrefix()

Review Comment:
   I think the idea of the `LogContext` stuff was to use key/value pairs so 
it's easy to enrich the log context. cc @jason for additional thoughts.
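
   For illustration, the key/value convention looks like this (a sketch; the exact prefix wording is hypothetical):

   ```java
   LogContext ctx = new LogContext(String.format("[ControllerServer id=%d] ", nodeId));
   Logger log = ctx.logger(ControllerServer.class);
   log.info("Starting controller"); // -> "[ControllerServer id=1] Starting controller"
   ```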






[jira] [Resolved] (KAFKA-10228) producer: NETWORK_EXCEPTION is thrown instead of a request timeout

2023-03-14 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True resolved KAFKA-10228.
---
Fix Version/s: 3.5.0
   Resolution: Duplicate

> producer: NETWORK_EXCEPTION is thrown instead of a request timeout
> --
>
> Key: KAFKA-10228
> URL: https://issues.apache.org/jira/browse/KAFKA-10228
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.3.1
>Reporter: Christian Becker
>Assignee: Kirk True
>Priority: Major
> Fix For: 3.5.0
>
>
> We're currently seeing an issue with the java client (producer), when message 
> producing runs into a timeout. Namely a NETWORK_EXCEPTION is thrown instead 
> of a timeout exception.
> *Situation and relevant code:*
> Config
> {code:java}
> request.timeout.ms: 200
> retries: 3
> acks: all{code}
> {code:java}
> for (UnpublishedEvent event : unpublishedEvents) {
> ListenableFuture> future;
> future = kafkaTemplate.send(new ProducerRecord<>(event.getTopic(), 
> event.getKafkaKey(), event.getPayload()));
> futures.add(future.completable());
> }
> CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new)).join();{code}
> We're using the KafkaTemplate from SpringBoot here, but it shouldn't matter, 
> as it's merely a wrapper. There we put in batches of messages to be sent.
> 200ms later, we can see the following in the logs: (not sure about the order, 
> they've arrived in the same ms, so our logging system might not display them 
> in the right order)
> {code:java}
> [Producer clientId=producer-1] Received invalid metadata error in produce 
> request on partition events-6 due to 
> org.apache.kafka.common.errors.NetworkException: The server disconnected 
> before a response was received.. Going to request metadata update now
> [Producer clientId=producer-1] Got error produce response with correlation id 
> 3094 on topic-partition events-6, retrying (2 attempts left). Error: 
> NETWORK_EXCEPTION {code}
> There is also a corresponding error on the broker (within a few ms):
> {code:java}
> Attempting to send response via channel for which there is no open 
> connection, connection id XXX (kafka.network.Processor) {code}
> This was somewhat unexpected and sent us for a hunt across the infrastructure 
> for possible connection issues, but we've found none.
> Side note: In some cases the retries worked and the messages were 
> successfully produced.
> Only after many hours of heavy debugging, we've noticed, that the error might 
> be related to the low timeout setting. We've removed that setting now, as it 
> was a remnant from the past and no longer valid for our use-case. However in 
> order to avoid other people having that issue again and to simplify future 
> debugging, some form of timeout exception should be thrown.





[GitHub] [kafka] C0urante commented on a diff in pull request #13379: KAFKA-14799: Ignore source task requests to abort empty transactions

2023-03-14 Thread via GitHub


C0urante commented on code in PR #13379:
URL: https://github.com/apache/kafka/pull/13379#discussion_r1135720026


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##
@@ -483,15 +483,19 @@ protected boolean shouldCommitTransactionForRecord(SourceRecord record) {
 if (transactionContext.shouldAbortOn(record)) {
 log.info("Aborting transaction for record on topic {} as requested by connector", record.topic());
 log.trace("Last record in aborted transaction: {}", record);
-abortTransaction();
+maybeAbortTransaction();

Review Comment:
   I don't think so; the only time we invoke `Producer::beginTransaction` is 
[here](https://github.com/apache/kafka/blob/ccfc389a638a126c3769bdd72725bae532ca4d01/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L248-L249),
 after which we immediately set `transactionOpen` to `true`. The only time we 
set `transactionOpen` to `false` is after either aborting a transaction (which 
takes place in this method) or committing the transaction (which takes place 
[here](https://github.com/apache/kafka/blob/ccfc389a638a126c3769bdd72725bae532ca4d01/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L311)).
 In either case, if we fail the operation (aborting/committing the 
transaction), the task fails, and we [skip the end-of-life offset 
commit](https://github.com/apache/kafka/blob/ccfc389a638a126c3769bdd72725bae532ca4d01/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L219-L222).
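
   Condensed, the invariant reads like this (a sketch of the flow, not the literal code):

   ```java
   producer.beginTransaction();
   transactionOpen = true;        // the only place a transaction is opened
   // ... dispatch records ...
   producer.commitTransaction();  // or producer.abortTransaction()
   transactionOpen = false;       // the only places the flag is cleared; a failure
                                  // here fails the task and skips the end-of-life
                                  // offset commit
   ```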






[GitHub] [kafka] Hangleton commented on pull request #13378: KAFKA-14793: Propagate Topic Ids to the Group Coordinator during Offsets Commit

2023-03-14 Thread via GitHub


Hangleton commented on PR #13378:
URL: https://github.com/apache/kafka/pull/13378#issuecomment-1468290773

   Thanks David @mumrah for the follow-up!





[GitHub] [kafka] mumrah commented on a diff in pull request #13372: MINOR: Improved error handling in ZK migration

2023-03-14 Thread via GitHub


mumrah commented on code in PR #13372:
URL: https://github.com/apache/kafka/pull/13372#discussion_r1135709978


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -132,26 +132,45 @@ private boolean isControllerQuorumReadyForMigration() {
 return true;
 }
 
+private boolean imageDoesNotContainAllBrokers(MetadataImage image, Set<Integer> brokerIds) {
+for (BrokerRegistration broker : image.cluster().brokers().values()) {
+if (broker.isMigratingZkBroker()) {
+brokerIds.remove(broker.id());
+}
+}
+return !brokerIds.isEmpty();
+}
+
 private boolean areZkBrokersReadyForMigration() {
 if (image == MetadataImage.EMPTY) {
 // TODO maybe add WAIT_FOR_INITIAL_METADATA_PUBLISH state to avoid this kind of check?
 log.info("Waiting for initial metadata publish before checking if Zk brokers are registered.");
 return false;
 }
-Set<Integer> zkRegisteredZkBrokers = zkMigrationClient.readBrokerIdsFromTopicAssignments();
-for (BrokerRegistration broker : image.cluster().brokers().values()) {
-if (broker.isMigratingZkBroker()) {
-zkRegisteredZkBrokers.remove(broker.id());
-}
+
+// First check the brokers registered in ZK
+Set<Integer> zkBrokerRegistrations = new HashSet<>(zkMigrationClient.readBrokerIds());

Review Comment:
   Ah, yea this is needed because the Set returned by the client was an 
immutable Scala thing originally. I'll change the client to return a mutable set






[GitHub] [kafka] C0urante commented on a diff in pull request #11565: KAFKA-13504: Retry connect internal topics' creation in case of InvalidReplicationFactorException

2023-03-14 Thread via GitHub


C0urante commented on code in PR #11565:
URL: https://github.com/apache/kafka/pull/11565#discussion_r1135694291


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaTopicBasedBackingStore.java:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+public abstract class KafkaTopicBasedBackingStore {

Review Comment:
   Is inheritance necessary here? It seems like we might make this a standalone 
class that can be composed into the various `Kafka*BackingStore` classes (and 
then possibly others) instead of only making it available through subclassing.
   
   (Not a blocker)
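
   For comparison, the composition alternative could be shaped like this (hypothetical names):

   ```java
   // A standalone helper composed into each backing store instead of a base class
   public class KafkaTopicBasedStoreSupport {
       private final WorkerConfig config;

       public KafkaTopicBasedStoreSupport(WorkerConfig config) {
           this.config = config;
       }
   }

   public class KafkaStatusBackingStore {
       private final KafkaTopicBasedStoreSupport topicSupport; // composed, not inherited

       public KafkaStatusBackingStore(WorkerConfig config) {
           this.topicSupport = new KafkaTopicBasedStoreSupport(config);
       }
   }
   ```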






[GitHub] [kafka] mumrah commented on pull request #13378: KAFKA-14793: Propagate Topic Ids to the Group Coordinator during Offsets Commit

2023-03-14 Thread via GitHub


mumrah commented on PR #13378:
URL: https://github.com/apache/kafka/pull/13378#issuecomment-1468256274

   @Hangleton, yes we should expect some improvement to 
ZkMigrationIntegrationTest with that PR. There are some cases where we lose the 
ZK session during the tests which causes a spurious failure. Yesterday, we 
identified another source of flakiness in that test which turns out to be a 
real bug https://issues.apache.org/jira/browse/KAFKA-14805





[GitHub] [kafka] mumrah commented on a diff in pull request #13390: MINOR: Standardize KRaft logging, thread names, and terminology

2023-03-14 Thread via GitHub


mumrah commented on code in PR #13390:
URL: https://github.com/apache/kafka/pull/13390#discussion_r1135643036


##
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##
@@ -238,7 +238,7 @@ class BrokerToControllerChannelManagerImpl(
 }
 val threadName = threadNamePrefix match {
   case None => s"BrokerToControllerChannelManager broker=${config.brokerId} name=$channelName"
-  case Some(name) => s"$name:BrokerToControllerChannelManager broker=${config.brokerId} name=$channelName"
+  case Some(name) => s"${name}ToControllerChannelManager broker=${config.brokerId} name=$channelName"

Review Comment:
   What is `name` here and where is it supplied?



##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -182,7 +182,7 @@ class BrokerLifecycleManager(
*/
   private[server] val eventQueue = new KafkaEventQueue(time,
 logContext,
-threadNamePrefix.getOrElse(""),
+threadNamePrefix.getOrElse("") + "BrokerLifecycleManager" + nodeId,

Review Comment:
   I fired up this branch and yea `BrokerLifecycleManager1EventHandler` is not 
pretty :)
   
   I'm not sure we need node ID in the thread name since we will presumably 
know which node a thread dump came from. It might actually be confusing since 
the convention we have is `{thread name}-{thread number in pool}` like 
"metrics-meter-tick-thread-1", "metrics-meter-tick-thread-2", 
"data-plane-kafka-request-handler-0", "data-plane-kafka-request-handler-1", 
etc. What about something like `broker-lifecycle-manager-event-handler` or 
`BrokerLifecycleManager-EventHandler`
   
   I also noticed somewhere we are not prefixing the event queue thread name, 
there is just an `EventHandler` thread.






[GitHub] [kafka] C0urante commented on a diff in pull request #13367: KAFKA-14797: Emit offset sync when offset translation lag would exceed max.offset.lag

2023-03-14 Thread via GitHub


C0urante commented on code in PR #13367:
URL: https://github.com/apache/kafka/pull/13367#discussion_r1135661761


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##
@@ -315,18 +314,16 @@ static class PartitionState {
 
 // true if we should emit an offset sync
 boolean update(long upstreamOffset, long downstreamOffset) {
-// This value is what OffsetSyncStore::translateOffsets would compute for this offset given the last sync.
-// Because this method is called at most once for each upstream offset, simplify upstreamStep to 1.
+// Emit an offset sync if any of the following conditions are true
+boolean noPreviousSyncThisLifetime = lastSyncDownstreamOffset == -1L;
+// the OffsetSync::translateDownstream method will translate this offset 1 past the last sync, so add 1.
 // TODO: share common implementation to enforce this relationship
-long downstreamTargetOffset = lastSyncDownstreamOffset + 1;
-if (lastSyncDownstreamOffset == -1L
-|| downstreamOffset - downstreamTargetOffset >= maxOffsetLag
-|| upstreamOffset - previousUpstreamOffset != 1L

Review Comment:
   Why are we removing this condition? Is that necessary for this fix?







[GitHub] [kafka] ijuma commented on a diff in pull request #13390: MINOR: Standardize KRaft logging, thread names, and terminology

2023-03-14 Thread via GitHub


ijuma commented on code in PR #13390:
URL: https://github.com/apache/kafka/pull/13390#discussion_r1135641916


##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -182,7 +182,7 @@ class BrokerLifecycleManager(
*/
   private[server] val eventQueue = new KafkaEventQueue(time,
 logContext,
-threadNamePrefix.getOrElse(""),
+threadNamePrefix.getOrElse("") + "BrokerLifecycleManager" + nodeId,

Review Comment:
   We typically separate the id via a dash. Have you tried to be consistent 
with what we do outside of kraft? That would help when debugging the system.






[GitHub] [kafka] C0urante commented on a diff in pull request #11565: KAFKA-13504: Retry connect internal topics' creation in case of InvalidReplicationFactorException

2023-03-14 Thread via GitHub


C0urante commented on code in PR #11565:
URL: https://github.com/apache/kafka/pull/11565#discussion_r1135639682


##
connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java:
##
@@ -70,6 +74,9 @@
 public class TopicAdmin implements AutoCloseable {
 
 public static final TopicCreationResponse EMPTY_CREATION = new TopicCreationResponse(Collections.emptySet(), Collections.emptySet());
+private static final List<Class<? extends Exception>> CAUSES_TO_RETRY_TOPIC_CREATION = Arrays.asList(
+InvalidReplicationFactorException.class,
+TimeoutException.class);

Review Comment:
   IMO this would be a bit too drastic; the fallout of a change like that 
scales linearly with the retry duration that the user has configured and may 
delay surfacing valid issues for a bit too long. It could be especially 
frustrating in quickstart scenarios where someone defines a replication factor 
that exceeds the number of local brokers and has to wait an entire minute to 
find out what the problem is.






[jira] [Commented] (KAFKA-14803) topic deletion bug

2023-03-14 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17700217#comment-17700217
 ] 

Chris Egerton commented on KAFKA-14803:
---

[~bwei9awf] It looks like this is a duplicate of KAFKA-14802 so I've closed it; 
if there are any updates you'd like to make for this issue, would you mind 
making them on that ticket? Thanks!

> topic deletion bug
> --
>
> Key: KAFKA-14803
> URL: https://issues.apache.org/jira/browse/KAFKA-14803
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, replication
>Affects Versions: 3.3.2
> Environment: AWS m5.xlarge EC2 instance
>Reporter: Behavox
>Priority: Major
> Attachments: server.properties
>
>
> Topic deletion doesn't work as expected: after successful deletion, the topic 
> is recreated, in a multi-controller environment with 3 controllers and 
> ReplicationFactor: 2.
> How to reproduce: attempt to delete a topic. The topic is removed successfully 
> and recreated right after removal. The example below shows a single topic 
> named example-topic. We have a total count of 17000 topics in the affected 
> cluster. 
>  
> Our config is attached. 
> Run 1
> [2023-03-10 16:16:45,625] INFO [Controller 1] Removed topic example-topic 
> with ID fh_aQcc3Sf2yVBTMrltBlQ. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:19:04,722] INFO [Controller 1] Created topic example-topic 
> with topic ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:16:45,730] INFO [Controller 2] Removed topic example-topic 
> with ID fh_aQcc3Sf2yVBTMrltBlQ. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:19:04,851] INFO [Controller 2] Created topic example-topic 
> with topic ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:16:45,837] INFO [Controller 3] Removed topic example-topic 
> with ID fh_aQcc3Sf2yVBTMrltBlQ. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:19:04,833] INFO [Controller 3] Created topic example-topic 
> with topic ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> Run 2
> [2023-03-10 16:20:22,469] INFO [Controller 1] Removed topic example-topic 
> with ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:22:19,711] INFO [Controller 1] Created topic example-topic 
> with topic ID xxlJlIe_SvqQHtfgbX2eLA. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:20:22,674] INFO [Controller 2] Removed topic example-topic 
> with ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:22:20,022] INFO [Controller 2] Created topic example-topic 
> with topic ID xxlJlIe_SvqQHtfgbX2eLA. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:20:22,674] INFO [Controller 3] Removed topic example-topic 
> with ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:22:20,020] INFO [Controller 3] Created topic example-topic 
> with topic ID xxlJlIe_SvqQHtfgbX2eLA. 
> (org.apache.kafka.controller.ReplicationControlManager)





[jira] [Updated] (KAFKA-14802) topic deletion bug

2023-03-14 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14802:
--
Environment: AWS m5.xlarge EC2 instance

> topic deletion bug
> --
>
> Key: KAFKA-14802
> URL: https://issues.apache.org/jira/browse/KAFKA-14802
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, replication
>Affects Versions: 3.3.2
> Environment: AWS m5.xlarge EC2 instance
>Reporter: Behavox
>Priority: Major
> Attachments: server.properties
>
>
> Topic deletion doesn't work as expected: after successful deletion, the topic 
> is recreated, in a multi-controller environment with 3 controllers and 
> ReplicationFactor: 2.
> How to reproduce: attempt to delete a topic. The topic is removed successfully 
> and recreated right after removal. The example below shows a single topic 
> named example-topic. We have a total count of 17000 topics in the affected 
> cluster.
>  
> Our config is attached. 
> Run 1
> [2023-03-10 16:16:45,625] INFO [Controller 1] Removed topic example-topic 
> with ID fh_aQcc3Sf2yVBTMrltBlQ. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:19:04,722] INFO [Controller 1] Created topic example-topic 
> with topic ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:16:45,730] INFO [Controller 2] Removed topic example-topic 
> with ID fh_aQcc3Sf2yVBTMrltBlQ. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:19:04,851] INFO [Controller 2] Created topic example-topic 
> with topic ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:16:45,837] INFO [Controller 3] Removed topic example-topic 
> with ID fh_aQcc3Sf2yVBTMrltBlQ. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:19:04,833] INFO [Controller 3] Created topic example-topic 
> with topic ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> Run 2
> [2023-03-10 16:20:22,469] INFO [Controller 1] Removed topic example-topic 
> with ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:22:19,711] INFO [Controller 1] Created topic example-topic 
> with topic ID xxlJlIe_SvqQHtfgbX2eLA. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:20:22,674] INFO [Controller 2] Removed topic example-topic 
> with ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:22:20,022] INFO [Controller 2] Created topic example-topic 
> with topic ID xxlJlIe_SvqQHtfgbX2eLA. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:20:22,674] INFO [Controller 3] Removed topic example-topic 
> with ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:22:20,020] INFO [Controller 3] Created topic example-topic 
> with topic ID xxlJlIe_SvqQHtfgbX2eLA. 
> (org.apache.kafka.controller.ReplicationControlManager)





[jira] [Resolved] (KAFKA-14803) topic deletion bug

2023-03-14 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14803.
---
Resolution: Duplicate

> topic deletion bug
> --
>
> Key: KAFKA-14803
> URL: https://issues.apache.org/jira/browse/KAFKA-14803
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, replication
>Affects Versions: 3.3.2
> Environment: AWS m5.xlarge EC2 instance
>Reporter: Behavox
>Priority: Major
> Attachments: server.properties
>
>
> Topic deletion doesn't work as expected: after successful deletion, the topic 
> is recreated, in a multi-controller environment with 3 controllers and 
> ReplicationFactor: 2.
> How to reproduce: attempt to delete a topic. The topic is removed successfully 
> and recreated right after removal. The example below shows a single topic 
> named example-topic. We have a total count of 17000 topics in the affected 
> cluster. 
>  
> Our config is attached. 
> Run 1
> [2023-03-10 16:16:45,625] INFO [Controller 1] Removed topic example-topic 
> with ID fh_aQcc3Sf2yVBTMrltBlQ. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:19:04,722] INFO [Controller 1] Created topic example-topic 
> with topic ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:16:45,730] INFO [Controller 2] Removed topic example-topic 
> with ID fh_aQcc3Sf2yVBTMrltBlQ. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:19:04,851] INFO [Controller 2] Created topic example-topic 
> with topic ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:16:45,837] INFO [Controller 3] Removed topic example-topic 
> with ID fh_aQcc3Sf2yVBTMrltBlQ. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:19:04,833] INFO [Controller 3] Created topic example-topic 
> with topic ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> Run 2
> [2023-03-10 16:20:22,469] INFO [Controller 1] Removed topic example-topic 
> with ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:22:19,711] INFO [Controller 1] Created topic example-topic 
> with topic ID xxlJlIe_SvqQHtfgbX2eLA. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:20:22,674] INFO [Controller 2] Removed topic example-topic 
> with ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:22:20,022] INFO [Controller 2] Created topic example-topic 
> with topic ID xxlJlIe_SvqQHtfgbX2eLA. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:20:22,674] INFO [Controller 3] Removed topic example-topic 
> with ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:22:20,020] INFO [Controller 3] Created topic example-topic 
> with topic ID xxlJlIe_SvqQHtfgbX2eLA. 
> (org.apache.kafka.controller.ReplicationControlManager)





[jira] [Assigned] (KAFKA-14808) Partition becomes leaderless when new partition reassignment removes the adding replica

2023-03-14 Thread Shenglong Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shenglong Zhang reassigned KAFKA-14808:
---

Assignee: Shenglong Zhang

> Partition becomes leaderless when new partition reassignment removes the 
> adding replica
> ---
>
> Key: KAFKA-14808
> URL: https://issues.apache.org/jira/browse/KAFKA-14808
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 3.4.0
>Reporter: Shenglong Zhang
>Assignee: Shenglong Zhang
>Priority: Major
>
> If there is an ongoing partition reassignment and any adding replica has been 
> elected as leader (due to preferred leader election or another reason), the 
> partition immediately becomes leaderless on receiving a new partition 
> reassignment that removes that adding replica.
> 1) partition-0 has replicas [0, 2]
> 2) partition-0 is being reassigned to [1, 0, 2], and somehow this 
> reassignment is stuck (e.g. broker 2 is down).
> 3) Preferred leader election is triggered, and broker 1 is elected as leader.
> 4) When a new partition reassignment to [2, 0, 3] is submitted, which removes 
> broker 1 and adds broker 3, the partition becomes leaderless.
>  
>  





[jira] [Created] (KAFKA-14808) Partition becomes leaderless when new partition reassignment removes the adding replica

2023-03-14 Thread Shenglong Zhang (Jira)
Shenglong Zhang created KAFKA-14808:
---

 Summary: Partition becomes leaderless when new partition 
reassignment removes the adding replica
 Key: KAFKA-14808
 URL: https://issues.apache.org/jira/browse/KAFKA-14808
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 3.4.0
Reporter: Shenglong Zhang


If there is an ongoing partition reassignment and any adding replica has been 
elected as leader (due to preferred leader election or another reason), the 
partition immediately becomes leaderless on receiving a new partition 
reassignment that removes that adding replica.

1) partition-0 has replicas [0, 2]

2) partition-0 is being reassigned to [1, 0, 2], and somehow this reassignment 
is stuck (e.g. broker 2 is down).

3) Preferred leader election is triggered, and broker 1 is elected as leader.

4) When a new partition reassignment to [2, 0, 3] is submitted, which removes 
broker 1 and adds broker 3, the partition becomes leaderless.

 

 





[GitHub] [kafka] viktorsomogyi commented on a diff in pull request #11565: KAFKA-13504: Retry connect internal topics' creation in case of InvalidReplicationFactorException

2023-03-14 Thread via GitHub


viktorsomogyi commented on code in PR #11565:
URL: https://github.com/apache/kafka/pull/11565#discussion_r1135376859


##
connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java:
##
@@ -328,6 +335,48 @@ public Set<String> createTopics(NewTopic... topics) {
 return createOrFindTopics(topics).createdTopics();
 }
 
+/**
+ * Implements a retry logic around creating topic(s) in case it'd fail due to InvalidReplicationFactorException

Review Comment:
   nit: add "or TimeoutException"



##
connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java:
##
@@ -70,6 +74,9 @@
 public class TopicAdmin implements AutoCloseable {
 
 public static final TopicCreationResponse EMPTY_CREATION = new TopicCreationResponse(Collections.emptySet(), Collections.emptySet());
+private static final List<Class<? extends Exception>> CAUSES_TO_RETRY_TOPIC_CREATION = Arrays.asList(
+InvalidReplicationFactorException.class,
+TimeoutException.class);

Review Comment:
   Since TimeoutException is a RetriableException, I was wondering whether we 
could refactor InvalidReplicationFactorException, because in some circumstances 
it can be retriable. Even if there won't be more brokers, it doesn't hurt much 
to retry a few times with backoff. What do you think @mimaison, would this be 
considered an API change?



##
connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java:
##
@@ -328,6 +335,48 @@ public Set<String> createTopics(NewTopic... topics) {
 return createOrFindTopics(topics).createdTopics();
 }
 
+/**
+ * Implements a retry logic around creating topic(s) in case it'd fail due to InvalidReplicationFactorException
+ *
+ * @param topicDescription
+ * @param timeoutMs
+ * @param backOffMs
+ * @param time
+ * @return the same as {@link TopicAdmin#createTopics(NewTopic...)}

Review Comment:
   nit: please fill out parameters and return value if you add javadocs.



##
connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java:
##
@@ -328,6 +335,48 @@ public Set<String> createTopics(NewTopic... topics) {
 return createOrFindTopics(topics).createdTopics();
 }
 
+/**
+ * Implements a retry logic around creating topic(s) in case it'd fail due to InvalidReplicationFactorException
+ *
+ * @param topicDescription
+ * @param timeoutMs
+ * @param backOffMs
+ * @param time
+ * @return the same as {@link TopicAdmin#createTopics(NewTopic...)}
+ */
+public Set<String> createTopicsWithRetry(NewTopic topicDescription, long timeoutMs, long backOffMs, Time time) {

Review Comment:
   I think it would be a bit more robust to add an overload to 
{{org.apache.kafka.connect.util.RetryUtil#retryUntilTimeout}} that specifies a 
set of exceptions or a condition to retry on and use that here.
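
   Such an overload might be sketched as follows (an assumed shape, not the existing RetryUtil API):

   ```java
   import java.util.concurrent.Callable;
   import java.util.function.Predicate;

   public static <T> T retryUntilTimeout(Callable<T> callable,
                                         Predicate<Throwable> shouldRetry,
                                         long timeoutMs, long backoffMs) throws Exception {
       long deadline = System.currentTimeMillis() + timeoutMs;
       while (true) {
           try {
               return callable.call();
           } catch (Exception e) {
               if (!shouldRetry.test(e) || System.currentTimeMillis() + backoffMs >= deadline) {
                   throw e; // non-retriable, or no time left for another attempt
               }
               Thread.sleep(backoffMs);
           }
       }
   }
   ```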






[jira] [Resolved] (KAFKA-14804) Connect docs fail to build with Gradle Swagger plugin 2.2.8

2023-03-14 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-14804.

Fix Version/s: 3.5.0
   Resolution: Fixed

> Connect docs fail to build with Gradle Swagger plugin 2.2.8
> ---
>
> Key: KAFKA-14804
> URL: https://issues.apache.org/jira/browse/KAFKA-14804
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Arthur
>Assignee: Mickael Maison
>Priority: Minor
> Fix For: 3.5.0
>
>
> There is an incompatibility somewhere between versions 2.2.0 and 2.2.8 that 
> cause the following error when building the connect docs:
> {code}
> Caused by: org.gradle.api.GradleException: 
> io.swagger.v3.jaxrs2.integration.SwaggerLoader.setOpenAPI31(java.lang.Boolean)
> at 
> io.swagger.v3.plugins.gradle.tasks.ResolveTask.resolve(ResolveTask.java:458)
> at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
> at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at org.gradle.internal.reflect.JavaMethod.invoke(JavaMethod.java:125)
> at 
> org.gradle.api.internal.project.taskfactory.StandardTaskAction.doExecute(StandardTaskAction.java:58)
> at 
> org.gradle.api.internal.project.taskfactory.StandardTaskAction.execute(StandardTaskAction.java:51)
> at 
> org.gradle.api.internal.project.taskfactory.StandardTaskAction.execute(StandardTaskAction.java:29)
> at 
> org.gradle.api.internal.tasks.execution.TaskExecution$3.run(TaskExecution.java:242)
> at 
> org.gradle.internal.operations.DefaultBuildOperationRunner$1.execute(DefaultBuildOperationRunner.java:29)
> at 
> org.gradle.internal.operations.DefaultBuildOperationRunner$1.execute(DefaultBuildOperationRunner.java:26)
> at 
> org.gradle.internal.operations.DefaultBuildOperationRunner$2.execute(DefaultBuildOperationRunner.java:66)
> at 
> org.gradle.internal.operations.DefaultBuildOperationRunner$2.execute(DefaultBuildOperationRunner.java:59)
> at 
> org.gradle.internal.operations.DefaultBuildOperationRunner.execute(DefaultBuildOperationRunner.java:157)
> at 
> org.gradle.internal.operations.DefaultBuildOperationRunner.execute(DefaultBuildOperationRunner.java:59)
> at 
> org.gradle.internal.operations.DefaultBuildOperationRunner.run(DefaultBuildOperationRunner.java:47)
> at 
> org.gradle.internal.operations.DefaultBuildOperationExecutor.run(DefaultBuildOperationExecutor.java:68)
> at 
> org.gradle.api.internal.tasks.execution.TaskExecution.executeAction(TaskExecution.java:227)
> at 
> org.gradle.api.internal.tasks.execution.TaskExecution.executeActions(TaskExecution.java:210)
> at 
> org.gradle.api.internal.tasks.execution.TaskExecution.executeWithPreviousOutputFiles(TaskExecution.java:193)
> at 
> org.gradle.api.internal.tasks.execution.TaskExecution.execute(TaskExecution.java:166)
> at 
> org.gradle.internal.execution.steps.ExecuteStep.executeInternal(ExecuteStep.java:93)
> at 
> org.gradle.internal.execution.steps.ExecuteStep.access$000(ExecuteStep.java:44)
> at 
> org.gradle.internal.execution.steps.ExecuteStep$1.call(ExecuteStep.java:57)
> at 
> org.gradle.internal.execution.steps.ExecuteStep$1.call(ExecuteStep.java:54)
> at 
> org.gradle.internal.operations.DefaultBuildOperationRunner$CallableBuildOperationWorker.execute(DefaultBuildOperationRunner.java:204)
> at 
> org.gradle.internal.operations.DefaultBuildOperationRunner$CallableBuildOperationWorker.execute(DefaultBuildOperationRunner.java:199)
> at 
> org.gradle.internal.operations.DefaultBuildOperationRunner$2.execute(DefaultBuildOperationRunner.java:66)
> at 
> org.gradle.internal.operations.DefaultBuildOperationRunner$2.execute(DefaultBuildOperationRunner.java:59)
> at 
> org.gradle.internal.operations.DefaultBuildOperationRunner.execute(DefaultBuildOperationRunner.java:157)
> at 
> org.gradle.internal.operations.DefaultBuildOperationRunner.execute(DefaultBuildOperationRunner.java:59)
> at 
> org.gradle.internal.operations.DefaultBuildOperationRunner.call(DefaultBuildOperationRunner.java:53)
> at 
> org.gradle.internal.operations.DefaultBuildOperationExecutor.call(DefaultBuildOperationExecutor.java:73)
> at 
> org.gradle.internal.execution.steps.ExecuteStep.execute(ExecuteStep.java:54)
> at 
> org.gradle.internal.execution.steps.ExecuteStep.execute(ExecuteStep.java:44)
> at 
> 

[GitHub] [kafka] mimaison merged pull request #13388: MINOR: Align swagger dependencies with gradle plugin

2023-03-14 Thread via GitHub


mimaison merged PR #13388:
URL: https://github.com/apache/kafka/pull/13388


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #13388: MINOR: Align swagger dependencies with gradle plugin

2023-03-14 Thread via GitHub


mimaison commented on PR #13388:
URL: https://github.com/apache/kafka/pull/13388#issuecomment-1467878813

   Build failures are not related, merging


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14807) MirrorMaker2 config source.consumer.auto.offset.reset=latest leading to the pause of replication of consumer groups

2023-03-14 Thread Zhaoli (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-14807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhaoli updated KAFKA-14807:
---
Description: 
We use MirrorMaker2 to replicate messages and consumer group offsets from Kafka 
cluster `source` to cluster `target`.
To reduce the load on the source cluster, we added this configuration to 
MM2 to avoid replicating the whole message history:
{code:java}
source.consumer.auto.offset.reset=latest {code}
After that, we found that some of the consumer group offsets had stopped replicating.

The common characteristic of these consumer groups is their EMPTY status, which 
means they have no active members at that moment. Offset replication for all the 
active consumer groups works as normal.

After researching the source code, we found this is because the configuration 
above also affects the consumption of the topic `mm2-offset-syncs`, so the 
map `offsetSyncs` doesn't hold all the topic partitions:
{code:java}
private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
{code}
The missing topic partitions then pause offset replication for the EMPTY 
consumer groups, which is not expected.
{code:java}
OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstreamOffset) {
    Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition);
    if (offsetSync.isPresent()) {
        if (offsetSync.get().upstreamOffset() > upstreamOffset) {
            // Offset is too far in the past to translate accurately
            return OptionalLong.of(-1L);
        }
        long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset();
        return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);
    } else {
        return OptionalLong.empty();
    }
}{code}
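
For illustration, a minimal, self-contained sketch of the failure mode (our own code with hypothetical names such as `OffsetSyncSketch`, not MM2 source; a plain `String` stands in for `TopicPartition`): when the sync record for a partition was never consumed, translation yields empty, no checkpoint can be emitted, and the EMPTY group's offset stops replicating.
{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;

public class OffsetSyncSketch {
    // Stand-in for MM2's OffsetSync record, kept minimal for the sketch.
    record OffsetSync(long upstreamOffset, long downstreamOffset) { }

    // With auto.offset.reset=latest, historical sync records are skipped,
    // so entries for idle partitions never make it into this map.
    static final Map<String, OffsetSync> offsetSyncs = new HashMap<>();

    static OptionalLong translateDownstream(String sourceTopicPartition, long upstreamOffset) {
        Optional<OffsetSync> offsetSync = Optional.ofNullable(offsetSyncs.get(sourceTopicPartition));
        if (!offsetSync.isPresent()) {
            return OptionalLong.empty(); // no sync consumed for this partition -> cannot translate
        }
        if (offsetSync.get().upstreamOffset() > upstreamOffset) {
            return OptionalLong.of(-1L); // too far in the past to translate accurately
        }
        long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset();
        return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);
    }

    public static void main(String[] args) {
        // The sync record for "orders-0" was never read, so the committed
        // offset 42 of an EMPTY group cannot be mapped downstream:
        OptionalLong translated = translateDownstream("orders-0", 42L);
        System.out.println(translated.isPresent()
                ? "emit checkpoint at " + translated.getAsLong()
                : "no checkpoint emitted; replication for the EMPTY group pauses");
    }
}
{code}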
 

  was:
We use MirrorMaker2 to replicate messages and consumer group offsets from Kafka 
cluster `source` to cluster `target`.
To reduce the load on the source cluster, we added this configuration to 
MM2 to avoid replicating the whole message history:
{code:java}
source.consumer.auto.offset.reset=latest {code}
After that, we found that some of the consumer group offsets had stopped replicating.

The common characteristic of these consumer groups is their EMPTY status, which 
means they have no active members at that moment. Offset replication for all the 
active consumer groups works as normal.

After researching the source code, we found this is because the configuration 
above also affects the consumption of the topic `mm2-offset-syncs`, so the 
map `offsetSyncs` doesn't hold all the topic partitions:
{code:java}
private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
{code}
The missing topic partitions then pause offset replication for the EMPTY 
consumer groups, which is not expected.

 

 


> MirrorMaker2 config source.consumer.auto.offset.reset=latest leading to the 
> pause of replication of consumer groups
> ---
>
> Key: KAFKA-14807
> URL: https://issues.apache.org/jira/browse/KAFKA-14807
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.4.0, 3.3.1, 3.3.2
> Environment: centos7
>Reporter: Zhaoli
>Priority: Major
>
> We use MirrorMaker2 to replicate messages and consumer group offsets from 
> Kafka cluster `source` to cluster `target`.
> To reduce the load on the source cluster, we added this configuration 
> to MM2 to avoid replicating the whole message history:
> {code:java}
> source.consumer.auto.offset.reset=latest {code}
> After that, we found that some of the consumer group offsets had stopped 
> replicating.
> The common characteristic of these consumer groups is their EMPTY 
> status, which means they have no active members at that moment. Offset 
> replication for all the active consumer groups works as normal.
> After researching the source code, we found this is because the configuration 
> above also affects the consumption of the topic `mm2-offset-syncs`, so the 
> map `offsetSyncs` doesn't hold all the topic partitions:
> {code:java}
> private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
> {code}
> The missing topic partitions then pause offset replication for the EMPTY 
> consumer groups, which is not expected.
> {code:java}
> OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstreamOffset) {
>     Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition);
>     if (offsetSync.isPresent()) {
>         if (offsetSync.get().upstreamOffset() > upstreamOffset) {
>             // Offset is too far in the past to translate accurately
>             return OptionalLong.of(-1L);
>         }
>         long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset();
>         return OptionalLong.of(offsetSync.get().downstreamOffset() + 
> 

[GitHub] [kafka] divijvaidya commented on a diff in pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

2023-03-14 Thread via GitHub


divijvaidya commented on code in PR #13284:
URL: https://github.com/apache/kafka/pull/13284#discussion_r1135316350


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/DedicatedMirrorIntegrationTest.java:
##
@@ -62,8 +60,10 @@ public void setup() {
 @AfterEach
 public void teardown() throws Throwable {
 AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
-mirrorMakers.forEach((name, mirrorMaker) ->
-    Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + name + "'", shutdownFailure)
+mirrorMakers.forEach((name, mirrorMaker) -> {
+    Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + name + "'", shutdownFailure);
+    mirrorMaker.awaitStop();
+}

Review Comment:
   done in latest commit.



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/DedicatedMirrorIntegrationTest.java:
##
@@ -62,8 +60,10 @@ public void setup() {
 @AfterEach
 public void teardown() throws Throwable {
 AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
-mirrorMakers.forEach((name, mirrorMaker) ->
-    Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + name + "'", shutdownFailure)
+mirrorMakers.forEach((name, mirrorMaker) -> {
+    Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + name + "'", shutdownFailure);
+    mirrorMaker.awaitStop();
+}

Review Comment:
   good idea. done in latest commit.
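
   As an aside, a minimal standalone sketch of the teardown pattern this diff introduces (an assumed shape with a hypothetical `Worker` interface, not the committed test code): stop each worker quietly, record only the first failure, and block until shutdown completes before surfacing it.

       import java.util.Map;
       import java.util.concurrent.atomic.AtomicReference;

       class TeardownSketch {
           interface Worker {
               void stop();
               void awaitStop() throws InterruptedException;
           }

           static void teardown(Map<String, Worker> workers) throws Throwable {
               AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
               workers.forEach((name, worker) -> {
                   try {
                       worker.stop();                          // request shutdown
                   } catch (Throwable t) {
                       shutdownFailure.compareAndSet(null, t); // keep only the first failure
                   }
                   try {
                       worker.awaitStop();                     // wait for shutdown to finish
                   } catch (InterruptedException e) {
                       Thread.currentThread().interrupt();     // preserve interrupt status
                   }
               });
               if (shutdownFailure.get() != null)
                   throw shutdownFailure.get();                // let the test framework report it
           }
       }

   Waiting in the same loop that stops the workers keeps per-worker shutdown ordered, which is what makes the test deterministic.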



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


