[GitHub] [kafka] kamalcph commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-15 Thread via GitHub


kamalcph commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1295402786


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -1003,6 +1015,134 @@ public RemoteLogMetadataManager 
createRemoteLogMetadataManager() {
 }
 }
 
+private static RemoteLogSegmentMetadata 
createRemoteLogSegmentMetadata(long startOffset, long endOffset, Map<Integer, Long> segmentEpochs) {
+return new RemoteLogSegmentMetadata(
+new RemoteLogSegmentId(new TopicIdPartition(Uuid.randomUuid(),
+new TopicPartition("topic", 0)), Uuid.randomUuid()),
+startOffset, endOffset,
+10L,
+1,
+10L,
+1000,
+Optional.empty(),
+RemoteLogSegmentState.COPY_SEGMENT_FINISHED, segmentEpochs);
+}
+
+@Test
+public void testRemoteSegmentWithinLeaderEpochs() {
+// Test whether a remote segment is within the leader epochs
+final long logEndOffset = 90L;
+
+TreeMap<Integer, Long> leaderEpochToStartOffset = new TreeMap<Integer, Long>() {{
+put(0, 0L);
+put(1, 10L);
+put(2, 20L);
+put(3, 30L);
+put(4, 40L);
+put(5, 50L);
+put(7, 70L);
+}};

Review Comment:
   This double-brace initialization creates an extra anonymous class at every 
usage site; for clean code, we should avoid this pattern.
   
   https://www.baeldung.com/java-initialize-hashmap
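   
   For reference, a minimal sketch of the plain-initialization alternative 
(illustrative only):
   ```java
   import java.util.TreeMap;

   // Plain initialization builds the same map without compiling an extra
   // anonymous TreeMap subclass at every usage site.
   class LeaderEpochExample {
       static TreeMap<Integer, Long> leaderEpochToStartOffset() {
           TreeMap<Integer, Long> epochs = new TreeMap<>();
           epochs.put(0, 0L);
           epochs.put(1, 10L);
           epochs.put(2, 20L);
           return epochs;
       }
   }
   ```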



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-15 Thread via GitHub


kamalcph commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1295394486


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -618,6 +625,230 @@ public void run() {
 }
 }
 
+public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+if (isLeader()) {

Review Comment:
   Filed KAFKA-15351 and KAFKA-15352 to track the cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15352) Ensure consistency while deleting the remote log segments

2023-08-15 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15352:
-
Parent: KAFKA-7739
Issue Type: Sub-task  (was: Task)

> Ensure consistency while deleting the remote log segments
> -
>
> Key: KAFKA-15352
> URL: https://issues.apache.org/jira/browse/KAFKA-15352
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kamal Chandraprakash
>Priority: Major
>
> In KAFKA-14888, the remote log segments that breach the retention time/size 
> are deleted before the log-start-offset is updated. In the middle of 
> deletion, if a consumer starts to read from the beginning of the topic, it 
> will fail to read the messages and UNKNOWN_SERVER_ERROR will be thrown back 
> to the consumer.
> To ensure consistency, similar to local log segments, where the actual 
> segments are deleted only after {{segment.delete.delay.ms}}, we should update 
> the log-start-offset first, before deleting the remote log segments.
> See the [PR#13561|https://github.com/apache/kafka/pull/13561] and 
> [comment|https://github.com/apache/kafka/pull/13561#discussion_r1293086543] 
> for more details.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15352) Ensure consistency while deleting the remote log segments

2023-08-15 Thread Kamal Chandraprakash (Jira)
Kamal Chandraprakash created KAFKA-15352:


 Summary: Ensure consistency while deleting the remote log segments
 Key: KAFKA-15352
 URL: https://issues.apache.org/jira/browse/KAFKA-15352
 Project: Kafka
  Issue Type: Task
Reporter: Kamal Chandraprakash


In KAFKA-14888, the remote log segments that breach the retention time/size are 
deleted before the log-start-offset is updated. In the middle of deletion, if a 
consumer starts to read from the beginning of the topic, it will fail to read 
the messages and UNKNOWN_SERVER_ERROR will be thrown back to the consumer.

To ensure consistency, similar to local log segments, where the actual segments 
are deleted only after {{segment.delete.delay.ms}}, we should update the 
log-start-offset first, before deleting the remote log segments.

See the [PR#13561|https://github.com/apache/kafka/pull/13561] and 
[comment|https://github.com/apache/kafka/pull/13561#discussion_r1293086543] for 
more details.
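
As an illustration, the proposed ordering could look like the following sketch 
({{updateLogStartOffset}} and {{deleteRetentionBreachedSegments}} / 
{{deleteRemoteLogSegment}} are hypothetical helpers, not the actual 
RemoteLogManager API):
{code:java}
// Hypothetical sketch: advance the log-start-offset before deleting, so a
// consumer reading from the beginning never fetches an offset whose data is
// being removed from remote storage.
void deleteRetentionBreachedSegments(List<RemoteLogSegmentMetadata> eligible) {
    if (eligible.isEmpty()) {
        return;
    }
    long newLogStartOffset = eligible.get(eligible.size() - 1).endOffset() + 1;
    updateLogStartOffset(newLogStartOffset);      // step 1: move the fence
    for (RemoteLogSegmentMetadata segment : eligible) {
        deleteRemoteLogSegment(segment);          // step 2: then delete
    }
}
{code}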



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15351) Update log-start-offset after leader election for topics enabled with remote storage

2023-08-15 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15351:
-
Description: 
Case-1:

In the FETCH response, the leader's log-start-offset is piggy-backed. But there 
can be a scenario:
 # The leader deletes a remote log segment and updates its log-start-offset.
 # Before replica-2 updates its log-start-offset via the FETCH request, 
leadership changes to replica-2.
 # There are no more eligible segments to delete from remote storage.
 # The log-start-offset becomes stale (it refers to the old log-start-offset, 
but the data was already removed from remote storage).
 # If a consumer starts to read from the beginning of the topic, it will fail 
to read.

 

Case-2:

The old leader (now a follower) can delete a remote log segment in the middle 
of a leader election. We need to update the log-start-offset metadata for this 
case as well.

See this comment 
[https://github.com/apache/kafka/pull/13561#discussion_r1226538752] for more 
details.

  was:
In the FETCH response, the leader's log-start-offset is piggy-backed. But there 
can be a scenario:
 # The leader deletes a remote log segment and updates its log-start-offset.
 # Before replica-2 updates its log-start-offset via the FETCH request, 
leadership changes to replica-2.
 # There are no more eligible segments to delete from remote storage.
 # The log-start-offset becomes stale (it refers to the old log-start-offset, 
but the data was already removed from remote storage).
 # If a consumer starts to read from the beginning of the topic, it will fail 
to read.


> Update log-start-offset after leader election for topics enabled with remote 
> storage
> 
>
> Key: KAFKA-15351
> URL: https://issues.apache.org/jira/browse/KAFKA-15351
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kamal Chandraprakash
>Priority: Major
>
> Case-1:
> In the FETCH response, the leader's log-start-offset is piggy-backed. But 
> there can be a scenario:
>  # The leader deletes a remote log segment and updates its log-start-offset.
>  # Before replica-2 updates its log-start-offset via the FETCH request, 
> leadership changes to replica-2.
>  # There are no more eligible segments to delete from remote storage.
>  # The log-start-offset becomes stale (it refers to the old log-start-offset, 
> but the data was already removed from remote storage).
>  # If a consumer starts to read from the beginning of the topic, it will 
> fail to read.
>  
> Case-2:
> The old leader (now a follower) can delete a remote log segment in the 
> middle of a leader election. We need to update the log-start-offset metadata 
> for this case as well.
> See this comment 
> [https://github.com/apache/kafka/pull/13561#discussion_r1226538752] for more 
> details.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15351) Update log-start-offset after leader election for topics enabled with remote storage

2023-08-15 Thread Kamal Chandraprakash (Jira)
Kamal Chandraprakash created KAFKA-15351:


 Summary: Update log-start-offset after leader election for topics 
enabled with remote storage
 Key: KAFKA-15351
 URL: https://issues.apache.org/jira/browse/KAFKA-15351
 Project: Kafka
  Issue Type: Task
Reporter: Kamal Chandraprakash


In the FETCH response, the leader's log-start-offset is piggy-backed. But there 
can be a scenario:
 # The leader deletes a remote log segment and updates its log-start-offset.
 # Before replica-2 updates its log-start-offset via the FETCH request, 
leadership changes to replica-2.
 # There are no more eligible segments to delete from remote storage.
 # The log-start-offset becomes stale (it refers to the old log-start-offset, 
but the data was already removed from remote storage).
 # If a consumer starts to read from the beginning of the topic, it will fail 
to read.
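
A rough sketch of the fix direction (hypothetical wiring; 
{{listRemoteLogSegments}} is the existing RemoteLogMetadataManager API, while 
{{updateLogStartOffset}} is a placeholder):
{code:java}
// On leadership change, derive the log-start-offset from the remote log
// metadata instead of trusting the last piggy-backed FETCH value.
void onBecomeLeader(TopicIdPartition partition) throws RemoteStorageException {
    long earliest = Long.MAX_VALUE;
    Iterator<RemoteLogSegmentMetadata> segments =
            remoteLogMetadataManager.listRemoteLogSegments(partition);
    while (segments.hasNext()) {
        earliest = Math.min(earliest, segments.next().startOffset());
    }
    if (earliest != Long.MAX_VALUE) {
        updateLogStartOffset(partition.topicPartition(), earliest);
    }
}
{code}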



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15351) Update log-start-offset after leader election for topics enabled with remote storage

2023-08-15 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15351:
-
Parent: KAFKA-7739
Issue Type: Sub-task  (was: Task)

> Update log-start-offset after leader election for topics enabled with remote 
> storage
> 
>
> Key: KAFKA-15351
> URL: https://issues.apache.org/jira/browse/KAFKA-15351
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kamal Chandraprakash
>Priority: Major
>
> In the FETCH response, the leader's log-start-offset is piggy-backed. But 
> there can be a scenario:
>  # The leader deletes a remote log segment and updates its log-start-offset.
>  # Before replica-2 updates its log-start-offset via the FETCH request, 
> leadership changes to replica-2.
>  # There are no more eligible segments to delete from remote storage.
>  # The log-start-offset becomes stale (it refers to the old log-start-offset, 
> but the data was already removed from remote storage).
>  # If a consumer starts to read from the beginning of the topic, it will 
> fail to read.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-15 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1295387916


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1033,6 +1360,35 @@ public void close() {
 }
 }
 
+private static class RetentionSizeData {
+private final long retentionSize;
+private final long remainingBreachedSize;
+
+public RetentionSizeData(long retentionSize, long 
remainingBreachedSize) {
+if (retentionSize < remainingBreachedSize) {
+throw new IllegalArgumentException("retentionSize must be 
greater than remainingBreachedSize");
+}

Review Comment:
   Good catch! It was changed while refactoring, added UTs to cover that in the 
latest commits.
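   
   For reference, such a UT could look roughly like this sketch (assuming the 
constructor check shown above is kept):
   ```java
   import static org.junit.jupiter.api.Assertions.assertThrows;
   import org.junit.jupiter.api.Test;

   public class RetentionSizeDataTest {
       @Test
       public void testRejectsBreachedSizeLargerThanRetentionSize() {
           // remainingBreachedSize (200) exceeding retentionSize (100) must throw
           assertThrows(IllegalArgumentException.class,
                   () -> new RetentionSizeData(100L, 200L));
       }
   }
   ```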



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15346) Single-Key_single-timestamp IQs with versioned state stores

2023-08-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15346:

Component/s: streams

> Single-Key_single-timestamp IQs with versioned state stores
> ---
>
> Key: KAFKA-15346
> URL: https://issues.apache.org/jira/browse/KAFKA-15346
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Major
>
> [KIP-960|https://cwiki.apache.org/confluence/display/KAFKA/KIP-960%3A+Support+single-key_single-timestamp+interactive+queries+%28IQv2%29+for+versioned+state+stores]
> This ticket covers just two query types:
> *Key Queries with single timestamp:*
>  # single-key latest-value lookup
>  # single-key lookup with timestamp (upper) bound
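
For orientation, query type 2 could look roughly like the following with the 
API shapes proposed in KIP-960 (proposal-stage names, subject to change; 
{{kafkaStreams}} is a running KafkaStreams instance):
{code:java}
// Single-key lookup bounded by an upper timestamp, via IQv2.
VersionedKeyQuery<String, Integer> query =
        VersionedKeyQuery.withKey("k").asOf(Instant.ofEpochMilli(1000L));
StateQueryRequest<VersionedRecord<Integer>> request =
        StateQueryRequest.inStore("versioned-store").withQuery(query);
StateQueryResult<VersionedRecord<Integer>> result = kafkaStreams.query(request);
{code}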



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15347) Single-Key_multi-timestamp IQs with versioned state stores

2023-08-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15347:

Component/s: streams

> Single-Key_multi-timestamp IQs with versioned state stores
> --
>
> Key: KAFKA-15347
> URL: https://issues.apache.org/jira/browse/KAFKA-15347
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Major
>
> [KIP-968|https://cwiki.apache.org/confluence/display/KAFKA/KIP-968%3A+Support+single-key_multi-timestamp+interactive+queries+%28IQv2%29+for+versioned+state+stores]
> This ticket covers just four query types:
> *Key Queries with multiple timestamps:*
>  # single-key query with upper bound timestamp
>  # single-key query with lower bound timestamp
>  # single-key query with timestamp range
>  # single-key all versions query



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15348) Range IQs with versioned state stores

2023-08-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15348:

Component/s: streams

> Range IQs with versioned state stores
> -
>
> Key: KAFKA-15348
> URL: https://issues.apache.org/jira/browse/KAFKA-15348
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Major
>
> [KIP-969|https://cwiki.apache.org/confluence/display/KAFKA/KIP-969%3A+Support+range+interactive+queries+%28IQv2%29+for+versioned+state+stores]
> This ticket covers all types of range queries:
> *Range Queries*
>  # key-range latest-value query
>  # key-range with lower bound latest-value query
>  # key-range with upper bound latest-value query
>  # all-keys (no bound) latest-value query
>  # key-range query with timestamp (upper) bound
>  # key-range with lower bound with timestamp (upper) bound 
>  # key-range with upper bound with timestamp (upper) bound
>  # all-keys (no bound) with timestamp (upper) bound
>  # key-range query with timestamp range
>  # key-range query with lower bound with timestamp range
>  # key-range query with upper bound with timestamp range
>  # all-keys (no bound) with timestamp range
>  # key-range query all-versions
>  # key-range query with lower bound all-versions
>  # key-range query with upper bond all-versions
>  # all-keys query (no bound) all-versions (entire store)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-15 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1295339433


##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -842,6 +847,75 @@ class ControllerApis(val requestChannel: RequestChannel,
   }
   }
 
+  def handleCreateDelegationTokenRequest(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
+val alterRequest = request.body[CreateDelegationTokenRequest]
+
+val requester = request.context.principal
+val ownerPrincipalName = alterRequest.data.ownerPrincipalName
+val ownerPrincipalType = alterRequest.data.ownerPrincipalType
+val owner = if (ownerPrincipalName == null || ownerPrincipalName.isEmpty) {
+  request.context.principal
+} else {
+  new KafkaPrincipal(ownerPrincipalType, ownerPrincipalName)
+}
+
+// Requester is always allowed to create token for self
+if (!owner.equals(requester) && 

Review Comment:
   I added the code. Please look over the added code. 
   
   One change is that I had to allow PLAINTEXT, because all our 
broker-to-controller forwarding tests use PLAINTEXT.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15329) Make default `remote.log.metadata.manager.class.name` as topic based RLMM

2023-08-15 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-15329.
---
Resolution: Fixed

> Make default `remote.log.metadata.manager.class.name` as topic based RLMM
> -
>
> Key: KAFKA-15329
> URL: https://issues.apache.org/jira/browse/KAFKA-15329
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Priority: Blocker
> Fix For: 3.6.0
>
>
> As described in 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs.1]
>  , we should set default "remote.log.metadata.manager.class.name" as topic 
> based RLMM.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon merged pull request #14202: KAFKA-15329: Make default remote.log.metadata.manager.class.name as topic based RLMM

2023-08-15 Thread via GitHub


showuon merged PR #14202:
URL: https://github.com/apache/kafka/pull/14202


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #14202: KAFKA-15329: Make default remote.log.metadata.manager.class.name as topic based RLMM

2023-08-15 Thread via GitHub


showuon commented on PR #14202:
URL: https://github.com/apache/kafka/pull/14202#issuecomment-1679845654

   Failed tests are unrelated:
   ```
   Build / JDK 11 and Scala 2.13 / 
kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   Build / JDK 17 and Scala 2.13 / 
kafka.api.SslAdminIntegrationTest.testAsynchronousAuthorizerAclUpdatesDontBlockRequestThreads()
   Build / JDK 17 and Scala 2.13 / 
kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerTest.testRemoteLogSizeCalculationWithSegmentsHavingNonExistentEpochs()
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   Build / JDK 20 and Scala 2.13 / 
kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
   Build / JDK 20 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   Build / JDK 20 and Scala 2.13 / 
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicateSourceDefault()
   Build / JDK 8 and Scala 2.12 / 
kafka.api.PlaintextAdminIntegrationTest.testElectPreferredLeaders(String).quorum=kraft
   Build / JDK 8 and Scala 2.12 / 
kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pprovenzano commented on pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-15 Thread via GitHub


pprovenzano commented on PR #14083:
URL: https://github.com/apache/kafka/pull/14083#issuecomment-1679832173

   > I think this starts to look good. There are some parts I haven't reviewed 
yet, I'll try to get them in the next few days.
   
   Thank you for the review comments. I think I have addressed all the ones you 
have given.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-15 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1295289182


##
core/src/main/scala/kafka/server/DelegationTokenManager.scala:
##
@@ -186,57 +134,28 @@ class DelegationTokenManager(val config: KafkaConfig,
   val tokenMaxLifetime: Long = config.delegationTokenMaxLifeMs
   val defaultTokenRenewTime: Long = config.delegationTokenExpiryTimeMs
   val tokenRemoverScanInterval: Long = 
config.delegationTokenExpiryCheckIntervalMs
-  private val lock = new Object()
-  private var tokenChangeListener: ZkNodeChangeNotificationListener = _
 
   def startup(): Unit = {
 if (config.tokenAuthEnabled) {
-  zkClient.createDelegationTokenPaths()
   loadCache()
-  tokenChangeListener = new ZkNodeChangeNotificationListener(zkClient, 
DelegationTokenChangeNotificationZNode.path, 
DelegationTokenChangeNotificationSequenceZNode.SequenceNumberPrefix, 
TokenChangedNotificationHandler)
-  tokenChangeListener.init()
 }
   }
 
   def shutdown(): Unit = {
 if (config.tokenAuthEnabled) {

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-15 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1295288501


##
core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala:
##
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.server.DelegationTokenManager
+import kafka.server.KafkaConfig
+import kafka.utils.Logging
+import org.apache.kafka.image.loader.LoaderManifest
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.server.fault.FaultHandler
+
+
+class DelegationTokenPublisher(
+  conf: KafkaConfig,
+  faultHandler: FaultHandler,
+  nodeType: String,
+  tokenManager: DelegationTokenManager,
+) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
+  logIdent = s"[${name()}] "
+
+  var _firstPublish = true
+
+  override def name(): String = s"DelegationTokenPublisher ${nodeType} 
id=${conf.nodeId}"
+
+  override def onMetadataUpdate(
+delta: MetadataDelta,
+newImage: MetadataImage,
+manifest: LoaderManifest
+  ): Unit = {
+onMetadataUpdate(delta, newImage)
+  }
+
+  def onMetadataUpdate(
+delta: MetadataDelta,
+newImage: MetadataImage,
+  ): Unit = {
+val deltaName = if (_firstPublish) {
+  s"initial MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}"
+} else {
+  s"update MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}"
+}
+try {
+  if (_firstPublish) {
+// Initialize the tokenCache with the Image
+Option(newImage.delegationTokens()).foreach { delegationTokenImage =>
+  delegationTokenImage.tokens().forEach { (tokenId, 
delegationTokenData) =>
+
tokenManager.updateToken(tokenManager.getDelegationToken(delegationTokenData.tokenInformation()))
+  }
+}
+_firstPublish = false
+  }
+  // Apply changes to DelegationTokens.
+  Option(delta.delegationTokenDelta()).foreach { delegationTokenDelta =>
+delegationTokenDelta.changes().forEach { 
+  case (tokenId, delegationTokenData) => 

Review Comment:
   > nit: `tokenId` isn't used, you can just replace it with `_`
   
   It's used in the else clause when we remove the token.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pprovenzano commented on pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-15 Thread via GitHub


pprovenzano commented on PR #14083:
URL: https://github.com/apache/kafka/pull/14083#issuecomment-1679827453

   > @pprovenzano Thanks for the PR. left few comments
   > 
   > can we also update delegation token docs if required (like any configs for 
controller nodes etc..) 
https://github.com/apache/kafka/blob/trunk/docs/security.html#L1178
   
   I'm going to do that with a separate PR


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #14220: KAFKA-15102: Add release notes about the replication.policy.internal.topic.separator.enabled property for MirrorMaker 2

2023-08-15 Thread via GitHub


gharris1727 commented on code in PR #14220:
URL: https://github.com/apache/kafka/pull/14220#discussion_r1295265105


##
docs/upgrade.html:
##
@@ -43,6 +43,10 @@ Notable changes in 3
 See KIP-925 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams)
 and the Kafka Streams Developer Guide for more details.
 
+To account for a break in compatibility introduced in version 
3.1.0, MirrorMaker 2 has added a new
+replication.policy.internal.topic.separator.enabled
+property. If upgrading from 3.0.x or earlier, it may be necessary 
to set this property to false; see the property's
+documentation
 for more details.

Review Comment:
   It does look like the other items in this section include KIP links, but in 
general there aren't too many KIP links in the upgrade section. I think linking 
the KIP is unnecessary.



##
docs/upgrade.html:
##
@@ -43,6 +43,10 @@ Notable changes in 3
 See KIP-925 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams)
 and the Kafka Streams Developer Guide for more details.
 
+To account for a break in compatibility introduced in version 
3.1.0, MirrorMaker 2 has added a new
+replication.policy.internal.topic.separator.enabled
+property. If upgrading from 3.0.x or earlier, it may be necessary 
to set this property to false; see the property's
+documentation
 for more details.

Review Comment:
   This second link feels redundant.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #14220: KAFKA-15102: Add release notes about the replication.policy.internal.topic.separator.enabled property for MirrorMaker 2

2023-08-15 Thread via GitHub


C0urante commented on code in PR #14220:
URL: https://github.com/apache/kafka/pull/14220#discussion_r1295254068


##
docs/upgrade.html:
##
@@ -43,6 +43,10 @@ Notable changes in 3
 See KIP-925 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams)
 and the Kafka Streams Developer Guide for more details.
 
+To account for a break in compatibility introduced in version 
3.1.0, MirrorMaker 2 has added a new
+replication.policy.internal.topic.separator.enabled
+property. If upgrading from 3.0.x or earlier, it may be necessary 
to set this property to false; see the property's
+documentation
 for more details.

Review Comment:
   We can also link to 
[KIP-949](https://cwiki.apache.org/confluence/display/KAFKA/KIP-949%3A+Add+flag+to+enable+the+usage+of+topic+separator+in+MM2+DefaultReplicationPolicy)
 here, but I'm hoping that the information provided here and in the property's 
docstring should be sufficient to save people the time and effort of reading 
through an entire KIP instead.
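   
   For illustration, the opt-out for an upgrading deployment is a single 
property (a sketch; the property's docstring has the exact semantics):
   ```properties
   # Sketch: restore the pre-3.1.0 internal topic naming by not applying
   # replication.policy.separator to internal topic names.
   replication.policy.internal.topic.separator.enabled = false
   ```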



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #14220: KAFKA-15102: Add release notes about the replication.policy.internal.topic.separator.enabled property for MirrorMaker 2

2023-08-15 Thread via GitHub


C0urante commented on PR #14220:
URL: https://github.com/apache/kafka/pull/14220#issuecomment-1679781965

   @gharris1727 @mimaison as the other two committers who voted for KIP-949, 
would you mind taking a look? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante opened a new pull request, #14220: KAFKA-15102: Add release notes about the replication.policy.internal.topic.separator.enabled property for MirrorMaker 2

2023-08-15 Thread via GitHub


C0urante opened a new pull request, #14220:
URL: https://github.com/apache/kafka/pull/14220

   [Jira](https://issues.apache.org/jira/browse/KAFKA-15102)
   
   This is a docs-only follow-up for https://github.com/apache/kafka/pull/14082 
that calls out the necessity for this property in our release notes so that 
users upgrading MM2 from older versions can do so safely.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante merged pull request #14082: KAFKA-15102: Mirror Maker 2 - KIP690 backward compatibility

2023-08-15 Thread via GitHub


C0urante merged PR #14082:
URL: https://github.com/apache/kafka/pull/14082


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


mumrah commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295237042


##
metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java:
##
@@ -66,6 +119,10 @@ public LogDeltaManifest(
 this.numBytes = numBytes;
 }
 
+public static Builder newBuilder() {

Review Comment:
   Well, it also has a public no-arg constructor, I just like the static 
factory. I do like the fluent style for builders and a static method to start 
it off makes sense to me.
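   
   For context, the pattern under discussion looks roughly like this generic 
sketch (illustrative field and method names, not the actual LogDeltaManifest 
members):
   ```java
   public class Manifest {
       private final long numBytes;

       private Manifest(Builder builder) {
           this.numBytes = builder.numBytes;
       }

       // Static factory that starts the fluent builder chain.
       public static Builder newBuilder() {
           return new Builder();
       }

       public static class Builder {
           private long numBytes;

           public Builder setNumBytes(long numBytes) {
               this.numBytes = numBytes;
               return this; // enables chaining
           }

           public Manifest build() {
               return new Manifest(this);
           }
       }
   }

   // Usage: Manifest manifest = Manifest.newBuilder().setNumBytes(1024L).build();
   ```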



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-15 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1295230735


##
metadata/src/test/java/org/apache/kafka/image/DelegationTokenImageTest.java:
##


Review Comment:
   Yes and Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-15 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1295230635


##
metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java:
##


Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15350) MetadataLoaderMetrics has ClassNotFoundException in system tests

2023-08-15 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-15350.
-
Resolution: Invalid

I believe this was an environmental problem, due to some stale artifacts. A 
clean build fixed the issues I was seeing.

> MetadataLoaderMetrics has ClassNotFoundException in system tests
> 
>
> Key: KAFKA-15350
> URL: https://issues.apache.org/jira/browse/KAFKA-15350
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft, system tests
>Affects Versions: 3.6.0
>Reporter: Greg Harris
>Priority: Blocker
>
> The system tests appear to be failing on trunk with:
> {noformat}
> [2023-08-15 22:11:34,235] ERROR Exiting Kafka due to fatal exception 
> (kafka.Kafka$)
> java.lang.NoClassDefFoundError: 
> org/apache/kafka/image/loader/MetadataLoaderMetrics
>at kafka.server.KafkaRaftServer.(KafkaRaftServer.scala:68)
>at kafka.Kafka$.buildServer(Kafka.scala:83)
>at kafka.Kafka$.main(Kafka.scala:91)
>at kafka.Kafka.main(Kafka.scala)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.kafka.image.loader.MetadataLoaderMetrics
>at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
>at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
>at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>... 4 more{noformat}
> This happens with the `tests/kafkatest/tests/connect/`, 
> `tests/kafkatest/tests/core/kraft_upgrade_test.py` and possibly others.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15350) MetadataLoaderMetrics has ClassNotFoundException in system tests

2023-08-15 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-15350:

Issue Type: Bug  (was: Improvement)

> MetadataLoaderMetrics has ClassNotFoundException in system tests
> 
>
> Key: KAFKA-15350
> URL: https://issues.apache.org/jira/browse/KAFKA-15350
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft, system tests
>Affects Versions: 3.6.0
>Reporter: Greg Harris
>Priority: Blocker
>
> The system tests appear to be failing on trunk with:
> {noformat}
> [2023-08-15 22:11:34,235] ERROR Exiting Kafka due to fatal exception 
> (kafka.Kafka$)
> java.lang.NoClassDefFoundError: 
> org/apache/kafka/image/loader/MetadataLoaderMetrics
>at kafka.server.KafkaRaftServer.(KafkaRaftServer.scala:68)
>at kafka.Kafka$.buildServer(Kafka.scala:83)
>at kafka.Kafka$.main(Kafka.scala:91)
>at kafka.Kafka.main(Kafka.scala)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.kafka.image.loader.MetadataLoaderMetrics
>at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
>at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
>at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>... 4 more{noformat}
> This happens with the `tests/kafkatest/tests/connect/`, 
> `tests/kafkatest/tests/core/kraft_upgrade_test.py` and possibly others.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15336) Connect plugin Javadocs should mention serviceloader manifests

2023-08-15 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-15336:

Issue Type: Improvement  (was: Bug)

> Connect plugin Javadocs should mention serviceloader manifests
> --
>
> Key: KAFKA-15336
> URL: https://issues.apache.org/jira/browse/KAFKA-15336
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 3.6.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Minor
> Fix For: 3.6.0
>
>
> Similar to the ConfigProvider, the Javadocs for the Connect plugin classes 
> should mention that plugin implementations should have ServiceLoader 
> manifests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15336) Connect plugin Javadocs should mention serviceloader manifests

2023-08-15 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-15336:

Issue Type: Bug  (was: Improvement)

> Connect plugin Javadocs should mention serviceloader manifests
> --
>
> Key: KAFKA-15336
> URL: https://issues.apache.org/jira/browse/KAFKA-15336
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.6.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Minor
> Fix For: 3.6.0
>
>
> Similar to the ConfigProvider, the Javadocs for the Connect plugin classes 
> should mention that plugin implementations should have ServiceLoader 
> manifests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15350) MetadataLoaderMetrics has ClassNotFoundException in system tests

2023-08-15 Thread Greg Harris (Jira)
Greg Harris created KAFKA-15350:
---

 Summary: MetadataLoaderMetrics has ClassNotFoundException in 
system tests
 Key: KAFKA-15350
 URL: https://issues.apache.org/jira/browse/KAFKA-15350
 Project: Kafka
  Issue Type: Improvement
  Components: kraft, system tests
Affects Versions: 3.6.0
Reporter: Greg Harris


The system tests appear to be failing on trunk with:
{noformat}
[2023-08-15 22:11:34,235] ERROR Exiting Kafka due to fatal exception 
(kafka.Kafka$)
java.lang.NoClassDefFoundError: 
org/apache/kafka/image/loader/MetadataLoaderMetrics
   at kafka.server.KafkaRaftServer.(KafkaRaftServer.scala:68)
   at kafka.Kafka$.buildServer(Kafka.scala:83)
   at kafka.Kafka$.main(Kafka.scala:91)
   at kafka.Kafka.main(Kafka.scala)
Caused by: java.lang.ClassNotFoundException: 
org.apache.kafka.image.loader.MetadataLoaderMetrics
   at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
   at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
   at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
   ... 4 more{noformat}
This happens with the `tests/kafkatest/tests/connect/`, 
`tests/kafkatest/tests/core/kraft_upgrade_test.py` and possibly others.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor

2023-08-15 Thread via GitHub


rreddy-22 commented on code in PR #14182:
URL: https://github.com/apache/kafka/pull/14182#discussion_r1295198962


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * Assigns Kafka partitions to members of a consumer group ensuring a balanced 
distribution with
+ * considerations for sticky assignments and rack-awareness.
+ * The order of priority of properties during the assignment will be: balance 
> rack matching (when applicable) > stickiness.
+ *
+ *  Here's the step-by-step breakdown of the assignment process:
+ *
+ * 
+ *  Compute the quotas of partitions for each member based on the 
total partitions and member count.
+ *  For existing assignments, retain partitions based on the 
determined quota and member's rack compatibility.
+ *  If a partition's rack mismatches with its member, track it with 
its prior owner.
+ *  Identify members that haven't fulfilled their partition quota or 
are eligible to receive extra partitions.
+ *  Derive the unassigned partitions by taking the difference between 
total partitions and the sticky assignments.
+ *  Depending on members needing extra partitions, select members from 
the potentially unfilled list and add them to the unfilled list.
+ *  Proceed with a round-robin assignment adhering to rack awareness.
+ *  For each unassigned partition, locate the first compatible member 
from the unfilled list.
+ *  If no rack-compatible member is found, revert to the tracked 
current owner.
+ *  If that member can't accommodate the partition due to quota 
limits, resort to a generic round-robin assignment.
+ * 
+ */
+public class OptimizedUniformAssignmentBuilder extends 
UniformAssignor.AbstractAssignmentBuilder {
+private static final Logger log = 
LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class);
+private final AssignmentSpec assignmentSpec;
+private final SubscribedTopicDescriber subscribedTopicDescriber;
+// List of topics subscribed to by all members.
+private final List<Uuid> subscriptionList;
+private final RackInfo rackInfo;
+// Count of members to receive an extra partition beyond the minimum quota,
+// to account for the distribution of the remaining partitions.
+private int remainingMembersToGetExtraPartition;
+// Map of members to the remaining number of partitions needed to meet the 
minimum quota,
+// including members eligible for an extra partition.
+private final Map<String, Integer> potentiallyUnfilledMembers;
+// Members mapped to the remaining number of partitions needed to meet the 
full quota.
+// Full quota = minQuota + one extra partition (if applicable).
+private Map<String, Integer> unfilledMembers;
+private List<TopicIdPartition> unassignedPartitions;
+private final Map<String, MemberAssignment> newAssignment;
+// Tracks the current owner of each partition when using rack-aware 
strategy.
+// Current refers to the existing assignment.
+private final Map<TopicIdPartition, String> currentPartitionOwners;
+// Indicates if a rack aware assignment can be done.
+// True if racks are defined for both members and partitions.
+boolean useRackAwareStrategy;
+
+OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, 
SubscribedTopicDescriber subscribedTopicDescriber) {
+this.assignmentSpec = assignmentSpec;
+this.subscribedTopicDescriber = subscribedTopicDescriber;
+subscriptionList = new 
ArrayList<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds());
+
+RackInfo rackInfo = new RackInfo(assignmentSpec, 
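
As an aside, step 1 of the Javadoc above boils down to the following arithmetic 
(an illustrative sketch, not the PR's code):

```java
// Each member gets at least totalPartitions / memberCount partitions;
// the remainder determines how many members receive one extra partition.
int totalPartitions = 10;
int memberCount = 3;
int minQuota = totalPartitions / memberCount;                            // 3
int remainingMembersToGetExtraPartition = totalPartitions % memberCount; // 1
```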

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor

2023-08-15 Thread via GitHub


rreddy-22 commented on code in PR #14182:
URL: https://github.com/apache/kafka/pull/14182#discussion_r1295197016


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * Assigns Kafka partitions to members of a consumer group ensuring a balanced 
distribution with
+ * considerations for sticky assignments and rack-awareness.
+ * The order of priority of properties during the assignment will be: balance 
> rack matching (when applicable) > stickiness.
+ *
+ *  Here's the step-by-step breakdown of the assignment process:
+ *
+ * 
+ *  Compute the quotas of partitions for each member based on the 
total partitions and member count.
+ *  For existing assignments, retain partitions based on the 
determined quota and member's rack compatibility.
+ *  If a partition's rack mismatches with its member, track it with 
its prior owner.
+ *  Identify members that haven't fulfilled their partition quota or 
are eligible to receive extra partitions.
+ *  Derive the unassigned partitions by taking the difference between 
total partitions and the sticky assignments.
+ *  Depending on members needing extra partitions, select members from 
the potentially unfilled list and add them to the unfilled list.
+ *  Proceed with a round-robin assignment adhering to rack awareness.
+ *  For each unassigned partition, locate the first compatible member 
from the unfilled list.
+ *  If no rack-compatible member is found, revert to the tracked 
current owner.
+ *  If that member can't accommodate the partition due to quota 
limits, resort to a generic round-robin assignment.
+ * 
+ */
+public class OptimizedUniformAssignmentBuilder extends 
UniformAssignor.AbstractAssignmentBuilder {
+private static final Logger log = 
LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class);
+private final AssignmentSpec assignmentSpec;
+private final SubscribedTopicDescriber subscribedTopicDescriber;
+// List of topics subscribed to by all members.
+private final List<Uuid> subscriptionList;
+private final RackInfo rackInfo;
+// Count of members to receive an extra partition beyond the minimum quota,
+// to account for the distribution of the remaining partitions.
+private int remainingMembersToGetExtraPartition;
+// Map of members to the remaining number of partitions needed to meet the 
minimum quota,
+// including members eligible for an extra partition.
+private final Map<String, Integer> potentiallyUnfilledMembers;
+// Members mapped to the remaining number of partitions needed to meet the 
full quota.
+// Full quota = minQuota + one extra partition (if applicable).
+private Map<String, Integer> unfilledMembers;
+private List<TopicIdPartition> unassignedPartitions;
+private final Map<String, MemberAssignment> newAssignment;
+// Tracks the current owner of each partition when using rack-aware 
strategy.
+// Current refers to the existing assignment.
+private final Map<TopicIdPartition, String> currentPartitionOwners;
+// Indicates if a rack aware assignment can be done.
+// True if racks are defined for both members and partitions.
+boolean useRackAwareStrategy;
+
+OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, 
SubscribedTopicDescriber subscribedTopicDescriber) {
+this.assignmentSpec = assignmentSpec;
+this.subscribedTopicDescriber = subscribedTopicDescriber;
+subscriptionList = new 
ArrayList<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds());
+
+RackInfo rackInfo = new RackInfo(assignmentSpec, 

[GitHub] [kafka] lucasbru commented on pull request #14216: KAFKA-15319: Upgrade rocksdb to fix CVE-2022-37434

2023-08-15 Thread via GitHub


lucasbru commented on PR #14216:
URL: https://github.com/apache/kafka/pull/14216#issuecomment-1679690414

   
https://jenkins.confluent.io/job/confluentinc/job/kafka-streams-benchmarks/job/master/653/parameters/
   
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/5796/parameters/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao merged pull request #14179: MINOR: CommitRequestManager should only poll when the coordinator node is known

2023-08-15 Thread via GitHub


junrao merged PR #14179:
URL: https://github.com/apache/kafka/pull/14179


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15349) ducker-ak should fail fast when gradlew systemTestLibs fails

2023-08-15 Thread Greg Harris (Jira)
Greg Harris created KAFKA-15349:
---

 Summary: ducker-ak should fail fast when gradlew systemTestLibs 
fails
 Key: KAFKA-15349
 URL: https://issues.apache.org/jira/browse/KAFKA-15349
 Project: Kafka
  Issue Type: Improvement
  Components: system tests
Reporter: Greg Harris


If you introduce a flaw into the gradle build which causes the systemTestLibs 
to fail, such as a circular dependency, then the ducker_test function continues 
to run tests which are invalid.

Rather than proceeding to run the tests, the script should fail fast and make 
the user address the error before continuing.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-15 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1295140057


##
metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java:
##


Review Comment:
   It is because the comparison uses toString(), which redacts sensitive data, 
and I didn't remove any of the delegation tokens between IMAGE1 and IMAGE2, so 
all the keys match. Once I add a RemoveDelegationTokenRecord to the test, that 
should cause it to fail.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 merged pull request #14177: MINOR: Fix SynchronizationTest Classloaders sometimes not being parallel capable

2023-08-15 Thread via GitHub


gharris1727 merged PR #14177:
URL: https://github.com/apache/kafka/pull/14177


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #14213: KAFKA-15345; KRaft leader notifies when listener reaches epoch start

2023-08-15 Thread via GitHub


cmccabe commented on PR #14213:
URL: https://github.com/apache/kafka/pull/14213#issuecomment-1679620704

   > The RecordsBatchReader implementation is also changed to include control 
records. This makes it possible for the state machine to learn about committed 
control records. This additional information can be used to compute the 
committed offset or for counting those bytes when determining when to snapshot 
the partition.
   
   I think we should do this change separately, since it may require downstream 
changes. I think a few implementations assume they don't get "empty" batches 
(that don't have records), which would no longer be true if control record 
batches were sent.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #14177: MINOR: Fix SynchronizationTest Classloaders sometimes not being parallel capable

2023-08-15 Thread via GitHub


gharris1727 commented on PR #14177:
URL: https://github.com/apache/kafka/pull/14177#issuecomment-1679635472

   CI failures appear unrelated, and the tests pass locally. The test being 
fixed shows no failures.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15343) Fix MirrorConnectIntegrationTests causing ci build failures.

2023-08-15 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17754771#comment-17754771
 ] 

Greg Harris commented on KAFKA-15343:
-

Hi [~prasanth] and thank you for reporting this issue! It is certainly not good 
that one test can cause the whole build to fail, preventing other tests from 
running.

Can you speak to how frequently you've seen this failure? Naively, I would 
expect that with >1 ephemeral ports available, such a failure would be 
quite rare.

If this is true, I don't think it is appropriate to disable these tests. They 
are extremely important test coverage for the MirrorMaker2 feature, and 
disabling them may lead to undetected regressions.

As far as resolving this issue, I think we should:

1. Find where we are leaking Kafka clients in the MM2 integration test suites, 
either within the framework or within the Mirror connectors.

2. Close Kafka clients in a timely fashion (some relevant work in 
https://issues.apache.org/jira/browse/KAFKA-14725 and 
https://issues.apache.org/jira/browse/KAFKA-15090 )

3. Try to reproduce the Gradle daemon crash in a more controlled environment

4. Report the daemon crash to the Gradle upstream



Since random port selection and port reuse are standard procedures (not 
specific to Kafka), there could be downstream projects using Gradle that are 
affected. If there is something specific about the Kafka clients' connections 
that affects Gradle, then we should investigate further to help the Gradle 
project resolve the issue.

> Fix MirrorConnectIntegrationTests causing ci build failures.
> 
>
> Key: KAFKA-15343
> URL: https://issues.apache.org/jira/browse/KAFKA-15343
> Project: Kafka
>  Issue Type: Bug
>  Components: build
>Affects Versions: 3.6.0
>Reporter: Prasanth Kumar
>Priority: Major
>
> There are several instances of tests interacting badly with Gradle daemon(s) 
> running on ports that the Kafka broker previously used. After going through 
> the debug logs, we observed a few retrying Kafka clients trying to connect to 
> a broker that had been shut down, and the Gradle worker chose the same port 
> on which the broker had been running. Later in the build, the Gradle daemon 
> attempted to connect to the worker and could not, triggering a failure. 
> Ideally Gradle would not exit when connected to by an invalid client; in 
> testing with netcat, it would often handle these without dying. However, 
> there appear to be some cases where the daemon dies completely. Both the 
> broker code and the Gradle workers bind to port 0, so the OS assigns an 
> unused port. This avoids conflicts, but does not ensure that long-lived 
> clients never attempt to connect to these ports afterwards. It's possible 
> that closing the client in between may be enough to work around this issue. 
> Until then, we will disable the test so it does not block CI for other code 
> changes.
> *MirrorConnectorsIntegrationBaseTest and extending Tests*
> {code:java}
> [2023-07-04T11:48:16.128Z] 2023-07-04T11:47:46.804+ [DEBUG] 
> [TestEventLogger] 
> MirrorConnectorsWithCustomForwardingAdminIntegrationTest > 
> testReplicateSourceDefault() STANDARD_OUT
> [2023-07-04T11:48:16.128Z] 2023-07-04T11:47:46.804+ [DEBUG] 
> [TestEventLogger] [2023-07-04 11:47:46,799]
>  INFO primary REST service: http://localhost:43809/connectors 
> (org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest:224)
> [2023-07-04T11:48:16.128Z] 2023-07-04T11:47:46.804+ [DEBUG] 
> [TestEventLogger] [2023-07-04 11:47:46,799] 
> INFO backup REST service: http://localhost:43323/connectors 
> (org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest:225)
> [2023-07-04T11:48:16.128Z] 2023-07-04T11:47:46.804+ [DEBUG] 
> [TestEventLogger] [2023-07-04 11:47:46,799] 
> INFO primary brokers: localhost:37557 
> (org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest:226)
> [2023-07-04T11:59:12.968Z] 2023-07-04T11:59:12.900+ [DEBUG] 
> [org.gradle.internal.remote.internal.inet.TcpIncomingConnector] 
> Accepted connection from /127.0.0.1:47660 to /127.0.0.1:37557.
> [2023-07-04T11:59:13.233Z] 
> org.gradle.internal.remote.internal.MessageIOException: Could not read 
> message from '/127.0.0.1:47660'.
> [2023-07-04T11:59:12.970Z] 2023-07-04T11:59:12.579+ [DEBUG] 
> [org.gradle.internal.remote.internal.inet.TcpIncomingConnector] Listening on 
> [d6bf30cb-bca2-46d9-8aeb-b9fd0497f54d port:37557, 
> addresses:[localhost/127.0.0.1]].
> [2023-07-04T11:59:46.519Z] 2023-07-04T11:59:13.014+ [ERROR] 
> [system.err] org.gradle.internal.remote.internal.ConnectException: Could not 
> connect to server [d6bf30cb-bca2-46d9-8aeb-b9fd0497f54d port:37557, 
> 

[GitHub] [kafka] jeqo opened a new pull request, #14219: KIP-405: 2023-08-15

2023-08-15 Thread via GitHub


jeqo opened a new pull request, #14219:
URL: https://github.com/apache/kafka/pull/14219

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295077023


##
metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java:
##
@@ -80,6 +137,7 @@ public LeaderAndEpoch leaderAndEpoch() {
 return leaderAndEpoch;
 }
 
+

Review Comment:
   whitespace not needed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


cmccabe commented on PR #14208:
URL: https://github.com/apache/kafka/pull/14208#issuecomment-1679586281

   Thanks for the PR, @mumrah. I think at some point we'll need to add 
begin/end transactions to the set of things we "fuzz". Basically, have a test 
that just makes sure the MetadataLoader does the right thing no matter how 
many transaction markers there are or where they land. That doesn't need to be 
in this PR, though.
   
   Thanks also for the fix to QC.
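   
   Purely as a sketch, the record stream for such a fuzz test could be 
generated along these lines (the surrounding test scaffolding and the helper 
name are made up; BeginTransactionRecord/EndTransactionRecord are the KIP-868 
marker records, and NoOpRecord stands in for ordinary metadata records):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.kafka.common.metadata.BeginTransactionRecord;
import org.apache.kafka.common.metadata.EndTransactionRecord;
import org.apache.kafka.common.metadata.NoOpRecord;
import org.apache.kafka.server.common.ApiMessageAndVersion;

// Scatter transaction markers randomly among plain records; the loader should
// publish only committed state no matter where the markers end up.
static List<ApiMessageAndVersion> fuzzRecords(long seed, int count) {
    Random random = new Random(seed);
    List<ApiMessageAndVersion> records = new ArrayList<>();
    for (int i = 0; i < count; i++) {
        int roll = random.nextInt(20);
        if (roll == 0) {
            records.add(new ApiMessageAndVersion(
                new BeginTransactionRecord().setName("fuzz"), (short) 0));
        } else if (roll == 1) {
            records.add(new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0));
        } else {
            records.add(new ApiMessageAndVersion(new NoOpRecord(), (short) 0));
        }
    }
    return records;
}
```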


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeqo closed pull request #14219: KIP-405: 2023-08-15

2023-08-15 Thread via GitHub


jeqo closed pull request #14219: KIP-405: 2023-08-15
URL: https://github.com/apache/kafka/pull/14219


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295076740


##
metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java:
##
@@ -27,6 +27,59 @@
  * Contains information about a set of changes that were loaded from the 
metadata log.
  */
 public class LogDeltaManifest implements LoaderManifest {
+
+public static class Builder {
+private MetadataProvenance provenance;
+private LeaderAndEpoch leaderAndEpoch;
+private Integer numBatches;

Review Comment:
   seems a bit weird to use boxed primitives. It's quite inefficient in Java.
   
   If you want them to start with invalid values, just make them negative (that 
works for numBatches, elapsedNs, numBytes ... negative values for those don't 
make sense.)
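   
   For example, a sketch of what I mean (the setter names, build() validation, 
and constructor argument order are just illustrative):

```java
public static class Builder {
    private MetadataProvenance provenance;
    private LeaderAndEpoch leaderAndEpoch;
    // Primitives with negative sentinels instead of boxed Integer/Long fields.
    private int numBatches = -1;
    private long elapsedNs = -1L;
    private long numBytes = -1L;

    public Builder provenance(MetadataProvenance provenance) {
        this.provenance = provenance;
        return this;
    }

    public Builder leaderAndEpoch(LeaderAndEpoch leaderAndEpoch) {
        this.leaderAndEpoch = leaderAndEpoch;
        return this;
    }

    public Builder numBatches(int numBatches) {
        this.numBatches = numBatches;
        return this;
    }

    public Builder elapsedNs(long elapsedNs) {
        this.elapsedNs = elapsedNs;
        return this;
    }

    public Builder numBytes(long numBytes) {
        this.numBytes = numBytes;
        return this;
    }

    public LogDeltaManifest build() {
        // A negative value means the caller never set the field.
        if (numBatches < 0 || elapsedNs < 0 || numBytes < 0) {
            throw new IllegalStateException("Builder is missing required fields");
        }
        return new LogDeltaManifest(provenance, leaderAndEpoch, numBatches, elapsedNs, numBytes);
    }
}
```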



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295091800


##
metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java:
##
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader;
+
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.fault.FaultHandler;
+import org.slf4j.Logger;
+
+import java.util.function.Supplier;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+/**
+ * Loads batches of metadata updates from Raft commits into MetadataDelta-s. 
Multiple batches from a commit
+ * are buffered into a MetadataDelta to achieve batching of records and reduce 
the number of times
+ * MetadataPublishers must be updated. This class also supports metadata 
transactions (KIP-868).
+ *
+ *
+ */
+public class MetadataBatchLoader {
+
+enum TransactionState {
+NO_TRANSACTION,
+STARTED_TRANSACTION,
+CONTINUED_TRANSACTION,
+ENDED_TRANSACTION,
+ABORTED_TRANSACTION;
+}
+
+@FunctionalInterface
+public interface MetadataUpdater {
+void update(MetadataDelta delta, MetadataImage image, LogDeltaManifest 
manifest);
+}
+
+private final Logger log;
+private final Time time;
+private final FaultHandler faultHandler;
+private final Supplier<LeaderAndEpoch> leaderAndEpochSupplier;
+private final MetadataUpdater callback;
+
+private MetadataImage image;
+private MetadataDelta delta;
+private long lastOffset;
+private int lastEpoch;
+private long lastContainedLogTimeMs;
+private long numBytes;
+private int numBatches;
+private long totalBatchElapsedNs;
+private TransactionState transactionState;
+
+public MetadataBatchLoader(
+LogContext logContext,
+Time time,
+FaultHandler faultHandler,
+Supplier<LeaderAndEpoch> leaderAndEpochSupplier,
+MetadataUpdater callback
+) {
+this.log = logContext.logger(MetadataBatchLoader.class);
+this.time = time;
+this.faultHandler = faultHandler;
+this.leaderAndEpochSupplier = leaderAndEpochSupplier;
+this.callback = callback;
+}
+
+/**
+ * Reset the state of this batch loader to the given image. Any un-flushed 
state will be
+ * discarded.
+ *
+ * @param image Metadata image to reset this batch loader's state to.
+ */
+public void resetToImage(MetadataImage image) {
+this.image = image;
+this.delta = new MetadataDelta.Builder().setImage(image).build();
+this.transactionState = TransactionState.NO_TRANSACTION;
+this.lastOffset = image.provenance().lastContainedOffset();
+this.lastEpoch = image.provenance().lastContainedEpoch();
+this.lastContainedLogTimeMs = 
image.provenance().lastContainedLogTimeMs();
+this.numBytes = 0;
+this.numBatches = 0;
+this.totalBatchElapsedNs = 0;
+}
+
+/**
+ * Load a batch of records from the log. We have to do some bookkeeping 
here to
+ * translate between batch offsets and record offsets, and track the 
number of bytes we
+ * have read. Additionally, there is the chance that one of the records is 
a metadata
+ * version change which needs to be handled differently.
+ * 
+ * If this batch starts a transaction, any records preceding the 
transaction in this
+ * batch will be implicitly added to the transaction.
+ *
+ * @param batch The reader which yields the batches.
+ * @return The time in nanoseconds that elapsed while loading this 
batch
+ */
+
+public long loadBatch(Batch<ApiMessageAndVersion> batch) {
+long startNs = time.nanoseconds();
+int indexWithinBatch = 0;
+
+lastContainedLogTimeMs = 

[GitHub] [kafka] lianetm commented on a diff in pull request #14218: KAFKA-14937; [2/N]: Refactoring for client code to reduce boilerplate

2023-08-15 Thread via GitHub


lianetm commented on code in PR #14218:
URL: https://github.com/apache/kafka/pull/14218#discussion_r1295078074


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java:
##
@@ -134,44 +132,20 @@ public static FetchMetricsManager createFetchMetricsManager(Metrics metrics) {
     }
 
     public static <K, V> FetchConfig<K, V> createFetchConfig(ConsumerConfig config,
-                                                             Deserializer<K> keyDeserializer,
-                                                             Deserializer<V> valueDeserializer) {
-        IsolationLevel isolationLevel = createIsolationLevel(config);
-        return new FetchConfig<>(config, keyDeserializer, valueDeserializer, isolationLevel);
+                                                             Deserializers<K, V> deserializers) {
+        IsolationLevel isolationLevel = getConfiguredIsolationLevel(config);
+        return new FetchConfig<>(config, deserializers, isolationLevel);
     }
 
-    @SuppressWarnings("unchecked")
-    public static <K, V> List<ConsumerInterceptor<K, V>> createConsumerInterceptors(ConsumerConfig config) {
-        return ClientUtils.createConfiguredInterceptors(config,
-                ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
-                ConsumerInterceptor.class);
+    public static FetchConfig<String, String> createFetchConfig(ConsumerConfig config) {
+        Deserializers<String, String> deserializers = new Deserializers<>(new StringDeserializer(), new StringDeserializer());

Review Comment:
   Sure, done. This was initially added as part of these changes (and not as 
part of the FetchRequestManager PR), so we'll have to take care of that when 
cherry-picking the FetchRequestManager commit, but that's fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295084331


##
metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java:
##
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader;
+
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.fault.FaultHandler;
+import org.slf4j.Logger;
+
+import java.util.function.Supplier;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+/**
+ * Loads batches of metadata updates from Raft commits into MetadataDelta-s. 
Multiple batches from a commit
+ * are buffered into a MetadataDelta to achieve batching of records and reduce 
the number of times
+ * MetadataPublishers must be updated. This class also supports metadata 
transactions (KIP-868).
+ *
+ *
+ */
+public class MetadataBatchLoader {
+
+enum TransactionState {
+NO_TRANSACTION,
+STARTED_TRANSACTION,
+CONTINUED_TRANSACTION,
+ENDED_TRANSACTION,
+ABORTED_TRANSACTION;
+}
+
+@FunctionalInterface
+public interface MetadataUpdater {
+void update(MetadataDelta delta, MetadataImage image, LogDeltaManifest 
manifest);
+}
+
+private final Logger log;
+private final Time time;
+private final FaultHandler faultHandler;
+private final Supplier<LeaderAndEpoch> leaderAndEpochSupplier;
+private final MetadataUpdater callback;
+
+private MetadataImage image;
+private MetadataDelta delta;
+private long lastOffset;
+private int lastEpoch;
+private long lastContainedLogTimeMs;
+private long numBytes;
+private int numBatches;
+private long totalBatchElapsedNs;
+private TransactionState transactionState;
+
+public MetadataBatchLoader(
+LogContext logContext,
+Time time,
+FaultHandler faultHandler,
+Supplier<LeaderAndEpoch> leaderAndEpochSupplier,
+MetadataUpdater callback
+) {
+this.log = logContext.logger(MetadataBatchLoader.class);
+this.time = time;
+this.faultHandler = faultHandler;
+this.leaderAndEpochSupplier = leaderAndEpochSupplier;
+this.callback = callback;
+}
+
+/**
+ * Reset the state of this batch loader to the given image. Any un-flushed 
state will be
+ * discarded.
+ *
+ * @param image Metadata image to reset this batch loader's state to.
+ */
+public void resetToImage(MetadataImage image) {
+this.image = image;
+this.delta = new MetadataDelta.Builder().setImage(image).build();
+this.transactionState = TransactionState.NO_TRANSACTION;
+this.lastOffset = image.provenance().lastContainedOffset();
+this.lastEpoch = image.provenance().lastContainedEpoch();
+this.lastContainedLogTimeMs = 
image.provenance().lastContainedLogTimeMs();
+this.numBytes = 0;
+this.numBatches = 0;
+this.totalBatchElapsedNs = 0;
+}
+
+/**
+ * Load a batch of records from the log. We have to do some bookkeeping 
here to
+ * translate between batch offsets and record offsets, and track the 
number of bytes we
+ * have read. Additionally, there is the chance that one of the records is 
a metadata
+ * version change which needs to be handled differently.
+ * 
+ * If this batch starts a transaction, any records preceding the 
transaction in this
+ * batch will be implicitly added to the transaction.
+ *
+ * @param batch The reader which yields the batches.
+ * @return The time in nanoseconds that elapsed while loading this 
batch
+ */
+
+public long loadBatch(Batch<ApiMessageAndVersion> batch) {
+long startNs = time.nanoseconds();
+int indexWithinBatch = 0;
+
+lastContainedLogTimeMs = 

[GitHub] [kafka] gharris1727 commented on pull request #14194: KAFKA-15336: Add ServiceLoader Javadocs for Connect plugins

2023-08-15 Thread via GitHub


gharris1727 commented on PR #14194:
URL: https://github.com/apache/kafka/pull/14194#issuecomment-1679551865

   This is a documentation-only change, and the test failures appear unrelated. 
A local `javadoc` build doesn't report any errors for these classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295090413


##
metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java:
##
@@ -183,6 +183,8 @@ public MetadataLoader build() {
  */
 private MetadataImage image;
 
+private MetadataBatchLoader batchLoader;

Review Comment:
   it seems like in the current implementation, this is `final` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295084521


##
metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java:
##
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader;
+
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.fault.FaultHandler;
+import org.slf4j.Logger;
+
+import java.util.function.Supplier;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+/**
+ * Loads batches of metadata updates from Raft commits into MetadataDelta-s. 
Multiple batches from a commit
+ * are buffered into a MetadataDelta to achieve batching of records and reduce 
the number of times
+ * MetadataPublishers must be updated. This class also supports metadata 
transactions (KIP-868).
+ *
+ *
+ */
+public class MetadataBatchLoader {
+
+enum TransactionState {
+NO_TRANSACTION,
+STARTED_TRANSACTION,
+CONTINUED_TRANSACTION,
+ENDED_TRANSACTION,
+ABORTED_TRANSACTION;
+}
+
+@FunctionalInterface
+public interface MetadataUpdater {
+void update(MetadataDelta delta, MetadataImage image, LogDeltaManifest 
manifest);
+}
+
+private final Logger log;
+private final Time time;
+private final FaultHandler faultHandler;
+private final Supplier<LeaderAndEpoch> leaderAndEpochSupplier;
+private final MetadataUpdater callback;
+
+private MetadataImage image;
+private MetadataDelta delta;
+private long lastOffset;
+private int lastEpoch;
+private long lastContainedLogTimeMs;
+private long numBytes;
+private int numBatches;
+private long totalBatchElapsedNs;
+private TransactionState transactionState;
+
+public MetadataBatchLoader(
+LogContext logContext,
+Time time,
+FaultHandler faultHandler,
+Supplier<LeaderAndEpoch> leaderAndEpochSupplier,
+MetadataUpdater callback
+) {
+this.log = logContext.logger(MetadataBatchLoader.class);
+this.time = time;
+this.faultHandler = faultHandler;
+this.leaderAndEpochSupplier = leaderAndEpochSupplier;
+this.callback = callback;
+}
+
+/**
+ * Reset the state of this batch loader to the given image. Any un-flushed 
state will be
+ * discarded.
+ *
+ * @param image Metadata image to reset this batch loader's state to.
+ */
+public void resetToImage(MetadataImage image) {
+this.image = image;
+this.delta = new MetadataDelta.Builder().setImage(image).build();
+this.transactionState = TransactionState.NO_TRANSACTION;
+this.lastOffset = image.provenance().lastContainedOffset();
+this.lastEpoch = image.provenance().lastContainedEpoch();
+this.lastContainedLogTimeMs = 
image.provenance().lastContainedLogTimeMs();
+this.numBytes = 0;
+this.numBatches = 0;
+this.totalBatchElapsedNs = 0;
+}
+
+/**
+ * Load a batch of records from the log. We have to do some bookkeeping 
here to
+ * translate between batch offsets and record offsets, and track the 
number of bytes we
+ * have read. Additionally, there is the chance that one of the records is 
a metadata
+ * version change which needs to be handled differently.
+ * 
+ * If this batch starts a transaction, any records preceding the 
transaction in this
+ * batch will be implicitly added to the transaction.
+ *
+ * @param batch The reader which yields the batches.
+ * @return The time in nanoseconds that elapsed while loading this 
batch
+ */
+
+public long loadBatch(Batch<ApiMessageAndVersion> batch) {
+long startNs = time.nanoseconds();
+int indexWithinBatch = 0;
+
+lastContainedLogTimeMs = 

[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295077023


##
metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java:
##
@@ -80,6 +137,7 @@ public LeaderAndEpoch leaderAndEpoch() {
 return leaderAndEpoch;
 }
 
+

Review Comment:
   whitespace change not needed?



##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -786,6 +798,7 @@ public void complete(Throwable exception) {
 }
 }
 
+

Review Comment:
   looks like extra whitespace?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295081634


##
metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java:
##
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader;
+
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.fault.FaultHandler;
+import org.slf4j.Logger;
+
+import java.util.function.Supplier;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+/**
+ * Loads batches of metadata updates from Raft commits into MetadataDelta-s. 
Multiple batches from a commit
+ * are buffered into a MetadataDelta to achieve batching of records and reduce 
the number of times
+ * MetadataPublishers must be updated. This class also supports metadata 
transactions (KIP-868).
+ *
+ *
+ */
+public class MetadataBatchLoader {
+
+enum TransactionState {
+NO_TRANSACTION,
+STARTED_TRANSACTION,
+CONTINUED_TRANSACTION,
+ENDED_TRANSACTION,
+ABORTED_TRANSACTION;
+}
+
+@FunctionalInterface
+public interface MetadataUpdater {
+void update(MetadataDelta delta, MetadataImage image, LogDeltaManifest 
manifest);
+}
+
+private final Logger log;
+private final Time time;
+private final FaultHandler faultHandler;
+private final Supplier<LeaderAndEpoch> leaderAndEpochSupplier;
+private final MetadataUpdater callback;
+
+private MetadataImage image;
+private MetadataDelta delta;
+private long lastOffset;
+private int lastEpoch;
+private long lastContainedLogTimeMs;
+private long numBytes;
+private int numBatches;
+private long totalBatchElapsedNs;
+private TransactionState transactionState;
+
+public MetadataBatchLoader(
+LogContext logContext,
+Time time,
+FaultHandler faultHandler,
+Supplier<LeaderAndEpoch> leaderAndEpochSupplier,
+MetadataUpdater callback
+) {
+this.log = logContext.logger(MetadataBatchLoader.class);
+this.time = time;
+this.faultHandler = faultHandler;
+this.leaderAndEpochSupplier = leaderAndEpochSupplier;
+this.callback = callback;
+}
+
+/**
+ * Reset the state of this batch loader to the given image. Any un-flushed 
state will be
+ * discarded.
+ *
+ * @param image Metadata image to reset this batch loader's state to.
+ */
+public void resetToImage(MetadataImage image) {
+this.image = image;
+this.delta = new MetadataDelta.Builder().setImage(image).build();
+this.transactionState = TransactionState.NO_TRANSACTION;
+this.lastOffset = image.provenance().lastContainedOffset();
+this.lastEpoch = image.provenance().lastContainedEpoch();
+this.lastContainedLogTimeMs = 
image.provenance().lastContainedLogTimeMs();
+this.numBytes = 0;
+this.numBatches = 0;
+this.totalBatchElapsedNs = 0;
+}
+
+/**
+ * Load a batch of records from the log. We have to do some bookkeeping 
here to
+ * translate between batch offsets and record offsets, and track the 
number of bytes we
+ * have read. Additionally, there is the chance that one of the records is 
a metadata
+ * version change which needs to be handled differently.
+ * 
+ * If this batch starts a transaction, any records preceding the 
transaction in this
+ * batch will be implicitly added to the transaction.
+ *
+ * @param batch The reader which yields the batches.
+ * @return The time in nanoseconds that elapsed while loading this 
batch
+ */
+
+public long loadBatch(Batch<ApiMessageAndVersion> batch) {
+long startNs = time.nanoseconds();
+int indexWithinBatch = 0;
+
+lastContainedLogTimeMs = 

[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295077448


##
metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java:
##
@@ -66,6 +119,10 @@ public LogDeltaManifest(
 this.numBytes = numBytes;
 }
 
+public static Builder newBuilder() {

Review Comment:
   curious why this is better than just having a public Builder constructor (I 
don't feel strongly, I guess...)
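   
   For what it's worth, the call site reads about the same either way; a 
hypothetical usage (assuming the usual fluent setters):

```java
// With the static factory:
LogDeltaManifest manifest = LogDeltaManifest.newBuilder()
        .provenance(provenance)
        .leaderAndEpoch(leaderAndEpoch)
        .numBatches(1)
        .elapsedNs(5_000_000L)
        .numBytes(1024L)
        .build();
// A public constructor would just start with new LogDeltaManifest.Builder()
// instead of LogDeltaManifest.newBuilder().
```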



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295074841


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1200,6 +1235,16 @@ public static List<ApiMessageAndVersion> generateActivationRecords(
                 throw new RuntimeException("Should not have ZK migrations enabled on a cluster running metadata.version " + featureControl.metadataVersion());
             }
         }
+
+        if (inTransaction) {
+            if (!featureControl.metadataVersion().isMetadataTransactionSupported()) {
+                throw new RuntimeException("Detected in-progress transaction, but the metadata.version " + featureControl.metadataVersion() +
+                    " does not support transactions. Cannot continue.");
+            } else {
+                log.warn("Detected in-progress transaction during controller activation. Aborting this transaction.");

Review Comment:
   I think we should include the start transaction offset in the log message 
(if you want, pass an OptionalLong to this function rather than a boolean?) 
That helps match up the beginning of the transaction with where it ends.
   
   Also should fill in that "reason" field, presumably with something like 
"controller failover"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295072484


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -892,48 +909,62 @@ class MigrationWriteOperation implements 
ControllerWriteOperation {
 }
 @Override
 public ControllerResult<Void> generateRecordsAndResult() {
-return ControllerResult.atomicOf(batch, null);
+return ControllerResult.of(batch, null);
 }
 
 public void processBatchEndOffset(long offset) {
 highestMigrationRecordOffset = new OffsetAndEpoch(offset, 
curClaimEpoch);
 }
 }
 @Override
-public void beginMigration() {
+public CompletableFuture<?> beginMigration() {
 log.info("Starting ZK Migration");
-// TODO use KIP-868 transaction
+ControllerWriteEvent<Void> batchEvent = new ControllerWriteEvent<>(
+"Begin ZK Migration Transaction",
+new MigrationWriteOperation(Collections.singletonList(
+new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("ZK Migration"), 
(short) 0))
+), eventFlags);
+queue.append(batchEvent);
+return batchEvent.future;
 }
 
 @Override
 public CompletableFuture<?> acceptBatch(List<ApiMessageAndVersion> recordBatch) {
-if (queue.size() > 100) { // TODO configure this
-CompletableFuture future = new CompletableFuture<>();
-future.completeExceptionally(new 
NotControllerException("Cannot accept migration record batch. Controller queue 
is too large"));
-return future;
-}
-ControllerWriteEvent<Void> batchEvent = new 
ControllerWriteEvent<>("ZK Migration Batch",
-new MigrationWriteOperation(recordBatch), 
EnumSet.of(RUNS_IN_PREMIGRATION));
+ControllerWriteEvent<Void> batchEvent = new ControllerWriteEvent<>(
+"ZK Migration Batch",
+new MigrationWriteOperation(recordBatch), eventFlags);
 queue.append(batchEvent);
 return batchEvent.future;
 }
 
 @Override
 public CompletableFuture<OffsetAndEpoch> completeMigration() {
 log.info("Completing ZK Migration");
-// TODO use KIP-868 transaction
-ControllerWriteEvent<Void> event = new 
ControllerWriteEvent<>("Complete ZK Migration",
+ControllerWriteEvent<Void> event = new ControllerWriteEvent<>(
+"Complete ZK Migration",
 new MigrationWriteOperation(
-
Collections.singletonList(ZkMigrationState.MIGRATION.toRecord())),
-EnumSet.of(RUNS_IN_PREMIGRATION));
+Arrays.asList(
+ZkMigrationState.MIGRATION.toRecord(),
+new ApiMessageAndVersion(
+new EndTransactionRecord(), (short) 0)
+)),
+eventFlags);
 queue.append(event);
 return event.future.thenApply(__ -> highestMigrationRecordOffset);
 }
 
 @Override
-public void abortMigration() {
+public CompletableFuture<?> abortMigration() {
 fatalFaultHandler.handleFault("Aborting the ZK migration");
-// TODO use KIP-868 transaction
+ControllerWriteEvent<Void> batchEvent = new ControllerWriteEvent<>(

Review Comment:
   check mv
   
   Set the "reason" field to "aborting ZK migration" ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 merged pull request #14194: KAFKA-15336: Add ServiceLoader Javadocs for Connect plugins

2023-08-15 Thread via GitHub


gharris1727 merged PR #14194:
URL: https://github.com/apache/kafka/pull/14194


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295073210


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -954,11 +985,14 @@ public void 
handleCommit(BatchReader reader) {
 // so we don't need to do it here.
 log.debug("Completing purgatory items up to offset 
{} and epoch {}.", offset, epoch);
 
-// Complete any events in the purgatory that were 
waiting for this offset.
-deferredEventQueue.completeUpTo(offset);
+// Advance the committed and stable offsets then 
complete any pending purgatory
+// items that were waiting for these offsets.
+offsetControl.handleCommitBatch(batch);
+
deferredEventQueue.completeUpTo(offsetControl.lastStableOffset());
+
deferredUnstableEventQueue.completeUpTo(offsetControl.lastCommittedOffset());
 
 // The active controller can delete up to the 
current committed offset.
-snapshotRegistry.deleteSnapshotsUpTo(offset);
+
snapshotRegistry.deleteSnapshotsUpTo(offsetControl.lastStableOffset());

Review Comment:
   I guess we need to call this one out as a bug fix.
   
   Good find.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295069992


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -623,7 +624,14 @@ enum ControllerOperationFlag {
  * even though the cluster really does have metadata. Very few 
operations should
  * use this flag.
  */
-RUNS_IN_PREMIGRATION
+RUNS_IN_PREMIGRATION,
+
+/**
+ * This flag signifies that an event will be completed even if it is 
part of an unfinished transaction.
+ * This is needed for metadata transactions so that external callers 
can add records to a transaction

Review Comment:
   Maybe mention ZK migration records as an example ...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295071752


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -892,48 +909,62 @@ class MigrationWriteOperation implements 
ControllerWriteOperation {
 }
 @Override
 public ControllerResult<Void> generateRecordsAndResult() {
-return ControllerResult.atomicOf(batch, null);
+return ControllerResult.of(batch, null);
 }
 
 public void processBatchEndOffset(long offset) {
 highestMigrationRecordOffset = new OffsetAndEpoch(offset, 
curClaimEpoch);
 }
 }
 @Override
-public void beginMigration() {
+public CompletableFuture<?> beginMigration() {
 log.info("Starting ZK Migration");
-// TODO use KIP-868 transaction
+ControllerWriteEvent<Void> batchEvent = new ControllerWriteEvent<>(
+"Begin ZK Migration Transaction",
+new MigrationWriteOperation(Collections.singletonList(
+new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("ZK Migration"), 
(short) 0))
+), eventFlags);
+queue.append(batchEvent);
+return batchEvent.future;
 }
 
 @Override
 public CompletableFuture<?> acceptBatch(List<ApiMessageAndVersion> recordBatch) {
-if (queue.size() > 100) { // TODO configure this
-CompletableFuture future = new CompletableFuture<>();
-future.completeExceptionally(new 
NotControllerException("Cannot accept migration record batch. Controller queue 
is too large"));
-return future;
-}
-ControllerWriteEvent<Void> batchEvent = new 
ControllerWriteEvent<>("ZK Migration Batch",
-new MigrationWriteOperation(recordBatch), 
EnumSet.of(RUNS_IN_PREMIGRATION));
+ControllerWriteEvent<Void> batchEvent = new ControllerWriteEvent<>(
+"ZK Migration Batch",
+new MigrationWriteOperation(recordBatch), eventFlags);
 queue.append(batchEvent);
 return batchEvent.future;
 }
 
 @Override
 public CompletableFuture<OffsetAndEpoch> completeMigration() {
 log.info("Completing ZK Migration");
-// TODO use KIP-868 transaction
-ControllerWriteEvent<Void> event = new 
ControllerWriteEvent<>("Complete ZK Migration",
+ControllerWriteEvent<Void> event = new ControllerWriteEvent<>(
+"Complete ZK Migration",
 new MigrationWriteOperation(
-
Collections.singletonList(ZkMigrationState.MIGRATION.toRecord())),
-EnumSet.of(RUNS_IN_PREMIGRATION));
+Arrays.asList(

Review Comment:
   This is another case where we have to check the MV I guess
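   
   e.g. a sketch of the MV check:

```java
// Only append the end-transaction marker when the metadata.version supports
// KIP-868 transactions; older versions must not see the marker record.
List<ApiMessageAndVersion> records = new ArrayList<>();
records.add(ZkMigrationState.MIGRATION.toRecord());
if (featureControl.metadataVersion().isMetadataTransactionSupported()) {
    records.add(new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0));
}
```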



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14218: KAFKA-14937; [2/N]: Refactoring for client code to reduce boilerplate

2023-08-15 Thread via GitHub


lianetm commented on code in PR #14218:
URL: https://github.com/apache/kafka/pull/14218#discussion_r1295049009


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java:
##
@@ -134,44 +132,20 @@ public static FetchMetricsManager createFetchMetricsManager(Metrics metrics) {
     }
 
     public static <K, V> FetchConfig<K, V> createFetchConfig(ConsumerConfig config,
-                                                             Deserializer<K> keyDeserializer,
-                                                             Deserializer<V> valueDeserializer) {
-        IsolationLevel isolationLevel = createIsolationLevel(config);
-        return new FetchConfig<>(config, keyDeserializer, valueDeserializer, isolationLevel);
+                                                             Deserializers<K, V> deserializers) {
+        IsolationLevel isolationLevel = getConfiguredIsolationLevel(config);
+        return new FetchConfig<>(config, deserializers, isolationLevel);
     }
 
-    @SuppressWarnings("unchecked")
-    public static <K, V> List<ConsumerInterceptor<K, V>> createConsumerInterceptors(ConsumerConfig config) {
-        return ClientUtils.createConfiguredInterceptors(config,
-                ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
-                ConsumerInterceptor.class);
+    public static FetchConfig<String, String> createFetchConfig(ConsumerConfig config) {
+        Deserializers<String, String> deserializers = new Deserializers<>(new StringDeserializer(), new StringDeserializer());

Review Comment:
   Fixed. Just for the record, this is unused here but will be used in a 
following PR with the FetchRequestManager changes.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java:
##
@@ -134,44 +132,20 @@ public static FetchMetricsManager createFetchMetricsManager(Metrics metrics) {
     }
 
     public static <K, V> FetchConfig<K, V> createFetchConfig(ConsumerConfig config,
-                                                             Deserializer<K> keyDeserializer,
-                                                             Deserializer<V> valueDeserializer) {
-        IsolationLevel isolationLevel = createIsolationLevel(config);
-        return new FetchConfig<>(config, keyDeserializer, valueDeserializer, isolationLevel);
+                                                             Deserializers<K, V> deserializers) {
+        IsolationLevel isolationLevel = getConfiguredIsolationLevel(config);
+        return new FetchConfig<>(config, deserializers, isolationLevel);
    }
 
-    @SuppressWarnings("unchecked")
-    public static <K, V> List<ConsumerInterceptor<K, V>> createConsumerInterceptors(ConsumerConfig config) {
-        return ClientUtils.createConfiguredInterceptors(config,
-                ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
-                ConsumerInterceptor.class);
+    public static FetchConfig<String, String> createFetchConfig(ConsumerConfig config) {
+        Deserializers<String, String> deserializers = new Deserializers<>(new StringDeserializer(), new StringDeserializer());

Review Comment:
   You're right, I don't think it is. It was defined like this in the commit we 
are cherry-picking, which is why it ended up here, but I'll include the latest 
changes with the fix.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14218: KAFKA-14937; [2/N]: Refactoring for client code to reduce boilerplate

2023-08-15 Thread via GitHub


lianetm commented on code in PR #14218:
URL: https://github.com/apache/kafka/pull/14218#discussion_r1295036974


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java:
##
@@ -134,44 +132,20 @@ public static FetchMetricsManager createFetchMetricsManager(Metrics metrics) {
     }
 
     public static <K, V> FetchConfig<K, V> createFetchConfig(ConsumerConfig config,
-                                                             Deserializer<K> keyDeserializer,
-                                                             Deserializer<V> valueDeserializer) {
-        IsolationLevel isolationLevel = createIsolationLevel(config);
-        return new FetchConfig<>(config, keyDeserializer, valueDeserializer, isolationLevel);
+                                                             Deserializers<K, V> deserializers) {
+        IsolationLevel isolationLevel = getConfiguredIsolationLevel(config);
+        return new FetchConfig<>(config, deserializers, isolationLevel);
     }
 
-    @SuppressWarnings("unchecked")
-    public static <K, V> List<ConsumerInterceptor<K, V>> createConsumerInterceptors(ConsumerConfig config) {
-        return ClientUtils.createConfiguredInterceptors(config,
-                ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
-                ConsumerInterceptor.class);
+    public static FetchConfig<String, String> createFetchConfig(ConsumerConfig config) {
+        Deserializers<String, String> deserializers = new Deserializers<>(new StringDeserializer(), new StringDeserializer());

Review Comment:
   You're right, I don't think it is. It was defined like this in the commit we 
are cherry-picking, which is why it ended up here, but I'll include the latest 
changes with the fix.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] AndrewJSchofield commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-08-15 Thread via GitHub


AndrewJSchofield commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1295024473


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -114,18 +125,39 @@ public synchronized Cluster fetch() {
 
 /**
  * Return the next time when the current cluster info can be updated 
(i.e., backoff time has elapsed).
+ * There are two calculations for backing off based on how many attempts 
to retrieve metadata have been made
+ * since the last successful response, and how many equivalent metadata 
responses have been received.
+ * The second of these allows backing off when there are errors to do with 
stale metadata, even though the
+ * metadata responses are clean.
+ * 
+ * This can be used to check whether it's worth requesting an update in 
the knowledge that it will
+ * not be delayed if this method returns 0.
  *
  * @param nowMs current time in ms
  * @return remaining time in ms till the cluster info can be updated again
  */
 public synchronized long timeToAllowUpdate(long nowMs) {
-return Math.max(this.lastRefreshMs + this.refreshBackoffMs - nowMs, 0);
+// Calculate the backoff for attempts which acts when metadata 
responses fail
+long backoffForAttempts = Math.max(this.lastRefreshMs +
+this.refreshBackoff.backoff(this.attempts > 0 ? this.attempts 
- 1 : 0) - nowMs, 0);
+
+// Periodic updates based on expiration are not backed off based on 
equivalent responses
+long backoffForEquivalentResponseCount;
+if (Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - 
nowMs, 0) == 0) {

Review Comment:
   I've rearranged this slightly to reset `this.equivalentResponseCount` when 
the metadata expiration period has elapsed. This means that a sequence of 
periodic refreshes with unchanging metadata doesn't inexorably increase the 
delay (although that would in any case have been bounded by the max delay, 
which by default means the delay stops increasing after 4 iterations).
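   
   To illustrate the bound, a self-contained sketch using the existing 
org.apache.kafka.common.utils.ExponentialBackoff with the KIP-580 default 
parameters (the class name BackoffBoundDemo is made up):

```java
import org.apache.kafka.common.utils.ExponentialBackoff;

public class BackoffBoundDemo {
    public static void main(String[] args) {
        // retry.backoff.ms = 100, multiplier = 2, retry.backoff.max.ms = 1000, 20% jitter.
        ExponentialBackoff backoff = new ExponentialBackoff(100, 2, 1000, 0.2);
        for (long attempts = 0; attempts < 6; attempts++) {
            // Expected means: ~100, ~200, ~400, ~800, ~1000, ~1000 ms (before jitter),
            // i.e. the delay stops growing after roughly four iterations.
            System.out.printf("attempts=%d backoff=%dms%n", attempts, backoff.backoff(attempts));
        }
    }
}
```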



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe merged pull request #14215: MINOR Install metadata publishers sooner in ControllerServer

2023-08-15 Thread via GitHub


cmccabe merged PR #14215:
URL: https://github.com/apache/kafka/pull/14215


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe closed pull request #14199: MINOR Fix the ZkMigrationState metric in KafkaController

2023-08-15 Thread via GitHub


cmccabe closed pull request #14199: MINOR Fix the ZkMigrationState metric in 
KafkaController
URL: https://github.com/apache/kafka/pull/14199


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] AndrewJSchofield commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-08-15 Thread via GitHub


AndrewJSchofield commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1295030179


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -140,17 +172,34 @@ public long metadataExpireMs() {
 }
 
 /**
- * Request an update of the current cluster metadata info, return the 
current updateVersion before the update
+ * Request an update of the current cluster metadata info, permitting 
backoff based on the number of
+ * equivalent metadata responses, which indicates that responses did not 
make progress and may be stale.
+ * @param permitBackoffOnEquivalentResponses Whether to permit backoff 
when consecutive responses are equivalent.
+ *   This should be set to 
true in situations where the update is
+ *   being requested to retry an 
operation, such as when the leader has
+ *   changed. It should be set to 
false in situations where new
+ *   metadata is being requested, 
such as adding a topic to a subscription.
+ *   In situations where it's not 
clear, it's best to use false.
+ * @return The current updateVersion before the update
  */
-public synchronized int requestUpdate() {
+public synchronized int requestUpdate(final boolean 
permitBackoffOnEquivalentResponses) {
 this.needFullUpdate = true;
+if (!permitBackoffOnEquivalentResponses) {

Review Comment:
   I think the negation is a little clearer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #14199: MINOR Fix the ZkMigrationState metric in KafkaController

2023-08-15 Thread via GitHub


cmccabe commented on PR #14199:
URL: https://github.com/apache/kafka/pull/14199#issuecomment-1679483573

   LGTM, committed


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ruslankrivoshein commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

2023-08-15 Thread via GitHub


ruslankrivoshein commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1679474019

   @mimaison please take a look





[GitHub] [kafka] divijvaidya commented on a diff in pull request #14161: KAFKA-15267: Do not allow Tiered Storage to be disabled while topics have remote.storage.enable property

2023-08-15 Thread via GitHub


divijvaidya commented on code in PR #14161:
URL: https://github.com/apache/kafka/pull/14161#discussion_r1294991302


##
core/src/test/scala/unit/kafka/server/KafkaServerTest.scala:
##
@@ -154,6 +155,96 @@ class KafkaServerTest extends QuorumTestHarness {
 server.shutdown()
   }
 
+  @Test
+  def testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(): Unit 
= {
+val tsEnabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head
+
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
 true.toString)
+
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
+  
"org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager")
+
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
+  "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager")
+
+val server = TestUtils.createServer(KafkaConfig.fromProps(tsEnabledProps))
+server.remoteLogManagerOpt match {
+  case Some(_) =>
+  case None => fail("RemoteLogManager should be initialized")
+}
+
+val topicProps = new Properties()
+topicProps.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
true.toString)
+
+TestUtils.createTopic(zkClient = server.zkClient, topic = "batman", 
servers = Seq(server), topicConfig = topicProps)
+
+server.shutdown()
+
+val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head
+
+assertThrows(classOf[ConfigException], () => 
TestUtils.createServer(KafkaConfig.fromProps(tsDisabledProps)))
+  }
+
+  @Test
+  def testClusterWideDisablementOfTieredStorageWithDisabledTieredTopic(): Unit 
= {
+val tsEnabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head
+
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
 true.toString)
+
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
+  
"org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager")
+
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
+  "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager")
+
+var server = TestUtils.createServer(KafkaConfig.fromProps(tsEnabledProps))
+server.remoteLogManagerOpt match {
+  case Some(_) =>
+  case None => fail("RemoteLogManager should be initialized")
+}
+
+val topicProps = new Properties()
+topicProps.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
false.toString)
+
+TestUtils.createTopic(zkClient = server.zkClient, topic = "batman", 
servers = Seq(server), topicConfig = topicProps)
+
+server.shutdown()
+
+val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head
+
+server = TestUtils.createServer(KafkaConfig.fromProps(tsDisabledProps))
+
+server.shutdown()
+  }
+
+  @Test
+  def 
testClusterWithoutTieredStorageFailsOnStartupIfTopicWithTieringEnabled(): Unit 
= {
+val serverProps = TestUtils.createBrokerConfigs(1, zkConnect).head
+
+val server = TestUtils.createServer(KafkaConfig.fromProps(serverProps))
+
+val topicProps = new Properties()
+topicProps.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
true.toString)
+
+TestUtils.createTopic(zkClient = server.zkClient, topic = "batman", 
servers = Seq(server), topicConfig = topicProps)
+
+server.shutdown()

Review Comment:
   this should be done in a finally block again so that if an exception is 
thrown, we correctly clean up the resources used by the test, i.e. close the 
server.
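   
   A minimal sketch of the pattern being requested, in Java for brevity (the 
test itself is Scala); `Resource` stands in for the server created by 
`TestUtils.createServer(...)`:
   
   ```java
   public class CleanupPattern {
       static class Resource implements AutoCloseable {
           @Override public void close() { System.out.println("shutdown"); }
       }
   
       public static void main(String[] args) {
           Resource server = new Resource();
           try {
               // assertions that may throw go here
           } finally {
               server.close(); // runs even when an assertion above fails
           }
       }
   }
   ```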






[GitHub] [kafka] divijvaidya commented on a diff in pull request #14161: KAFKA-15267: Do not allow Tiered Storage to be disabled while topics have remote.storage.enable property

2023-08-15 Thread via GitHub


divijvaidya commented on code in PR #14161:
URL: https://github.com/apache/kafka/pull/14161#discussion_r1294983667


##
core/src/test/scala/unit/kafka/server/KafkaServerTest.scala:
##
@@ -154,6 +155,96 @@ class KafkaServerTest extends QuorumTestHarness {
 server.shutdown()
   }
 
+  @Test
+  def testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(): Unit 
= {
+val tsEnabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head

Review Comment:
   please add tests for KRaft as well, covering each case: when the broker is 
a controller, when it is a KraftServer, and when it is a ZkServer. Asking 
because some of the initialization code paths vary amongst them, and we can't 
be sure that the config validation will always be called correctly in the 
future.






[GitHub] [kafka] divijvaidya commented on a diff in pull request #14161: KAFKA-15267: Do not allow Tiered Storage to be disabled while topics have remote.storage.enable property

2023-08-15 Thread via GitHub


divijvaidya commented on code in PR #14161:
URL: https://github.com/apache/kafka/pull/14161#discussion_r1294994048


##
core/src/main/scala/kafka/server/ConfigHandler.scala:
##
@@ -62,6 +62,12 @@ class TopicConfigHandler(private val logManager: LogManager, 
kafkaConfig: KafkaC
 topicConfig.asScala.forKeyValue { (key, value) =>
   if (!configNamesToExclude.contains(key)) props.put(key, value)
 }
+
+if (!kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()

Review Comment:
   With this change we are splitting the logic for system-wide and 
topic-specific config across two places: one in LogConfig and this one in 
ConfigHandler. Could we stick to just one of them? I don't have a strong 
opinion on which, as long as it lives in one consolidated place. 
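   
   To illustrate, a hedged sketch of what one consolidated place could look 
like: a single helper that both the broker-wide path and the topic-config path 
call. All names here are assumptions for illustration, not Kafka's actual 
classes.
   
   ```java
   final class RemoteStorageValidation {
       private RemoteStorageValidation() { }
   
       // Single home for the rule "a topic may enable remote storage only when
       // the system-wide flag is on", callable from LogConfig and ConfigHandler.
       static void validate(boolean systemWideEnabled, boolean topicEnabled, String topic) {
           if (topicEnabled && !systemWideEnabled) {
               throw new IllegalStateException("Topic " + topic
                       + " enables remote storage, but remote storage is disabled cluster-wide");
           }
       }
   }
   ```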






[GitHub] [kafka] philipnee commented on a diff in pull request #14218: KAFKA-14937; [2/N]: Refactoring for client code to reduce boilerplate

2023-08-15 Thread via GitHub


philipnee commented on code in PR #14218:
URL: https://github.com/apache/kafka/pull/14218#discussion_r1294990279


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java:
##
@@ -134,44 +132,20 @@ public static FetchMetricsManager 
createFetchMetricsManager(Metrics metrics) {
 }
 
 public static  FetchConfig createFetchConfig(ConsumerConfig 
config,
- Deserializer 
keyDeserializer,
- Deserializer 
valueDeserializer) {
-IsolationLevel isolationLevel = createIsolationLevel(config);
-return new FetchConfig<>(config, keyDeserializer, valueDeserializer, 
isolationLevel);
+ Deserializers deserializers) {
+IsolationLevel isolationLevel = getConfiguredIsolationLevel(config);
+return new FetchConfig<>(config, deserializers, isolationLevel);
 }
 
-@SuppressWarnings("unchecked")
-public static  List> 
createConsumerInterceptors(ConsumerConfig config) {
-return ClientUtils.createConfiguredInterceptors(config,
-ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
-ConsumerInterceptor.class);
+public static FetchConfig createFetchConfig(ConsumerConfig 
config) {
+Deserializers deserializers = new Deserializers<>(new 
StringDeserializer(), new StringDeserializer());

Review Comment:
   is this right?






[GitHub] [kafka] divijvaidya commented on pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic

2023-08-15 Thread via GitHub


divijvaidya commented on PR #14176:
URL: https://github.com/apache/kafka/pull/14176#issuecomment-1679415679

   Unrelated test failures:
   ```
   [Build / JDK 17 and Scala 2.13 / 
kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14176/15/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/Build___JDK_17_and_Scala_2_13___testThreadPoolResize__/)
   [Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14176/15/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_17_and_Scala_2_13___testBalancePartitionLeaders__/)
   [Build / JDK 11 and Scala 2.13 / 
kafka.api.TransactionsTest.testBumpTransactionalEpoch(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14176/15/testReport/junit/kafka.api/TransactionsTest/Build___JDK_11_and_Scala_2_13___testBumpTransactionalEpoch_String__quorum_kraft/)
   [Build / JDK 11 and Scala 2.13 / 
kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14176/15/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/Build___JDK_11_and_Scala_2_13___testThreadPoolResize__/)
   [Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14176/15/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/Build___JDK_11_and_Scala_2_13___testTaskRequestWithOldStartMsGetsUpdated__/)
   [Build / JDK 20 and Scala 2.13 / 
kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14176/15/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/Build___JDK_20_and_Scala_2_13___testThreadPoolResize__/)
   [Build / JDK 20 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14176/15/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_20_and_Scala_2_13___testBalancePartitionLeaders__/)
   ```





[GitHub] [kafka] divijvaidya merged pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic

2023-08-15 Thread via GitHub


divijvaidya merged PR #14176:
URL: https://github.com/apache/kafka/pull/14176





[GitHub] [kafka] divijvaidya commented on pull request #14212: MINOR: Add test case for follower fetch

2023-08-15 Thread via GitHub


divijvaidya commented on PR #14212:
URL: https://github.com/apache/kafka/pull/14212#issuecomment-1679398284

   Thank you for adding this.
   
   Please add what you wrote in the description as a comment in the test so 
that readers can quickly understand what the test is supposed to check.
   
   
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14212/1/testReport/kafka.server/ReplicaFetcherTierStateMachineTest/





[GitHub] [kafka] divijvaidya commented on a diff in pull request #14212: MINOR: Add test case for follower fetch

2023-08-15 Thread via GitHub


divijvaidya commented on code in PR #14212:
URL: https://github.com/apache/kafka/pull/14212#discussion_r1294964226


##
core/src/test/scala/unit/kafka/server/ReplicaFetcherTierStateMachineTest.scala:
##
@@ -85,6 +85,64 @@ class ReplicaFetcherTierStateMachineTest {
 assertEquals(9L, replicaState.logEndOffset)
   }
 
+  @Test
+  def testFollowerFetchMovedToAndDeletedFromTieredStore(): Unit = {
+val partition = new TopicPartition("topic", 0)
+
+val replicaLog = Seq(
+  mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+  mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)),
+  mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
+
+val replicaState = PartitionState(replicaLog, leaderEpoch = 7, 
highWatermark = 0L, rlmEnabled = true)
+
+val mockLeaderEndpoint = new MockLeaderEndPoint
+val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
+val fetcher = new MockFetcherThread(mockLeaderEndpoint, 
mockTierStateMachine)
+
+fetcher.setReplicaState(partition, replicaState)
+fetcher.addPartitions(Map(partition -> 
initialFetchState(topicIds.get(partition.topic), 3L, leaderEpoch = 7)))
+
+val leaderLog = Seq(
+  mkBatch(baseOffset = 7, leaderEpoch = 7, new SimpleRecord("h".getBytes)),
+  mkBatch(baseOffset = 8, leaderEpoch = 7, new SimpleRecord("i".getBytes)),
+  mkBatch(baseOffset = 9, leaderEpoch = 7, new SimpleRecord("j".getBytes)),
+  mkBatch(baseOffset = 10, leaderEpoch = 7, new 
SimpleRecord("k".getBytes)))
+
+val leaderState = PartitionState(leaderLog, leaderEpoch = 7, highWatermark 
= 10L, rlmEnabled = true)
+// Overriding the log start offset to 5 for mocking the scenario of 
segments 5-6 moved to remote store and
+// segments 0-4 expired.
+leaderState.logStartOffset = 5
+fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+assertEquals(3L, replicaState.logEndOffset)
+val expectedState = if (truncateOnFetch) Option(Fetching) else 
Option(Truncating)
+assertEquals(expectedState, fetcher.fetchState(partition).map(_.state))
+
+fetcher.doWork()
+// Verify that the out of range error is triggered and the fetch offset is 
reset to the global log start offset.
+assertEquals(0L, replicaState.logStartOffset)

Review Comment:
   Isn't this incorrect? logStartOffset should be 5 here after the first fetch 
call to the leader. 






[GitHub] [kafka] lianetm opened a new pull request, #14218: KAFKA-14937; [2/N]: Refactoring for client code to reduce boilerplate

2023-08-15 Thread via GitHub


lianetm opened a new pull request, #14218:
URL: https://github.com/apache/kafka/pull/14218

   This is a continuation of the previous 
[PR#13990](https://github.com/apache/kafka/pull/13990)
   
   This PR's main refactoring relates to:
   - serializers/deserializers used in clients - unified in a Deserializers 
class (sketched below)
   - logic for configuring ClusterResourceListeners moved to ClientUtils
   - misc refactoring of the new async consumer in preparation for upcoming 
Request Managers
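   
   A hedged sketch of what such a holder might look like - the names are 
assumptions for illustration, not necessarily the PR's actual class:
   
   ```java
   import org.apache.kafka.common.serialization.Deserializer;
   
   // Pairs the key and value deserializers so they travel together instead of
   // being passed around as two separate parameters.
   public class DeserializersSketch<K, V> {
       public final Deserializer<K> keyDeserializer;
       public final Deserializer<V> valueDeserializer;
   
       public DeserializersSketch(Deserializer<K> keyDeserializer,
                                  Deserializer<V> valueDeserializer) {
           this.keyDeserializer = keyDeserializer;
           this.valueDeserializer = valueDeserializer;
       }
   }
   ```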
   





[GitHub] [kafka] nizhikov opened a new pull request, #14217: KAFKA-14595 ReassignPartitionsCommandArgsTest rewritten in java

2023-08-15 Thread via GitHub


nizhikov opened a new pull request, #14217:
URL: https://github.com/apache/kafka/pull/14217

   This PR is part of #13247 
   It includes `ReassignPartitionsCommandArgsTest` rewritten in Java.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] nizhikov commented on pull request #13247: KAFKA-14595 Move ReassignPartitionsCommand to java

2023-08-15 Thread via GitHub


nizhikov commented on PR #13247:
URL: https://github.com/apache/kafka/pull/13247#issuecomment-1679370921

   @mimaison I created #14217, which includes 
`ReassignPartitionsCommandArgsTest` rewritten in Java. It is independent from 
the big PR and can be reviewed separately. Could you please take a look?





[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-15 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1294925543


##
metadata/src/main/resources/common/metadata/DelegationTokenRecord.json:
##
@@ -22,8 +22,10 @@
   "fields": [
 { "name": "Owner", "type": "string", "versions": "0+",
   "about": "The delegation token owner." },
+{ "name": "Requester", "type": "string", "versions": "0+",
+  "about": "The delegation token requester." },

Review Comment:
   Understood. This is also a case where the original creator of the fields 
missed this field from TokenInformation, which is what ZK uses today.






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-15 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1294918864


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -267,6 +270,10 @@ public boolean isLeaderEpochBumpRequiredOnIsrShrink() {
 return !this.isAtLeast(IBP_3_6_IV0);
 }
 
+public boolean isDelegationTokenSupported() {
+return this.isAtLeast(IBP_3_6_IV1);
+}

Review Comment:
   Done. 






[jira] [Updated] (KAFKA-14133) Remaining EasyMock to Mockito tests

2023-08-15 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-14133:
---
Description: 
{color:#de350b}There are tests which use both PowerMock and EasyMock. I have 
put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely 
solely on EasyMock.{color}

Unless stated in brackets the tests are in the streams module.

A list of tests which still require to be moved from EasyMock to Mockito as of 
2nd of August 2022 which do not have a Jira issue and do not have pull requests 
I am aware of which are opened:

{color:#ff8b00}In Review{color}
{color:#00875a}Merged{color}
 # {color:#00875a}WorkerConnectorTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}WorkerCoordinatorTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}RootResourceTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}ByteArrayProducerRecordEquals{color} (connect) (owner: 
[~yash.mayya] )
 # {color:#00875a}KStreamFlatTransformTest{color} (owner: Christo)
 # {color:#00875a}KStreamFlatTransformValuesTest{color} (owner: Christo)
 # {color:#00875a}KStreamPrintTest{color} (owner: Christo)
 # {color:#00875a}KStreamRepartitionTest{color} (owner: Christo)
 # {color:#00875a}MaterializedInternalTest{color} (owner: Christo)
 # {color:#00875a}TransformerSupplierAdapterTest{color} (owner: Christo)
 # {color:#00875a}KTableSuppressProcessorMetricsTest{color} (owner: Christo)
 # {color:#00875a}ClientUtilsTest{color} (owner: Christo)
 # {color:#00875a}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: 
Christo)
 # {color:#00875a}TopologyTest{color} (owner: Christo)
 # {color:#00875a}KTableSuppressProcessorTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingSessionBytesStoreTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingTimestampedWindowBytesStoreTest{color} (owner: 
Christo)
 # {color:#00875a}ChangeLoggingWindowBytesStoreTest{color} (owner: Christo)
 # {color:#00875a}MeteredTimestampedWindowStoreTest{color} (owner: Christo)
 # {color:#00875a}StreamsRebalanceListenerTest{color} (owner: Christo)
 # {color:#00875a}TimestampedKeyValueStoreMaterializerTest{color} (owner: 
Christo)
 # {color:#00875a}CachingInMemoryKeyValueStoreTest{color} (owner: Christo)
 # {color:#00875a}CachingInMemorySessionStoreTest{color} (owner: Christo)
 # {color:#00875a}CachingPersistentSessionStoreTest{color} (owner: Christo)
 # {color:#00875a}CachingPersistentWindowStoreTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingKeyValueBytesStoreTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingTimestampedKeyValueBytesStoreTest{color} (owner: 
Christo)
 # {color:#00875a}CompositeReadOnlyWindowStoreTest{color} (owner: Christo)
 # {color:#00875a}KeyValueStoreBuilderTest{color} (owner: Christo)
 # {color:#00875a}RocksDBStoreTest{color} (owner: Christo)
 # {color:#00875a}StreamThreadStateStoreProviderTest{color} (owner: Christo)
 # {color:#ff8b00}TaskManagerTest{color} (owner: Christo)
 # {color:#00875a}InternalTopicManagerTest{color} (owner: Christo)
 # {color:#00875a}ProcessorContextImplTest{color} (owner: Christo)
 # {color:#00875a}WriteConsistencyVectorTest{color} (owner: Christo)
 # {color:#00875a}StreamsAssignmentScaleTest{color} (owner: Christo)
 # {color:#00875a}StreamsPartitionAssignorTest{color} (owner: Christo)
 # {color:#00875a}AssignmentTestUtils{color} (owner: Christo)
 # {color:#ff8b00}ProcessorStateManagerTest{color} (owner: Matthew) (takeover: 
Christo)
 # {color:#ff8b00}StandbyTaskTest{color} (owner: Matthew)
 # {color:#ff8b00}StoreChangelogReaderTest{color} (owner: Matthew)
 # {color:#ff8b00}StreamTaskTest{color} (owner: Matthew)
 # {color:#ff8b00}StreamThreadTest{color} (owner: Matthew) (takeover: Christo)
 # {color:#ff8b00}StreamsMetricsImplTest{color} (owner: Dalibor) (Captured in 
https://issues.apache.org/jira/browse/KAFKA-12947)
 # {color:#00875a}TimeOrderedCachingPersistentWindowStoreTest{color} (owner: 
[~shekharrajak])
 # {color:#00875a}TimeOrderedWindowStoreTest{color} (owner: [~shekharrajak]) 
[https://github.com/apache/kafka/pull/12777] 
 # AbstractStreamTest
 # {color:#ff8b00}KStreamTransformValuesTest{color} (owner: Christo)
 # {color:#ff8b00}KTableImplTest{color} (owner: Christo)
 # {color:#ff8b00}KTableTransformValuesTest{color} (owner: Christo)
 # {color:#ff8b00}SessionCacheFlushListenerTest{color} (owner: Christo)
 # {color:#ff8b00}TimestampedCacheFlushListenerTest{color} (owner: Christo)
 # {color:#ff8b00}TimestampedTupleForwarderTest{color} (owner: Christo)
 # {color:#ff8b00}ActiveTaskCreatorTest{color} (owner: Christo)
 # {color:#ff8b00}ChangelogTopicsTest{color} (owner: Christo)
 # {color:#ff8b00}GlobalProcessorContextImplTest{color} (owner: Christo)
 # RecordCollectorTest (owner: Christo)
 # StateRestoreCallbackAdapterTest (owner: Christo)
 # StoreToProcessorContextAdapterTest (owner: Christo)
 # StreamsProducerTest (owner: Nelson)
 

[jira] [Updated] (KAFKA-15344) Kafka Streams should include the message leader epoch when committing offsets

2023-08-15 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao updated KAFKA-15344:
--
Description: 
We noticed an application received an OFFSET_OUT_OF_RANGE error following a 
network partition and streams task rebalance and subsequently reset its offsets 
to the beginning.

Inspecting the logs, we saw multiple consumer log messages like: 

{code:java}
Setting offset for partition tp to the committed offset 
FetchPosition{offset=1234, offsetEpoch=Optional.empty...)
{code}

Inspecting the streams code, it looks like kafka streams calls `commitSync` 
passing through an explicit OffsetAndMetadata object but does not populate the 
offset leader epoch.

The offset leader epoch is required in the offset commit to ensure that all 
consumers in the consumer group have coherent metadata before fetching. 
Otherwise after a consumer group rebalance, a consumer may fetch with a stale 
leader epoch with respect to the committed offset and get an offset out of 
range error from a zombie partition leader.

An example of where this can cause issues:
1. We have a consumer group with consumer 1 and consumer 2. Partition P is 
assigned to consumer 1 which has up-to-date metadata for P. Consumer 2 has 
stale metadata for P.
2. Consumer 1 fetches partition P with offset 50, epoch 8. commits the offset 
50 without an epoch.
3. The consumer group rebalances and P is now assigned to consumer 2. Consumer 
2 has a stale leader epoch for P (let's say leader epoch 7). Consumer 2 will 
now try to fetch with leader epoch 7, offset 50. If we have a zombie leader due 
to a network partition, the zombie leader may accept consumer 2's fetch leader 
epoch and return an OFFSET_OUT_OF_RANGE to consumer 2.

If in step 1, consumer 1 committed the leader epoch for the message, then when 
consumer 2 receives assignment P it would force a metadata refresh to discover 
a sufficiently new leader epoch for the committed offset.

The low-hanging fruit fix would be to have streams pass in the message epoch 
for each commit. Another fix discussed with [~hachikuji] is to have the 
consumer cache leader epoch ranges, similar to how the broker maintains a 
leader epoch cache.
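
A hedged sketch of the first fix, using only public consumer API (the 
OffsetAndMetadata constructor that accepts an Optional leader epoch); how 
streams would thread the record's epoch through to its commit path is an 
assumption here:

{code:java}
import java.util.Collections;
import java.util.Optional;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitWithLeaderEpoch {
    // Commit the position after lastProcessed, carrying its leader epoch so
    // that the next owner of the partition refreshes metadata before fetching.
    static void commitProcessed(Consumer<byte[], byte[]> consumer,
                                ConsumerRecord<byte[], byte[]> lastProcessed) {
        TopicPartition tp =
                new TopicPartition(lastProcessed.topic(), lastProcessed.partition());
        Optional<Integer> leaderEpoch = lastProcessed.leaderEpoch();
        OffsetAndMetadata committed = new OffsetAndMetadata(
                lastProcessed.offset() + 1, // next offset to consume
                leaderEpoch,
                "");                        // application metadata, unused here
        consumer.commitSync(Collections.singletonMap(tp, committed));
    }
}
{code}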


  was:
We noticed an application received an OFFSET_OUT_OF_RANGE error following a 
network partition and streams task rebalance and subsequently reset its offsets 
to the beginning.

Inspecting the logs, we saw multiple consumer log messages like: 

{code:java}
Setting offset for partition tp to the committed offset 
FetchPosition{offset=1234, offsetEpoch=Optional.empty...)
{code}

Inspecting the streams code, it looks like kafka streams calls `commitSync` 
passing through an explicit OffsetAndMetadata object but does not populate the 
offset leader epoch.

The offset leader epoch is required in the offset commit to ensure that all 
consumers in the consumer group have coherent metadata before fetching. 
Otherwise after a consumer group rebalance, a consumer may fetch with a stale 
leader epoch with respect to the committed offset and get an offset out of 
range error from a zombie partition leader.

The low-hanging fruit fix would be to have streams pass in the message epoch 
for each commit. Another fix discussed with [~hachikuji] is to have the 
consumer cache leader epoch ranges, similar to how the broker maintains a 
leader epoch cache.



> Kafka Streams should include the message leader epoch when committing offsets
> -
>
> Key: KAFKA-15344
> URL: https://issues.apache.org/jira/browse/KAFKA-15344
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: David Mao
>Priority: Major
>
> We noticed an application received an OFFSET_OUT_OF_RANGE error following a 
> network partition and streams task rebalance and subsequently reset its 
> offsets to the beginning.
> Inspecting the logs, we saw multiple consumer log messages like: 
> {code:java}
> Setting offset for partition tp to the committed offset 
> FetchPosition{offset=1234, offsetEpoch=Optional.empty...)
> {code}
> Inspecting the streams code, it looks like kafka streams calls `commitSync` 
> passing through an explicit OffsetAndMetadata object but does not populate 
> the offset leader epoch.
> The offset leader epoch is required in the offset commit to ensure that all 
> consumers in the consumer group have coherent metadata before fetching. 
> Otherwise after a consumer group rebalance, a consumer may fetch with a stale 
> leader epoch with respect to the committed offset and get an offset out of 
> range error from a zombie partition leader.
> An example of where this can cause issues:
> 1. We have a consumer group with consumer 1 and consumer 2. Partition P is 
> assigned to consumer 1 which has up-to-date metadata for P. Consumer 2 has 
> stale metadata for P.

[GitHub] [kafka] AndrewJSchofield commented on pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-08-15 Thread via GitHub


AndrewJSchofield commented on PR #14111:
URL: https://github.com/apache/kafka/pull/14111#issuecomment-1679338770

   @junrao I don't know what's causing so many test failures. 78 of them were 
due to "unexpected threads" in the broker. I'll take another look when there's 
another build. I expect they're transient, but we'll see.





[GitHub] [kafka] junrao commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-08-15 Thread via GitHub


junrao commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1294877936


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -267,11 +307,14 @@ public synchronized void update(int requestVersion, 
MetadataResponse response, b
 
 this.needPartialUpdate = requestVersion < this.requestVersion;
 this.lastRefreshMs = nowMs;
+this.attempts = 0;
 this.updateVersion += 1;
 if (!isPartialUpdate) {
 this.needFullUpdate = false;
 this.lastSuccessfulRefreshMs = nowMs;
 }
+this.backoffOnEquivalentResponses = true;
+this.equivalentResponseCount++;

Review Comment:
   1. equivalentResponseCount is only reset when a fresher metadata response is 
received. Suppose that we only have periodic metadata refreshes for some time 
and the metadata doesn't change. This will cause equivalentResponseCount to 
keep going up, and when a metadata refresh is then requested, it will back off 
exponentially and unexpectedly. 
   2. This is a bit unintuitive, since we haven't yet checked whether the 
response is equivalent or not. Could we add a comment that this will be reset 
later if the metadata response causes the metadata to change?
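   
   To make the dynamics in point 1 concrete, a small sketch assuming the 
`refreshBackoff` field in the diff is the clients' 
`org.apache.kafka.common.utils.ExponentialBackoff` utility: each equivalent 
(unchanged) response bumps the count, so a long quiet stretch of periodic 
refreshes drives the computed backoff toward its cap.
   
   ```java
   import org.apache.kafka.common.utils.ExponentialBackoff;
   
   public class EquivalentResponseBackoffDemo {
       public static void main(String[] args) {
           // 100 ms initial interval, exponent base 2, 1 s cap, 20% jitter
           ExponentialBackoff backoff = new ExponentialBackoff(100L, 2, 1000L, 0.2);
           long equivalentResponseCount = 0;
           for (int i = 0; i < 6; i++) {
               equivalentResponseCount++; // another refresh returned unchanged metadata
               System.out.printf("%d equivalent responses -> ~%d ms backoff%n",
                       equivalentResponseCount,
                       backoff.backoff(equivalentResponseCount - 1));
           }
           // Only a response that actually changes the metadata resets the count.
           equivalentResponseCount = 0;
           System.out.printf("after reset -> ~%d ms%n", backoff.backoff(0));
       }
   }
   ```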






[GitHub] [kafka] junrao commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-08-15 Thread via GitHub


junrao commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1294883198


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -140,17 +172,34 @@ public long metadataExpireMs() {
 }
 
 /**
- * Request an update of the current cluster metadata info, return the 
current updateVersion before the update
+ * Request an update of the current cluster metadata info, permitting 
backoff based on the number of
+ * equivalent metadata responses, which indicates that responses did not 
make progress and may be stale.
+ * @param permitBackoffOnEquivalentResponses Whether to permit backoff 
when consecutive responses are equivalent.
+ *   This should be set to 
true in situations where the update is
+ *   being requested to retry an 
operation, such as when the leader has
+ *   changed. It should be set to 
false in situations where new
+ *   metadata is being requested, 
such as adding a topic to a subscription.
+ *   In situations where it's not 
clear, it's best to use false.
+ * @return The current updateVersion before the update
  */
-public synchronized int requestUpdate() {
+public synchronized int requestUpdate(final boolean 
permitBackoffOnEquivalentResponses) {
 this.needFullUpdate = true;
+if (!permitBackoffOnEquivalentResponses) {

Review Comment:
   Since we only take action when permitBackoffOnEquivalentResponses is false, 
would it be more intuitive to pass in the negation of that as sth like 
resetEquivalentResponseCount?






[GitHub] [kafka] junrao commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-08-15 Thread via GitHub


junrao commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1294873505


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -114,18 +125,39 @@ public synchronized Cluster fetch() {
 
 /**
  * Return the next time when the current cluster info can be updated 
(i.e., backoff time has elapsed).
+ * There are two calculations for backing off based on how many attempts 
to retrieve metadata have been made
+ * since the last successful response, and how many equivalent metadata 
responses have been received.
+ * The second of these allows backing off when there are errors to do with 
stale metadata, even though the
+ * metadata responses are clean.
+ * 
+ * This can be used to check whether it's worth requesting an update in 
the knowledge that it will
+ * not be delayed if this method returns 0.
  *
  * @param nowMs current time in ms
  * @return remaining time in ms till the cluster info can be updated again
  */
 public synchronized long timeToAllowUpdate(long nowMs) {
-return Math.max(this.lastRefreshMs + this.refreshBackoffMs - nowMs, 0);
+// Calculate the backoff for attempts which acts when metadata 
responses fail
+long backoffForAttempts = Math.max(this.lastRefreshMs +
+this.refreshBackoff.backoff(this.attempts > 0 ? this.attempts 
- 1 : 0) - nowMs, 0);
+
+// Periodic updates based on expiration are not backed off based on 
equivalent responses
+long backoffForEquivalentResponseCount;
+if (Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - 
nowMs, 0) == 0) {

Review Comment:
   Instead of calculating the periodic refresh time, it's probably simpler to 
just check `updateRequested()`?






[jira] [Updated] (KAFKA-15344) Kafka Streams should include the message leader epoch when committing offsets

2023-08-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15344:

Component/s: streams

> Kafka Streams should include the message leader epoch when committing offsets
> -
>
> Key: KAFKA-15344
> URL: https://issues.apache.org/jira/browse/KAFKA-15344
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: David Mao
>Priority: Major
>
> We noticed an application received an OFFSET_OUT_OF_RANGE error following a 
> network partition and streams task rebalance and subsequently reset its 
> offsets to the beginning.
> Inspecting the logs, we saw multiple consumer log messages like: 
> {code:java}
> Setting offset for partition tp to the committed offset 
> FetchPosition{offset=1234, offsetEpoch=Optional.empty...)
> {code}
> Inspecting the streams code, it looks like kafka streams calls `commitSync` 
> passing through an explicit OffsetAndMetadata object but does not populate 
> the offset leader epoch.
> The offset leader epoch is required in the offset commit to ensure that all 
> consumers in the consumer group have coherent metadata before fetching. 
> Otherwise after a consumer group rebalance, a consumer may fetch with a stale 
> leader epoch with respect to the committed offset and get an offset out of 
> range error from a zombie partition leader.
> The low-hanging fruit fix would be to have streams pass in the message epoch 
> for each commit. Another fix discussed with [~hachikuji] is to have the 
> consumer cache leader epoch ranges, similar to how the broker maintains a 
> leader epoch cache.





[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic

2023-08-15 Thread via GitHub


divijvaidya commented on code in PR #14176:
URL: https://github.com/apache/kafka/pull/14176#discussion_r1294874991


##
core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala:
##
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import kafka.api.IntegrationTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
+import org.apache.kafka.common.errors.InvalidConfigurationException
+import 
org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, 
NoOpRemoteStorageManager, RemoteLogManagerConfig}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.function.Executable
+import org.junit.jupiter.api.{BeforeEach, Tag, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import java.util
+import java.util.{Collections, Properties}
+import scala.collection.Seq
+import scala.concurrent.ExecutionException
+import scala.util.Random
+
+@Tag("integration")
+class RemoteTopicCrudTest extends IntegrationTestHarness {
+
+  val numPartitions = 2
+  val numReplicationFactor = 2
+  var testTopicName: String = _
+  var sysRemoteStorageEnabled = true
+
+  override protected def brokerCount: Int = 2
+
+  override protected def modifyConfigs(props: Seq[Properties]): Unit = {
+props.foreach(p => p.putAll(overrideProps()))
+  }
+
+  override protected def kraftControllerConfigs(): Seq[Properties] = {
+Seq(overrideProps())
+  }
+
+  @BeforeEach
+  override def setUp(info: TestInfo): Unit = {
+if 
(info.getTestMethod.get().getName.endsWith("SystemRemoteStorageIsDisabled")) {
+  sysRemoteStorageEnabled = false
+}
+super.setUp(info)
+testTopicName = 
s"${info.getTestMethod.get().getName}-${Random.alphanumeric.take(10).mkString}"
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateRemoteTopicWithValidRetentionTime(quorum: String): Unit = {
+val topicConfig = new Properties()
+topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200")
+topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100")
+TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, 
brokers, numPartitions, numReplicationFactor,
+  topicConfig = topicConfig)
+verifyRemoteLogTopicConfigs(topicConfig)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateRemoteTopicWithValidRetentionSize(quorum: String): Unit = {
+val topicConfig = new Properties()
+topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "512")
+topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "256")
+TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, 
brokers, numPartitions, numReplicationFactor,
+  topicConfig = topicConfig)
+verifyRemoteLogTopicConfigs(topicConfig)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateRemoteTopicWithInheritedLocalRetentionTime(quorum: String): 
Unit = {
+// inherited local retention ms is 1000
+val topicConfig = new Properties()
+topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "1001")
+TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, 
brokers, numPartitions, numReplicationFactor,
+  topicConfig = topicConfig)
+verifyRemoteLogTopicConfigs(topicConfig)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateRemoteTopicWithInheritedLocalRetentionSize(quorum: String): 
Unit = {
+

[jira] [Updated] (KAFKA-15257) Support interactive queries (IQv2) with versioned state store

2023-08-15 Thread Alieh Saeedi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alieh Saeedi updated KAFKA-15257:
-
Description: 
Query types to consider include:
 * single-key latest-value lookup
 * single-key lookup with timestamp bound
 * single-key query with timestamp range
 * single-key all versions query

 * key-range latest-value query
 * key-range query with timestamp bound
 * key-range query with timestamp range
 * key-range all versions query

 * all-keys latest-value query
 * all-keys all versions (i.e., entire store) query

  was:
KIP-960: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-960%3A+Support+interactive+queries+%28IQv2%29+for+versioned+state+stores]
 

Query types to consider include:
 * single-key latest-value lookup
 * single-key lookup with timestamp bound
 * single-key query with timestamp range
 * single-key all versions query

 * key-range latest-value query
 * key-range query with timestamp bound
 * key-range query with timestamp range
 * key-range all versions query

 * all-keys latest-value query
 * all-keys all versions (i.e., entire store) query


> Support interactive queries (IQv2) with versioned state store
> -
>
> Key: KAFKA-15257
> URL: https://issues.apache.org/jira/browse/KAFKA-15257
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Major
>  Labels: kip
>
> Query types to consider include:
>  * single-key latest-value lookup
>  * single-key lookup with timestamp bound
>  * single-key query with timestamp range
>  * single-key all versions query
>  * key-range latest-value query
>  * key-range query with timestamp bound
>  * key-range query with timestamp range
>  * key-range all versions query
>  * all-keys latest-value query
>  * all-keys all versions (i.e., entire store) query
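
For context, a minimal sketch of a single-key latest-value lookup against the 
existing (non-versioned) IQv2 API; the versioned query types listed above are 
expected to follow the same request/response shape. The store name and the 
streams instance are illustrative assumptions.

{code:java}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;

public class Iqv2LatestValueLookup {
    // Single-key latest-value lookup via IQv2 (KIP-796), shown as a baseline
    // for the versioned-store query types this ticket proposes.
    public static Integer latestValue(KafkaStreams streams, String key) {
        StateQueryRequest<Integer> request = StateQueryRequest
                .inStore("versioned-store")
                .withQuery(KeyQuery.<String, Integer>withKey(key));
        StateQueryResult<Integer> result = streams.query(request);
        QueryResult<Integer> partitionResult = result.getOnlyPartitionResult();
        return partitionResult == null ? null : partitionResult.getResult();
    }
}
{code}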





[jira] [Assigned] (KAFKA-15347) Single-Key_multi-timestamp IQs with versioned state stores

2023-08-15 Thread Alieh Saeedi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alieh Saeedi reassigned KAFKA-15347:


Assignee: Alieh Saeedi

> Single-Key_multi-timestamp IQs with versioned state stores
> --
>
> Key: KAFKA-15347
> URL: https://issues.apache.org/jira/browse/KAFKA-15347
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Major
>
> [KIP-968|https://cwiki.apache.org/confluence/display/KAFKA/KIP-968%3A+Support+single-key_multi-timestamp+interactive+queries+%28IQv2%29+for+versioned+state+stores]
> This ticket covers just four query types:
> *Key Queries with multiple timestamps:*
>  # single-key query with upper bound timestamp
>  # single-key query with lower bound timestamp
>  # single-key query with timestamp range
>  # single-key all versions query



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15348) Range IQs with versioned state stores

2023-08-15 Thread Alieh Saeedi (Jira)
Alieh Saeedi created KAFKA-15348:


 Summary: Range IQs with versioned state stores
 Key: KAFKA-15348
 URL: https://issues.apache.org/jira/browse/KAFKA-15348
 Project: Kafka
  Issue Type: Sub-task
Reporter: Alieh Saeedi


[KIP-969|https://cwiki.apache.org/confluence/display/KAFKA/KIP-969%3A+Support+range+interactive+queries+%28IQv2%29+for+versioned+state+stores]
This ticket covers all types of range queries:

*Range Queries*
 # key-range latest-value query
 # key-range with lower bound latest-value query
 # key-range with upper bound latest-value query
 # all-keys (no bound) latest-value query
 # key-range query with timestamp (upper) bound
 # key-range with lower bound with timestamp (upper) bound 
 # key-range with upper bound with timestamp (upper) bound
 # all-keys (no bound) with timestamp (upper) bound
 # key-range query with timestamp range
 # key-range query with lower bound with timestamp range
 # key-range query with upper bound with timestamp range
 # all-keys (no bound) with timestamp range
 # key-range query all-versions
 # key-range query with lower bound all-versions
 # key-range query with upper bound all-versions
 # all-keys query (no bound) all-versions (entire store)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
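
To make the range taxonomy above concrete, a hypothetical sketch following the shape KIP-969 proposes. As before, the IQv2 plumbing is the existing API, while MultiVersionedRangeQuery and its withRange/fromTime/toTime builders are assumptions taken from the KIP draft.

import java.time.Instant;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.VersionedRecord;

public class VersionedRangeSketch {
    static void rangeQuery(KafkaStreams streams) {
        // Hypothetical: keys in ["a", "f"] restricted to a timestamp range
        // (query type 9 in the list above).
        StateQueryRequest<KeyValueIterator<String, VersionedRecord<Long>>> request =
                StateQueryRequest.inStore("versioned-store")
                        .withQuery(MultiVersionedRangeQuery.<String, Long>withRange("a", "f")
                                .fromTime(Instant.parse("2023-08-01T00:00:00Z"))
                                .toTime(Instant.parse("2023-08-15T00:00:00Z")));
        StateQueryResult<KeyValueIterator<String, VersionedRecord<Long>>> result =
                streams.query(request);
    }
}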


[jira] [Assigned] (KAFKA-15348) Range IQs with versioned state stores

2023-08-15 Thread Alieh Saeedi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alieh Saeedi reassigned KAFKA-15348:


Assignee: Alieh Saeedi

> Range IQs with versioned state stores
> -
>
> Key: KAFKA-15348
> URL: https://issues.apache.org/jira/browse/KAFKA-15348
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Major
>
> [KIP-969|https://cwiki.apache.org/confluence/display/KAFKA/KIP-969%3A+Support+range+interactive+queries+%28IQv2%29+for+versioned+state+stores]
> This ticket covers all types of range queries:
> *Range Queries*
>  # key-range latest-value query
>  # key-range with lower bound latest-value query
>  # key-range with upper bound latest-value query
>  # all-keys (no bound) latest-value query
>  # key-range query with timestamp (upper) bound
>  # key-range with lower bound with timestamp (upper) bound 
>  # key-range with upper bound with timestamp (upper) bound
>  # all-keys (no bound) with timestamp (upper) bound
>  # key-range query with timestamp range
>  # key-range query with lower bound with timestamp range
>  # key-range query with upper bound with timestamp range
>  # all-keys (no bound) with timestamp range
>  # key-range query all-versions
>  # key-range query with lower bound all-versions
>  # key-range query with upper bound all-versions
>  # all-keys query (no bound) all-versions (entire store)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15347) Single-Key_multi-timestamp IQs with versioned state stores

2023-08-15 Thread Alieh Saeedi (Jira)
Alieh Saeedi created KAFKA-15347:


 Summary: Single-Key_multi-timestamp IQs with versioned state stores
 Key: KAFKA-15347
 URL: https://issues.apache.org/jira/browse/KAFKA-15347
 Project: Kafka
  Issue Type: Sub-task
Reporter: Alieh Saeedi


[KIP-968|https://cwiki.apache.org/confluence/display/KAFKA/KIP-968%3A+Support+single-key_multi-timestamp+interactive+queries+%28IQv2%29+for+versioned+state+stores]
This ticket covers just four query types:

*Key Queries with multiple timestamps:*
 # single-key query with upper bound timestamp
 # single-key query with lower bound timestamp
 # single-key query with timestamp range
 # single-key all versions query



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
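
For the four KIP-968 query types above, a hypothetical single-key multi-timestamp sketch. MultiVersionedKeyQuery and VersionedRecordIterator are assumptions following the KIP draft; only the surrounding IQv2 request API is released.

import java.time.Instant;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;

public class MultiTimestampKeySketch {
    static void timeRangeForKey(KafkaStreams streams) {
        // Hypothetical: all versions of one key within [fromTime, toTime]
        // (query type 3 in the list above).
        StateQueryRequest<VersionedRecordIterator<Long>> request =
                StateQueryRequest.inStore("versioned-store")
                        .withQuery(MultiVersionedKeyQuery.<String, Long>withKey("user-42")
                                .fromTime(Instant.parse("2023-08-01T00:00:00Z"))
                                .toTime(Instant.parse("2023-08-15T00:00:00Z")));
        StateQueryResult<VersionedRecordIterator<Long>> result = streams.query(request);
    }
}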


[jira] [Updated] (KAFKA-15346) Single-Key_single-timestamp IQs with versioned state stores

2023-08-15 Thread Alieh Saeedi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alieh Saeedi updated KAFKA-15346:
-
Summary: Single-Key_single-timestamp IQs with versioned state stores  (was: 
Single-Key_single-multi-timestamp IQs with versioned state stores)

> Single-Key_single-timestamp IQs with versioned state stores
> ---
>
> Key: KAFKA-15346
> URL: https://issues.apache.org/jira/browse/KAFKA-15346
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Major
>
> [KIP-960|https://cwiki.apache.org/confluence/display/KAFKA/KIP-960%3A+Support+single-key_single-timestamp+interactive+queries+%28IQv2%29+for+versioned+state+stores]
> This ticket covers just two query types:
> *Key Queries with single timestamp:*
>  # single-key latest-value lookup
>  # single-key lookup with timestamp (upper) bound



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15346) Single-Key_single-multi-timestamp IQs with versioned state stores

2023-08-15 Thread Alieh Saeedi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alieh Saeedi updated KAFKA-15346:
-
Description: 
[KIP-960|https://cwiki.apache.org/confluence/display/KAFKA/KIP-960%3A+Support+single-key_single-timestamp+interactive+queries+%28IQv2%29+for+versioned+state+stores]
This ticket covers just two query types:

*Key Queries with single timestamp:*
 # single-key latest-value lookup
 # single-key lookup with timestamp (upper) bound

  was:
[KIP-960|https://cwiki.apache.org/confluence/display/KAFKA/KIP-960%3A+Support+single-key_single-timestamp+interactive+queries+%28IQv2%29+for+versioned+state+stores]]
 
This ticket covers just two query types:

*Key Queries with single timestamp:*
 # single-key latest-value lookup
 # single-key lookup with timestamp (upper) bound


> Single-Key_single-multi-timestamp IQs with versioned state stores
> -
>
> Key: KAFKA-15346
> URL: https://issues.apache.org/jira/browse/KAFKA-15346
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Major
>
> [KIP-960|https://cwiki.apache.org/confluence/display/KAFKA/KIP-960%3A+Support+single-key_single-timestamp+interactive+queries+%28IQv2%29+for+versioned+state+stores]
> This ticket covers just two query types:
> *Key Queries with single timestamp:*
>  # single-key latest-value lookup
>  # single-key lookup with timestamp (upper) bound



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

