[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

2023-04-19 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1172072549


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -835,6 +839,8 @@ public void handle(SyncGroupResponse syncResponse,
 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
 log.info("SyncGroup failed: The group began another 
rebalance. Need to re-join the group. " +
  "Sent generation was {}", sentGeneration);
+savePartitionAndGenerationState();

Review Comment:
   I'm not sure what the better way is to invoke `onPartitionsLost` without resetting the generation. The alternative is to use a flag, but that's rather unclean.
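
   Purely to illustrate the flag alternative mentioned here (hypothetical names, not the actual `AbstractCoordinator` code), a minimal sketch:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the "flag" alternative: remember that the owned
// partitions should be reported as lost on the next rejoin, without resetting
// the generation or member id in between.
public class RebalanceFlagSketch {
    private final Set<String> ownedPartitions = new HashSet<>(Set.of("topic-0", "topic-1"));
    private final int generationId = 5;            // illustrative values
    private final String memberId = "member-1";
    private boolean lostPartitionsPending = false; // the flag in question

    // SyncGroup failed with REBALANCE_IN_PROGRESS: keep the generation state,
    // only mark the currently owned partitions as pending "lost".
    void onSyncGroupRebalanceInProgress() {
        lostPartitionsPending = true;
    }

    // At the start of the next join: fire the callback while the generation is intact.
    void beforeRejoin() {
        if (lostPartitionsPending) {
            System.out.println("onPartitionsLost(" + ownedPartitions + ") at generation "
                + generationId + "/" + memberId);
            lostPartitionsPending = false;
        }
    }

    public static void main(String[] args) {
        RebalanceFlagSketch sketch = new RebalanceFlagSketch();
        sketch.onSyncGroupRebalanceInProgress();
        sketch.beforeRejoin();
    }
}
```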



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee opened a new pull request, #13618: MINOR: Fixing typos in the ConsumerCoordinators

2023-04-19 Thread via GitHub


philipnee opened a new pull request, #13618:
URL: https://github.com/apache/kafka/pull/13618

   Not very much really, just fixing some typos.





[GitHub] [kafka] jolshan commented on a diff in pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

2023-04-19 Thread via GitHub


jolshan commented on code in PR #13607:
URL: https://github.com/apache/kafka/pull/13607#discussion_r1172068360


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -565,6 +565,12 @@ class KafkaApis(val requestChannel: RequestChannel,
 requestHelper.sendErrorResponseMaybeThrottle(request, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
 return
   }
+  try {
+ProduceRequest.validateProducerIds(request.header.apiVersion, 
produceRequest.data)
+  } catch {
+case e: InvalidRecordException =>
+  requestHelper.sendErrorResponseMaybeThrottle(request, 
Errors.INVALID_RECORD.exception)

Review Comment:
   yes good point
   






[GitHub] [kafka] dengziming merged pull request #13432: KAFKA-14821 Implement the listOffsets API with AdminApiDriver

2023-04-19 Thread via GitHub


dengziming merged PR #13432:
URL: https://github.com/apache/kafka/pull/13432





[GitHub] [kafka] hudeqi opened a new pull request, #13617: MINOR:code optimization in QuorumController

2023-04-19 Thread via GitHub


hudeqi opened a new pull request, #13617:
URL: https://github.com/apache/kafka/pull/13617

   1. Add a hint for the "BROKER_LOGGER" case in ConfigResourceExistenceChecker; otherwise it is classified under the default branch and deleted directly. I don't know whether adding a hint is better than deleting it directly (a sketch of what the explicit case could look like follows below).
   2. The "generateSnapshotScheduled" member variable in QuorumController is given a default value but is never used. Would it be better to make it final and assign the default value during initialization?
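
   For point 1, a sketch of what the explicit case could look like (class and method names are illustrative, not the real ConfigResourceExistenceChecker code):

```java
import org.apache.kafka.common.config.ConfigResource;

// Illustrative only: list BROKER_LOGGER explicitly with a comment instead of
// letting it fall through to the default branch.
public class ConfigExistenceCheckSketch {
    void check(ConfigResource resource) {
        switch (resource.type()) {
            case BROKER_LOGGER:
                // Intentionally not checked for existence; keeping this case makes
                // the intent visible rather than implied by the default branch.
                break;
            case BROKER:
            case TOPIC:
                checkResourceExists(resource);
                break;
            default:
                break;
        }
    }

    private void checkResourceExists(ConfigResource resource) {
        // placeholder for the real existence check
    }
}
```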
   





[GitHub] [kafka] dengziming commented on pull request #13432: KAFKA-14821 Implement the listOffsets API with AdminApiDriver

2023-04-19 Thread via GitHub


dengziming commented on PR #13432:
URL: https://github.com/apache/kafka/pull/13432#issuecomment-1515639065

   Notable changes compared to the old logic (a rough sketch of the retry classification follows below):
   1. Retry the lookup stage on `NOT_LEADER_OR_FOLLOWER` and `LEADER_NOT_AVAILABLE`, whereas in the past we failed the partition directly without retrying.
   2. Remove the class field `supportsMaxTimestamp` and calculate it on the fly to avoid mutable state; this doesn't change any behavior of the client.
   3. Retry the fulfillment stage on any `RetriableException`, whereas in the past we only retried it on `InvalidMetadataException`; this means we now also retry on `TimeoutException` and other `RetriableException`s.
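
   Purely as an illustration of that retry classification (hypothetical names, not the actual `AdminApiDriver` code):

```java
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.protocol.Errors;

// Schematic sketch of the retry behaviour described above: some errors send the
// partition back to the lookup stage, retriable errors retry fulfillment, and
// everything else fails the partition.
public class ListOffsetsRetrySketch {
    enum Action { RETRY_LOOKUP, RETRY_FULFILLMENT, FAIL }

    static Action classify(Errors error) {
        if (error == Errors.NOT_LEADER_OR_FOLLOWER || error == Errors.LEADER_NOT_AVAILABLE) {
            return Action.RETRY_LOOKUP;           // point 1: re-discover the leader first
        }
        if (error.exception() instanceof RetriableException) {
            return Action.RETRY_FULFILLMENT;      // point 3: includes TimeoutException etc.
        }
        return Action.FAIL;
    }

    public static void main(String[] args) {
        System.out.println(classify(Errors.NOT_LEADER_OR_FOLLOWER));      // RETRY_LOOKUP
        System.out.println(classify(Errors.REQUEST_TIMED_OUT));           // RETRY_FULFILLMENT
        System.out.println(classify(Errors.TOPIC_AUTHORIZATION_FAILED));  // FAIL
    }
}
```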





[jira] [Updated] (KAFKA-14922) kafka-streams-application-reset deletes topics not belonging to specified application-id

2023-04-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14922:

Component/s: streams
 tools

> kafka-streams-application-reset deletes topics not belonging to specified 
> application-id
> 
>
> Key: KAFKA-14922
> URL: https://issues.apache.org/jira/browse/KAFKA-14922
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 3.4.0
>Reporter: Jørgen
>Priority: Major
>
> Slack-thread: 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1681908267206849]
> When running the command _kafka-streams-application-reset --bootstrap-servers 
> $BOOTSTRAP --application-id foo_, all internal topics that _start with_ foo 
> are deleted. This happens even if there's no application-id named foo.
> Example:
> {code:java}
> Application IDs:
> foo-v1
> foo-v2
> Internal topics:
> foo-v1-repartition-topic-repartition
> foo-v2-repartition-topic-repartition 
> Application reset:
> kafka-streams-application-reset --bootstrap-servers $BOOTSTRAP 
> --application-id foo
> > No input or intermediate topics specified. Skipping seek.
> Deleting inferred internal topics [foo-v2-repartition-topic-repartition, 
> foo-v1-repartition-topic-repartition]
> Done.{code}
> Expected behaviour is that the command fails, as there is no application-id 
> named foo, instead of deleting all foo* topics.
> This is critical on typos, or if an application-id starts with the same prefix 
> as another (for example if we had foo-v21 and wanted to reset foo-v2).
> The bug should be located here: 
> [https://github.com/apache/kafka/blob/c14f56b48461f01743146d58987bc8661ba0d459/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java#L693]
> The check should match the application-id exactly instead of checking that the 
> topic name starts with the application-id.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14922) kafka-streams-application-reset deletes topics not belonging to specified application-id

2023-04-19 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17714354#comment-17714354
 ] 

Matthias J. Sax commented on KAFKA-14922:
-

Thanks for creating this ticket. It's a known issue, but it's unclear how it 
could be fixed.

The problem is that internal topic names have the pattern 
`<application.id>-<name>-<suffix>`, so I am not sure how we could look for 
an _exact_ match (we don't know the full topic name). If there is a way, please 
let us know; otherwise I think we unfortunately need to close this as "won't fix". 
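
If prefix matching has to stay (since the full topic name is unknown), one possible mitigation (shown here only as a sketch, not taken from StreamsResetter) is to require the separator right after the application id, which at least distinguishes foo-v2 from foo-v21:

{code:java}
// Sketch only: require the "-" separator right after the application id, so that
// "foo-v2" no longer matches topics that belong to "foo-v21".
public class InternalTopicMatchSketch {
    static boolean isInferredInternalTopic(String topicName, String applicationId) {
        return topicName.startsWith(applicationId + "-")
            && (topicName.endsWith("-repartition") || topicName.endsWith("-changelog"));
    }

    public static void main(String[] args) {
        System.out.println(isInferredInternalTopic("foo-v21-agg-repartition", "foo-v2")); // false
        System.out.println(isInferredInternalTopic("foo-v2-agg-repartition", "foo-v2"));  // true
    }
}
{code}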

> kafka-streams-application-reset deletes topics not belonging to specified 
> application-id
> 
>
> Key: KAFKA-14922
> URL: https://issues.apache.org/jira/browse/KAFKA-14922
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.4.0
>Reporter: Jørgen
>Priority: Major
>
> Slack-thread: 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1681908267206849]
> When running the command _kafka-streams-application-reset --bootstrap-servers 
> $BOOTSTRAP --application-id foo_, all internal topics that _start with_ foo 
> are deleted. This happens even if there's no application-id named foo.
> Example:
> {code:java}
> Application IDs:
> foo-v1
> foo-v2
> Internal topics:
> foo-v1-repartition-topic-repartition
> foo-v2-repartition-topic-repartition 
> Application reset:
> kafka-streams-application-reset --bootstrap-servers $BOOTSTRAP 
> --application-id foo
> > No input or intermediate topics specified. Skipping seek.
> Deleting inferred internal topics [foo-v2-repartition-topic-repartition, 
> foo-v1-repartition-topic-repartition]
> Done.{code}
> Expected behaviour is that the command fails, as there is no application-id 
> named foo, instead of deleting all foo* topics.
> This is critical on typos, or if an application-id starts with the same prefix 
> as another (for example if we had foo-v21 and wanted to reset foo-v2).
> The bug should be located here: 
> [https://github.com/apache/kafka/blob/c14f56b48461f01743146d58987bc8661ba0d459/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java#L693]
> The check should match the application-id exactly instead of checking that the 
> topic name starts with the application-id.





[GitHub] [kafka] ykubota commented on a diff in pull request #13532: KAFKA-14887: No shutdown for ZK session expiration in feature processing

2023-04-19 Thread via GitHub


ykubota commented on code in PR #13532:
URL: https://github.com/apache/kafka/pull/13532#discussion_r1171979375


##
core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala:
##
@@ -160,10 +160,12 @@ class FinalizedFeatureChangeListener(private val 
finalizedFeatureCache: ZkMetada
   // safe to ignore the exception if the thread is being shutdown. We 
raise the exception
   // here again, because, it is ignored by ShutdownableThread if it is 
shutting down.
   throw ie
-case e: Exception => {
-  error("Failed to process feature ZK node change event. The broker 
will eventually exit.", e)
+case cacheUpdateException: FeatureCacheUpdateException =>

Review Comment:
   I see. I totally agree that Zookeeper session expiry should be addressed 
fast, so I will create a follow-on PR if necessary.






[GitHub] [kafka] rreddy-22 opened a new pull request, #13616: KAFKA-14514: Add Optimized Uniform Assignor (KIP-848)

2023-04-19 Thread via GitHub


rreddy-22 opened a new pull request, #13616:
URL: https://github.com/apache/kafka/pull/13616

   Part of KIP-848
   
https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol
   
   As a part of the new protocol, there will be assignors on the server side as 
well to take some load off the client. The group coordinator either directly 
computes the new target assignment for the group based on its default 
server-side assignor or requests a new assignment from one of the members in 
the group.
   
   There will be two types of server-side assignors: Range and Uniform. This PR 
is for the Optimized Uniform Assignor, which has two parts:
   1) When all the subscriptions are equal, we do an optimized assignment.
   2) When the subscriptions are different, we do a general assignment.
   
   **The optimized uniform assignor has the liberty to treat each partition 
with equal weight since any member can get any partition from the subscribed 
topics. This makes it easier to distribute partitions and calculate quotas as a 
whole.** 
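
   To make the quota idea concrete, a rough sketch with illustrative numbers (not the actual assignor code):

```java
// Rough sketch of the quota computation when every partition has equal weight:
// each member gets floor(P / M) partitions and the first P % M members get one extra.
public class UniformQuotaSketch {
    public static void main(String[] args) {
        int totalPartitions = 10;                     // P, illustrative
        int memberCount = 3;                          // M, illustrative
        int minQuota = totalPartitions / memberCount;                   // 3
        int membersWithExtraPartition = totalPartitions % memberCount;  // 1
        for (int i = 0; i < memberCount; i++) {
            int quota = minQuota + (i < membersWithExtraPartition ? 1 : 0);
            System.out.println("member-" + i + " quota = " + quota);    // 4, 3, 3
        }
    }
}
```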
   
   The tests aim to cover all the possible cases when a re-assignment can be 
triggered.





[GitHub] [kafka] artemlivshits commented on a diff in pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

2023-04-19 Thread via GitHub


artemlivshits commented on code in PR #13607:
URL: https://github.com/apache/kafka/pull/13607#discussion_r1171959120


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -565,6 +565,12 @@ class KafkaApis(val requestChannel: RequestChannel,
 requestHelper.sendErrorResponseMaybeThrottle(request, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
 return
   }
+  try {
+ProduceRequest.validateProducerIds(request.header.apiVersion, 
produceRequest.data)
+  } catch {
+case e: InvalidRecordException =>
+  requestHelper.sendErrorResponseMaybeThrottle(request, 
Errors.INVALID_RECORD.exception)

Review Comment:
   Should we use `return` here?






[GitHub] [kafka] mjsax commented on pull request #13609: KAFKA-14834: [11/N] Update table joins to identify out-of-order records with `isLatest`

2023-04-19 Thread via GitHub


mjsax commented on PR #13609:
URL: https://github.com/apache/kafka/pull/13609#issuecomment-1515524362

   Merged to `trunk` and cherry-picked to `3.5` branch.





[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-19 Thread via GitHub


cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1171950976


##
metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java:
##
@@ -48,7 +48,7 @@ public class SnapshotGeneratorTest {
 static class MockEmitter implements SnapshotGenerator.Emitter {
 private final CountDownLatch latch = new CountDownLatch(1);
 private final List images = new 
CopyOnWriteArrayList<>();
-private RuntimeException problem = null;
+private volatile RuntimeException problem = null;

Review Comment:
   not another bug with this test! :)
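
   For context on the `volatile` change itself, a minimal standalone illustration of the visibility problem it addresses (not the test code):

```java
// Without volatile, the reader thread has no guarantee of ever observing the
// write made by the main thread; volatile makes the injected failure visible.
public class VisibilitySketch {
    private static volatile RuntimeException problem = null;

    public static void main(String[] args) throws InterruptedException {
        Thread reader = new Thread(() -> {
            while (problem == null) {
                // busy-wait until the injected failure becomes visible
            }
            System.out.println("saw: " + problem.getMessage());
        });
        reader.start();
        Thread.sleep(100);
        problem = new RuntimeException("injected failure"); // volatile write publishes it
        reader.join();
    }
}
```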






[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-19 Thread via GitHub


cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1171950562


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -869,19 +906,30 @@ public CompletableFuture 
acceptBatch(List recordBatch)
 return future;
 }
 ControllerWriteEvent batchEvent = new 
ControllerWriteEvent<>("ZK Migration Batch",
-new MigrationWriteOperation(recordBatch));
+new MigrationWriteOperation(recordBatch), 
EnumSet.of(RUNS_IN_PREMIGRATION));
 queue.append(batchEvent);
 return batchEvent.future;
 }
 
 @Override
-public OffsetAndEpoch completeMigration() {
-// TODO write migration record, use KIP-868 transaction
-return highestMigrationRecordOffset;
+public CompletableFuture completeMigration() {
+log.info("Completing ZK Migration");
+// TODO use KIP-868 transaction
+ControllerWriteEvent event = new 
ControllerWriteEvent<>("Complete ZK Migration",
+new MigrationWriteOperation(
+Collections.singletonList(
+new ApiMessageAndVersion(
+new 
ZkMigrationStateRecord().setZkMigrationState(ZkMigrationState.MIGRATION.value()),
+ZkMigrationStateRecord.LOWEST_SUPPORTED_VERSION)
+)),
+EnumSet.of(RUNS_IN_PREMIGRATION));
+queue.append(event);
+return event.future.thenApply(__ -> highestMigrationRecordOffset);
 }
 
 @Override
 public void abortMigration() {
+log.error("Aborting ZK Migration");

Review Comment:
   Fair enough. I think we should probably call the faultHandler, at least. Or 
throw an unhandled exception to fail over?
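
   As a rough sketch of that direction (the fault-handler hook here is a hypothetical stand-in, not the actual QuorumController wiring):

```java
// Hypothetical sketch: on abort, surface the failure through a fault handler
// instead of only logging it, so the active controller can fail over.
public class AbortMigrationSketch {
    // Stand-in for the controller's fault handler hook; not the real interface.
    interface FaultHandler {
        RuntimeException handleFault(String message, Throwable cause);
    }

    private final FaultHandler faultHandler;

    AbortMigrationSketch(FaultHandler faultHandler) {
        this.faultHandler = faultHandler;
    }

    public void abortMigration() {
        // Escalate rather than swallow: the returned exception is thrown so the
        // failure propagates out of the event loop.
        throw faultHandler.handleFault("Aborting ZK migration",
            new IllegalStateException("ZK migration aborted"));
    }
}
```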






[GitHub] [kafka] cmccabe merged pull request #13532: KAFKA-14887: No shutdown for ZK session expiration in feature processing

2023-04-19 Thread via GitHub


cmccabe merged PR #13532:
URL: https://github.com/apache/kafka/pull/13532





[GitHub] [kafka] cmccabe commented on a diff in pull request #13532: KAFKA-14887: No shutdown for ZK session expiration in feature processing

2023-04-19 Thread via GitHub


cmccabe commented on code in PR #13532:
URL: https://github.com/apache/kafka/pull/13532#discussion_r1171947569


##
core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala:
##
@@ -160,10 +160,12 @@ class FinalizedFeatureChangeListener(private val 
finalizedFeatureCache: ZkMetada
   // safe to ignore the exception if the thread is being shutdown. We 
raise the exception
   // here again, because, it is ignored by ShutdownableThread if it is 
shutting down.
   throw ie
-case e: Exception => {
-  error("Failed to process feature ZK node change event. The broker 
will eventually exit.", e)
+case cacheUpdateException: FeatureCacheUpdateException =>

Review Comment:
   It's tough to decide what to do in the case where we hit a feature 
notification and we can't understand it. This is something we can discuss in a 
follow-on PR.






[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-19 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171947129


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##
@@ -2175,7 +2182,7 @@ public void 
testClusterAuthorizationExceptionInProduceRequest() throws Exception
 sender.runOnce();
 assertFutureFailure(future, ClusterAuthorizationException.class);
 
-// cluster authorization errors are fatal, so we should continue 
seeing it on future sends
+// expecting to continue to see authorization error until user 
permission is fixed
 assertTrue(transactionManager.hasFatalError());

Review Comment:
   Ah, I missed that too. Thanks for correcting it.
   






[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-19 Thread via GitHub


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171946214


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##
@@ -2175,7 +2182,7 @@ public void 
testClusterAuthorizationExceptionInProduceRequest() throws Exception
 sender.runOnce();
 assertFutureFailure(future, ClusterAuthorizationException.class);
 
-// cluster authorization errors are fatal, so we should continue 
seeing it on future sends
+// expecting to continue to see authorization error until user 
permission is fixed
 assertTrue(transactionManager.hasFatalError());

Review Comment:
   I think that's actually my mistake. It's a produce error, not initProducerId. I'll correct these issues. Sorry.






[GitHub] [kafka] cmccabe commented on a diff in pull request #13461: KAFKA-14840: Support for snapshots during ZK migration

2023-04-19 Thread via GitHub


cmccabe commented on code in PR #13461:
URL: https://github.com/apache/kafka/pull/13461#discussion_r1171945830


##
core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala:
##
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.zk.migration
+
+import kafka.server.{ConfigEntityName, ConfigType, DynamicBrokerConfig, 
DynamicConfig, ZkAdminManager}
+import kafka.utils.{Logging, PasswordEncoder}
+import kafka.zk.ZkMigrationClient.wrapZkException
+import kafka.zk._
+import kafka.zookeeper.{CreateRequest, SetDataRequest}
+import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
+import org.apache.kafka.common.errors.InvalidRequestException
+import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
+import org.apache.kafka.common.quota.ClientQuotaEntity
+import org.apache.kafka.metadata.migration.{ConfigMigrationClient, 
MigrationClientException, ZkMigrationLeadershipState}
+import org.apache.zookeeper.KeeperException.Code
+import org.apache.zookeeper.{CreateMode, KeeperException}
+
+import java.{lang, util}
+import java.util.Properties
+import java.util.function.BiConsumer
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+class ZkConfigMigrationClient(
+  zkClient: KafkaZkClient,
+  passwordEncoder: PasswordEncoder
+) extends ConfigMigrationClient with Logging {
+
+  val adminZkClient = new AdminZkClient(zkClient)
+
+  override def iterateClientQuotas(
+quotaEntityConsumer: BiConsumer[util.List[EntityData], util.Map[String, 
lang.Double]]
+  ): Unit = {
+def migrateEntityType(entityType: String): Unit = {
+  adminZkClient.fetchAllEntityConfigs(entityType).foreach { case (name, 
props) =>
+val entity = List(new 
EntityData().setEntityType(entityType).setEntityName(name)).asJava
+val quotaMap = 
ZkAdminManager.clientQuotaPropsToDoubleMap(props.asScala).map {
+  case (key, value) => key -> lang.Double.valueOf(value)
+}.toMap.asJava
+quotaEntityConsumer.accept(entity, quotaMap)
+  }
+}
+
+migrateEntityType(ConfigType.User)
+migrateEntityType(ConfigType.Client)
+
+adminZkClient.fetchAllChildEntityConfigs(ConfigType.User, 
ConfigType.Client).foreach { case (name, props) =>
+  // Taken from ZkAdminManager
+  val components = name.split("/")
+  if (components.size != 3 || components(1) != "clients")
+throw new IllegalArgumentException(s"Unexpected config path: ${name}")
+  val entity = List(
+new 
EntityData().setEntityType(ConfigType.User).setEntityName(components(0)),
+new 
EntityData().setEntityType(ConfigType.Client).setEntityName(components(2))
+  )
+  val quotaMap = props.asScala.map { case (key, value) =>
+val doubleValue = try lang.Double.valueOf(value) catch {
+  case _: NumberFormatException =>
+throw new IllegalStateException(s"Unexpected client quota 
configuration value: $key -> $value")
+}
+key -> doubleValue
+  }.asJava
+  quotaEntityConsumer.accept(entity.asJava, quotaMap)
+}
+
+migrateEntityType(ConfigType.Ip)
+  }
+
+  override def iterateBrokerConfigs(configConsumer: BiConsumer[String, 
util.Map[String, String]]): Unit = {
+val brokerEntities = zkClient.getAllEntitiesWithConfig(ConfigType.Broker)
+zkClient.getEntitiesConfigs(ConfigType.Broker, 
brokerEntities.toSet).foreach { case (broker, props) =>
+  val brokerResource = if (broker == ConfigEntityName.Default) {
+""
+  } else {
+broker
+  }
+  val decodedProps = props.asScala.map { case (key, value) =>
+if (DynamicBrokerConfig.isPasswordConfig(key))
+  key -> passwordEncoder.decode(value).value
+else
+  key -> value
+  }.toMap.asJava
+
+  configConsumer.accept(brokerResource, decodedProps)
+}
+  }
+
+  override def writeConfigs(
+configResource: ConfigResource,
+configMap: util.Map[String, String],
+state: ZkMigrationLeadershipState
+  ): ZkMigrationLeadershipState = wrapZkException {
+val configType = configResource.`type`() match {
+  case ConfigResource.Type.BROKER => Some(ConfigType.Broker)
+  case 

[GitHub] [kafka] cmccabe commented on a diff in pull request #13461: KAFKA-14840: Support for snapshots during ZK migration

2023-04-19 Thread via GitHub


cmccabe commented on code in PR #13461:
URL: https://github.com/apache/kafka/pull/13461#discussion_r1171945520


##
core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala:
##
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.zk.migration
+
+import kafka.server.{ConfigEntityName, ConfigType, DynamicBrokerConfig, 
DynamicConfig, ZkAdminManager}
+import kafka.utils.{Logging, PasswordEncoder}
+import kafka.zk.ZkMigrationClient.wrapZkException
+import kafka.zk._
+import kafka.zookeeper.{CreateRequest, SetDataRequest}
+import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
+import org.apache.kafka.common.errors.InvalidRequestException
+import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
+import org.apache.kafka.common.quota.ClientQuotaEntity
+import org.apache.kafka.metadata.migration.{ConfigMigrationClient, 
MigrationClientException, ZkMigrationLeadershipState}
+import org.apache.zookeeper.KeeperException.Code
+import org.apache.zookeeper.{CreateMode, KeeperException}
+
+import java.{lang, util}
+import java.util.Properties
+import java.util.function.BiConsumer
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+class ZkConfigMigrationClient(
+  zkClient: KafkaZkClient,
+  passwordEncoder: PasswordEncoder
+) extends ConfigMigrationClient with Logging {
+
+  val adminZkClient = new AdminZkClient(zkClient)
+
+  override def iterateClientQuotas(
+quotaEntityConsumer: BiConsumer[util.List[EntityData], util.Map[String, 
lang.Double]]
+  ): Unit = {
+def migrateEntityType(entityType: String): Unit = {
+  adminZkClient.fetchAllEntityConfigs(entityType).foreach { case (name, 
props) =>
+val entity = List(new 
EntityData().setEntityType(entityType).setEntityName(name)).asJava
+val quotaMap = 
ZkAdminManager.clientQuotaPropsToDoubleMap(props.asScala).map {
+  case (key, value) => key -> lang.Double.valueOf(value)
+}.toMap.asJava
+quotaEntityConsumer.accept(entity, quotaMap)
+  }
+}
+
+migrateEntityType(ConfigType.User)
+migrateEntityType(ConfigType.Client)
+
+adminZkClient.fetchAllChildEntityConfigs(ConfigType.User, 
ConfigType.Client).foreach { case (name, props) =>
+  // Taken from ZkAdminManager
+  val components = name.split("/")
+  if (components.size != 3 || components(1) != "clients")
+throw new IllegalArgumentException(s"Unexpected config path: ${name}")
+  val entity = List(
+new 
EntityData().setEntityType(ConfigType.User).setEntityName(components(0)),
+new 
EntityData().setEntityType(ConfigType.Client).setEntityName(components(2))
+  )
+  val quotaMap = props.asScala.map { case (key, value) =>
+val doubleValue = try lang.Double.valueOf(value) catch {
+  case _: NumberFormatException =>
+throw new IllegalStateException(s"Unexpected client quota 
configuration value: $key -> $value")
+}
+key -> doubleValue
+  }.asJava
+  quotaEntityConsumer.accept(entity.asJava, quotaMap)
+}
+
+migrateEntityType(ConfigType.Ip)
+  }
+
+  override def iterateBrokerConfigs(configConsumer: BiConsumer[String, 
util.Map[String, String]]): Unit = {
+val brokerEntities = zkClient.getAllEntitiesWithConfig(ConfigType.Broker)
+zkClient.getEntitiesConfigs(ConfigType.Broker, 
brokerEntities.toSet).foreach { case (broker, props) =>
+  val brokerResource = if (broker == ConfigEntityName.Default) {
+""

Review Comment:
   we should have a comment that the empty string means "default entity name" 
in kraft






[GitHub] [kafka] cmccabe commented on a diff in pull request #13461: KAFKA-14840: Support for snapshots during ZK migration

2023-04-19 Thread via GitHub


cmccabe commented on code in PR #13461:
URL: https://github.com/apache/kafka/pull/13461#discussion_r1171945520


##
core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala:
##
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.zk.migration
+
+import kafka.server.{ConfigEntityName, ConfigType, DynamicBrokerConfig, 
DynamicConfig, ZkAdminManager}
+import kafka.utils.{Logging, PasswordEncoder}
+import kafka.zk.ZkMigrationClient.wrapZkException
+import kafka.zk._
+import kafka.zookeeper.{CreateRequest, SetDataRequest}
+import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
+import org.apache.kafka.common.errors.InvalidRequestException
+import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
+import org.apache.kafka.common.quota.ClientQuotaEntity
+import org.apache.kafka.metadata.migration.{ConfigMigrationClient, 
MigrationClientException, ZkMigrationLeadershipState}
+import org.apache.zookeeper.KeeperException.Code
+import org.apache.zookeeper.{CreateMode, KeeperException}
+
+import java.{lang, util}
+import java.util.Properties
+import java.util.function.BiConsumer
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+class ZkConfigMigrationClient(
+  zkClient: KafkaZkClient,
+  passwordEncoder: PasswordEncoder
+) extends ConfigMigrationClient with Logging {
+
+  val adminZkClient = new AdminZkClient(zkClient)
+
+  override def iterateClientQuotas(
+quotaEntityConsumer: BiConsumer[util.List[EntityData], util.Map[String, 
lang.Double]]
+  ): Unit = {
+def migrateEntityType(entityType: String): Unit = {
+  adminZkClient.fetchAllEntityConfigs(entityType).foreach { case (name, 
props) =>
+val entity = List(new 
EntityData().setEntityType(entityType).setEntityName(name)).asJava
+val quotaMap = 
ZkAdminManager.clientQuotaPropsToDoubleMap(props.asScala).map {
+  case (key, value) => key -> lang.Double.valueOf(value)
+}.toMap.asJava
+quotaEntityConsumer.accept(entity, quotaMap)
+  }
+}
+
+migrateEntityType(ConfigType.User)
+migrateEntityType(ConfigType.Client)
+
+adminZkClient.fetchAllChildEntityConfigs(ConfigType.User, 
ConfigType.Client).foreach { case (name, props) =>
+  // Taken from ZkAdminManager
+  val components = name.split("/")
+  if (components.size != 3 || components(1) != "clients")
+throw new IllegalArgumentException(s"Unexpected config path: ${name}")
+  val entity = List(
+new 
EntityData().setEntityType(ConfigType.User).setEntityName(components(0)),
+new 
EntityData().setEntityType(ConfigType.Client).setEntityName(components(2))
+  )
+  val quotaMap = props.asScala.map { case (key, value) =>
+val doubleValue = try lang.Double.valueOf(value) catch {
+  case _: NumberFormatException =>
+throw new IllegalStateException(s"Unexpected client quota 
configuration value: $key -> $value")
+}
+key -> doubleValue
+  }.asJava
+  quotaEntityConsumer.accept(entity.asJava, quotaMap)
+}
+
+migrateEntityType(ConfigType.Ip)
+  }
+
+  override def iterateBrokerConfigs(configConsumer: BiConsumer[String, 
util.Map[String, String]]): Unit = {
+val brokerEntities = zkClient.getAllEntitiesWithConfig(ConfigType.Broker)
+zkClient.getEntitiesConfigs(ConfigType.Broker, 
brokerEntities.toSet).foreach { case (broker, props) =>
+  val brokerResource = if (broker == ConfigEntityName.Default) {
+""

Review Comment:
   we should have a comment that the empty string means "default entity name" 
in kraft
   
   or possibly use a constant, but I don't know if there is one






[GitHub] [kafka] cmccabe commented on a diff in pull request #13461: KAFKA-14840: Support for snapshots during ZK migration

2023-04-19 Thread via GitHub


cmccabe commented on code in PR #13461:
URL: https://github.com/apache/kafka/pull/13461#discussion_r1171945097


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -174,12 +173,12 @@ private boolean areZkBrokersReadyForMigration() {
 /**
  * Apply a function which transforms our internal migration state.
  *
- * @param name  A descriptive name of the function that is being applied
- * @param stateMutator  A function which performs some migration 
operations and possibly transforms our internal state
+ * @param name A descriptive name of the function that is being 
applied
+ * @param migrationOp  A function which performs some migration operations 
and possibly transforms our internal state
  */
-private void apply(String name, Function stateMutator) {
+private void applyMigrationOperation(String name, KRaftMigrationOperation 
migrationOp) {
 ZkMigrationLeadershipState beforeState = this.migrationLeadershipState;
-ZkMigrationLeadershipState afterState = 
stateMutator.apply(beforeState);
+ZkMigrationLeadershipState afterState = migrationOp.apply(beforeState);
 log.trace("{} transitioned from {} to {}", name, beforeState, 
afterState);

Review Comment:
   I feel like if the state is meaningfully different, we'd want this to be 
higher than TRACE.
   
   Like the metadata offset change is fine to log at TRACE, but if the 
controller epoch changes or controller ID changes, wouldn't we want this to be 
INFO?
   
   I guess we could do this in a follow-on PR as well
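
   A sketch of that idea (hypothetical state holder and helper, not the actual KRaftMigrationDriver code):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Pick the log level based on whether the "interesting" fields changed, keeping
// offset-only transitions at TRACE.
public class MigrationStateLoggingSketch {
    private static final Logger log = LoggerFactory.getLogger(MigrationStateLoggingSketch.class);

    // Minimal stand-in for ZkMigrationLeadershipState.
    record State(int controllerId, int controllerEpoch, long metadataOffset) { }

    static void logTransition(String name, State before, State after) {
        boolean significant = before.controllerId() != after.controllerId()
            || before.controllerEpoch() != after.controllerEpoch();
        if (significant) {
            log.info("{} transitioned from {} to {}", name, before, after);
        } else {
            log.trace("{} transitioned from {} to {}", name, before, after);
        }
    }
}
```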






[GitHub] [kafka] mjsax merged pull request #13609: KAFKA-14834: [11/N] Update table joins to identify out-of-order records with `isLatest`

2023-04-19 Thread via GitHub


mjsax merged PR #13609:
URL: https://github.com/apache/kafka/pull/13609





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13615: KAFKA-14834: [12/N] Minor code cleanups relating to versioned stores

2023-04-19 Thread via GitHub


vcrfxia commented on code in PR #13615:
URL: https://github.com/apache/kafka/pull/13615#discussion_r1171934110


##
streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java:
##
@@ -36,6 +37,7 @@
  * @param value The value to update, it can be {@code null};
  *  if the serialized bytes are also {@code null} it is 
interpreted as deletes
  * @throws NullPointerException If {@code null} is used for key.
+ * @throws InvalidStateStoreException if the store is not initialized

Review Comment:
   Went through and added this additional javadoc line to the methods where it 
seemed to be missing (as requested in 
https://github.com/apache/kafka/pull/13188#discussion_r1097993808). Looks like 
the usage throughout the codebase is very inconsistent though:
   * `RocksDBStore` enforces it on all methods even though it wasn't documented 
in `KeyValueStore`. The in-memory implementations do not enforce it.
   * `WindowStore` has docs about this but the actual implementations do not 
enforce it.
   * `SessionStore` neither has the annotations nor implements it.
   * `VersionedKeyValueStore` has docs but the implementation was not enforcing 
it. I've reconciled this inconsistency in this PR, but the others are larger 
changes that I'd like to leave for later.
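
   For reference, the check being enforced amounts to the following pattern (a minimal sketch, not any of the actual store implementations):

```java
import org.apache.kafka.streams.errors.InvalidStateStoreException;

// Every read/write entry point verifies the store is open and throws
// InvalidStateStoreException otherwise, matching the @throws javadoc.
public class GuardedStoreSketch {
    private final String name = "example-store";
    private volatile boolean open = false;

    public void init() {
        open = true;
    }

    private void validateStoreOpen() {
        if (!open) {
            throw new InvalidStateStoreException("Store " + name + " is currently closed");
        }
    }

    public void put(String key, String value) {
        validateStoreOpen(); // enforce before touching the underlying store
        // ... actual write elided
    }
}
```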






[GitHub] [kafka] cmccabe commented on a diff in pull request #13461: KAFKA-14840: Support for snapshots during ZK migration

2023-04-19 Thread via GitHub


cmccabe commented on code in PR #13461:
URL: https://github.com/apache/kafka/pull/13461#discussion_r1171943462


##
core/src/main/scala/kafka/zk/migration/ZkAclMigrationClient.scala:
##
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.zk.migration
+
+import kafka.security.authorizer.AclAuthorizer.{ResourceOrdering, 
VersionedAcls}
+import kafka.security.authorizer.{AclAuthorizer, AclEntry}
+import kafka.utils.Logging
+import kafka.zk.ZkMigrationClient.wrapZkException
+import kafka.zk.{KafkaZkClient, ResourceZNode, ZkAclStore, ZkVersion}
+import kafka.zookeeper.{CreateRequest, DeleteRequest, SetDataRequest}
+import org.apache.kafka.common.acl.AccessControlEntry
+import org.apache.kafka.common.resource.ResourcePattern
+import org.apache.kafka.metadata.migration.{AclMigrationClient, 
MigrationClientException, ZkMigrationLeadershipState}
+import org.apache.zookeeper.CreateMode
+import org.apache.zookeeper.KeeperException.Code
+
+import java.util
+import java.util.function.BiConsumer
+import scala.jdk.CollectionConverters._
+
+class ZkAclMigrationClient(
+  zkClient: KafkaZkClient
+) extends AclMigrationClient with Logging {
+
+  private def aclChangeNotificationRequest(resourcePattern: ResourcePattern): 
CreateRequest = {
+// ZK broker needs the ACL change notification znode to be updated in 
order to process the new ACLs
+val aclChange = 
ZkAclStore(resourcePattern.patternType).changeStore.createChangeNode(resourcePattern)
+CreateRequest(aclChange.path, aclChange.bytes, 
zkClient.defaultAcls(aclChange.path), CreateMode.PERSISTENT_SEQUENTIAL)
+  }
+
+  private def tryWriteAcls(
+resourcePattern: ResourcePattern,
+aclEntries: Set[AclEntry],
+create: Boolean,
+state: ZkMigrationLeadershipState
+  ): Option[ZkMigrationLeadershipState] = wrapZkException {
+val aclData = ResourceZNode.encode(aclEntries)
+
+val request = if (create) {
+  val path = ResourceZNode.path(resourcePattern)
+  CreateRequest(path, aclData, zkClient.defaultAcls(path), 
CreateMode.PERSISTENT)
+} else {
+  SetDataRequest(ResourceZNode.path(resourcePattern), aclData, 
ZkVersion.MatchAnyVersion)
+}
+
+val (migrationZkVersion, responses) = 
zkClient.retryMigrationRequestsUntilConnected(Seq(request), state)
+if (responses.head.resultCode.equals(Code.NONODE)) {
+  // Need to call this method again with create=true
+  None
+} else {
+  // Write the ACL notification outside of a metadata multi-op
+  
zkClient.retryRequestUntilConnected(aclChangeNotificationRequest(resourcePattern))
+  Some(state.withMigrationZkVersion(migrationZkVersion))
+}
+  }
+
+  override def writeResourceAcls(
+resourcePattern: ResourcePattern,
+aclsToWrite: util.Collection[AccessControlEntry],
+state: ZkMigrationLeadershipState
+  ): ZkMigrationLeadershipState = {
+val acls = aclsToWrite.asScala.map(new AclEntry(_)).toSet
+tryWriteAcls(resourcePattern, acls, create = false, state) match {
+  case Some(newState) => newState
+  case None => tryWriteAcls(resourcePattern, acls, create = true, state) 
match {
+case Some(newState) => newState
+case None => throw new MigrationClientException(s"Could not write ACLs 
for resource pattern $resourcePattern")
+  }
+}
+  }
+
+  override def deleteResource(
+resourcePattern: ResourcePattern,
+state: ZkMigrationLeadershipState
+  ): ZkMigrationLeadershipState = {
+val request = DeleteRequest(ResourceZNode.path(resourcePattern), 
ZkVersion.MatchAnyVersion)
+val (migrationZkVersion, responses) = 
zkClient.retryMigrationRequestsUntilConnected(Seq(request), state)
+if (responses.head.resultCode.equals(Code.OK) || 
responses.head.resultCode.equals(Code.NONODE)) {
+  // Write the ACL notification outside of a metadata multi-op
+  
zkClient.retryRequestUntilConnected(aclChangeNotificationRequest(resourcePattern))
+  state.withMigrationZkVersion(migrationZkVersion)
+} else {
+  throw new MigrationClientException(s"Could not delete ACL for resource 
pattern $resourcePattern")
+}
+  }
+
+  override def iterateAcls(
+aclConsumer: BiConsumer[ResourcePattern, 

[GitHub] [kafka] cmccabe commented on a diff in pull request #13461: KAFKA-14840: Support for snapshots during ZK migration

2023-04-19 Thread via GitHub


cmccabe commented on code in PR #13461:
URL: https://github.com/apache/kafka/pull/13461#discussion_r1171941841


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.image.AclsDelta;
+import org.apache.kafka.image.AclsImage;
+import org.apache.kafka.image.ClientQuotasDelta;
+import org.apache.kafka.image.ClientQuotasImage;
+import org.apache.kafka.image.ConfigurationsDelta;
+import org.apache.kafka.image.ConfigurationsImage;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsDelta;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.authorizer.StandardAcl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class KRaftMigrationZkWriter {
+private final MigrationClient migrationClient;
+private final BiConsumer 
operationConsumer;
+
+public KRaftMigrationZkWriter(
+MigrationClient migrationClient,
+BiConsumer  operationConsumer
+) {
+this.migrationClient = migrationClient;
+this.operationConsumer = operationConsumer;
+}
+
+public void handleSnapshot(MetadataImage image) {

Review Comment:
   We are missing handling for `clusterImage` here, right? Specifically, we 
need to write out KRaft brokers to /brokers. ZK-based brokers will continue to 
write themselves out.






[GitHub] [kafka] cmccabe commented on a diff in pull request #13461: KAFKA-14840: Support for snapshots during ZK migration

2023-04-19 Thread via GitHub


cmccabe commented on code in PR #13461:
URL: https://github.com/apache/kafka/pull/13461#discussion_r1171941706


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.image.AclsDelta;
+import org.apache.kafka.image.AclsImage;
+import org.apache.kafka.image.ClientQuotasDelta;
+import org.apache.kafka.image.ClientQuotasImage;
+import org.apache.kafka.image.ConfigurationsDelta;
+import org.apache.kafka.image.ConfigurationsImage;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsDelta;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.authorizer.StandardAcl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class KRaftMigrationZkWriter {
+private final MigrationClient migrationClient;
+private final BiConsumer 
operationConsumer;
+
+public KRaftMigrationZkWriter(
+MigrationClient migrationClient,
+BiConsumer  operationConsumer
+) {
+this.migrationClient = migrationClient;
+this.operationConsumer = operationConsumer;
+}
+
+public void handleSnapshot(MetadataImage image) {
+handleTopicsSnapshot(image.topics());
+handleConfigsSnapshot(image.configs());
+handleClientQuotasSnapshot(image.clientQuotas());
+operationConsumer.accept("Setting next producer ID", migrationState ->
+
migrationClient.writeProducerId(image.producerIds().highestSeenProducerId(), 
migrationState));
+handleAclsSnapshot(image.acls());
+}
+
+public void handleDelta(MetadataImage previousImage, MetadataImage image, 
MetadataDelta delta) {

Review Comment:
   We are missing handling for `clusterDelta` here, right? Specifically, we 
need to write out KRaft brokers to /brokers. ZK-based brokers will continue to 
write themselves out.






[GitHub] [kafka] mjsax merged pull request #13610: MINOR: update comments for FK join processor renames

2023-04-19 Thread via GitHub


mjsax merged PR #13610:
URL: https://github.com/apache/kafka/pull/13610





[GitHub] [kafka] cmccabe commented on pull request #13461: KAFKA-14840: Support for snapshots during ZK migration

2023-04-19 Thread via GitHub


cmccabe commented on PR #13461:
URL: https://github.com/apache/kafka/pull/13461#issuecomment-1515504502

   Thanks for this @mumrah .
   
   Can we have a ZkConfigMigrationClientTest, and so on for the other new migration client classes?
   
   Should we wrap the calls to `quotaEntityConsumer.accept` (and other 
consumers) so that if it throws an exception, we log an ERROR message to log4j 
to let us know what failed?
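
   One way that wrapping could look, sketched with assumed names rather than the actual migration client code:

```java
import java.util.function.BiConsumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Run each consumer callback through a helper that logs an ERROR with enough
// context before rethrowing, so a failure during iteration is easy to locate.
public class ConsumerWrappingSketch {
    private static final Logger log = LoggerFactory.getLogger(ConsumerWrappingSketch.class);

    static <A, B> BiConsumer<A, B> logged(String what, BiConsumer<A, B> delegate) {
        return (a, b) -> {
            try {
                delegate.accept(a, b);
            } catch (RuntimeException e) {
                log.error("Failed while consuming {}: {} -> {}", what, a, b, e);
                throw e;
            }
        };
    }
}
```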





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes

2023-04-19 Thread via GitHub


vcrfxia commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1171939132


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java:
##
@@ -73,6 +74,7 @@  KTable build(final Map, Aggregator

[GitHub] [kafka] cmccabe commented on a diff in pull request #13461: KAFKA-14840: Support for snapshots during ZK migration

2023-04-19 Thread via GitHub


cmccabe commented on code in PR #13461:
URL: https://github.com/apache/kafka/pull/13461#discussion_r1171935467


##
metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java:
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.PartitionRegistration;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class CapturingTopicMigrationClient implements TopicMigrationClient {
+public List deletedTopics = new ArrayList<>();
+public List createdTopics = new ArrayList<>();
+public LinkedHashMap> updatedTopicPartitions = new 
LinkedHashMap<>();
+
+public void reset() {
+createdTopics.clear();
+updatedTopicPartitions.clear();
+deletedTopics.clear();
+}
+
+
+@Override
+public void iterateTopics(EnumSet interests, 
TopicVisitor visitor) {
+

Review Comment:
   I think this should just be abstract if we expect people to subclass this to 
implement it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13461: KAFKA-14840: Support for snapshots during ZK migration

2023-04-19 Thread via GitHub


cmccabe commented on code in PR #13461:
URL: https://github.com/apache/kafka/pull/13461#discussion_r1171935467


##
metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java:
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.PartitionRegistration;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class CapturingTopicMigrationClient implements TopicMigrationClient {
+public List deletedTopics = new ArrayList<>();
+public List createdTopics = new ArrayList<>();
+public LinkedHashMap> updatedTopicPartitions = new 
LinkedHashMap<>();
+
+public void reset() {
+createdTopics.clear();
+updatedTopicPartitions.clear();
+deletedTopics.clear();
+}
+
+
+@Override
+public void iterateTopics(EnumSet interests, 
TopicVisitor visitor) {
+

Review Comment:
   why is this empty?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13461: KAFKA-14840: Support for snapshots during ZK migration

2023-04-19 Thread via GitHub


cmccabe commented on code in PR #13461:
URL: https://github.com/apache/kafka/pull/13461#discussion_r1171934758


##
core/src/main/scala/kafka/zk/ZkMigrationClient.scala:
##
@@ -16,71 +16,79 @@
  */
 package kafka.zk
 
-import kafka.api.LeaderAndIsr
-import kafka.controller.{LeaderIsrAndControllerEpoch, ReplicaAssignment}
-import kafka.security.authorizer.{AclAuthorizer, AclEntry}
-import kafka.security.authorizer.AclAuthorizer.{ResourceOrdering, 
VersionedAcls}
-import kafka.server.{ConfigEntityName, ConfigType, DynamicBrokerConfig, 
ZkAdminManager}
 import kafka.utils.{Logging, PasswordEncoder}
-import kafka.zk.TopicZNode.TopicIdReplicaAssignment
+import kafka.zk.ZkMigrationClient.wrapZkException
+import kafka.zk.migration.{ZkAclMigrationClient, ZkConfigMigrationClient, 
ZkTopicMigrationClient}
 import kafka.zookeeper._
 import org.apache.kafka.common.acl.AccessControlEntry
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors.ControllerMovedException
-import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
 import org.apache.kafka.common.metadata._
-import org.apache.kafka.common.quota.ClientQuotaEntity
 import org.apache.kafka.common.resource.ResourcePattern
-import org.apache.kafka.common.{TopicPartition, Uuid}
-import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
-import org.apache.kafka.metadata.migration.{MigrationClient, 
MigrationClientAuthException, MigrationClientException, 
ZkMigrationLeadershipState}
+import org.apache.kafka.common.{TopicIdPartition, Uuid}
+import org.apache.kafka.metadata.PartitionRegistration
+import org.apache.kafka.metadata.migration.TopicMigrationClient.{TopicVisitor, 
TopicVisitorInterest}
+import org.apache.kafka.metadata.migration._
 import org.apache.kafka.server.common.{ApiMessageAndVersion, ProducerIdsBlock}
-import org.apache.zookeeper.KeeperException.{AuthFailedException, Code, 
NoAuthException, SessionClosedRequireAuthException}
-import org.apache.zookeeper.{CreateMode, KeeperException}
+import org.apache.zookeeper.KeeperException
+import org.apache.zookeeper.KeeperException.{AuthFailedException, 
NoAuthException, SessionClosedRequireAuthException}
 
 import java.util
 import java.util.Properties
-import java.util.function.{BiConsumer, Consumer}
+import java.util.function.Consumer
 import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 
 object ZkMigrationClient {
+
   val MaxBatchSize = 100
-}
 
-/**
- * Migration client in KRaft controller responsible for handling communication 
to Zookeeper and
- * the ZkBrokers present in the cluster. Methods that directly use 
KafkaZkClient should use the wrapZkException
- * wrapper function in order to translate KeeperExceptions into something 
usable by the caller.
- */
-class ZkMigrationClient(
-  zkClient: KafkaZkClient,
-  zkConfigEncoder: PasswordEncoder
-) extends MigrationClient with Logging {
+  def apply(
+zkClient: KafkaZkClient,
+zkConfigEncoder: PasswordEncoder
+  ): ZkMigrationClient = {
+val topicClient = new ZkTopicMigrationClient(zkClient)
+val configClient = new ZkConfigMigrationClient(zkClient, zkConfigEncoder)
+val aclClient = new ZkAclMigrationClient(zkClient)
+new ZkMigrationClient(zkClient, topicClient, configClient, aclClient)
+  }
 
   /**
* Wrap a function such that any KeeperExceptions is captured and converted 
to a MigrationClientException.
* Any authentication related exception is converted to a 
MigrationClientAuthException which may be treated
* differently by the caller.
*/
   @throws(classOf[MigrationClientException])
-  private def wrapZkException[T](fn: => T): T = {
+  def wrapZkException[T](fn: => T): T = {
 try {
   fn
 } catch {
-  case e @ (_: MigrationClientException | _: MigrationClientAuthException) 
=> throw e
-  case e @ (_: AuthFailedException | _: NoAuthException | _: 
SessionClosedRequireAuthException) =>
+  case e@(_: MigrationClientException | _: MigrationClientAuthException) 
=> throw e

Review Comment:
   are these whitespace changes needed?



##
core/src/main/scala/kafka/zk/ZkMigrationClient.scala:
##
@@ -16,71 +16,79 @@
  */
 package kafka.zk
 
-import kafka.api.LeaderAndIsr
-import kafka.controller.{LeaderIsrAndControllerEpoch, ReplicaAssignment}
-import kafka.security.authorizer.{AclAuthorizer, AclEntry}
-import kafka.security.authorizer.AclAuthorizer.{ResourceOrdering, 
VersionedAcls}
-import kafka.server.{ConfigEntityName, ConfigType, DynamicBrokerConfig, 
ZkAdminManager}
 import kafka.utils.{Logging, PasswordEncoder}
-import kafka.zk.TopicZNode.TopicIdReplicaAssignment
+import kafka.zk.ZkMigrationClient.wrapZkException
+import kafka.zk.migration.{ZkAclMigrationClient, ZkConfigMigrationClient, 
ZkTopicMigrationClient}
 import kafka.zookeeper._
 import org.apache.kafka.common.acl.AccessControlEntry
 import org.apache.kafka.common.config.ConfigResource
 import 

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13615: KAFKA-14834: [12/N] Minor code cleanups relating to versioned stores

2023-04-19 Thread via GitHub


vcrfxia commented on code in PR #13615:
URL: https://github.com/apache/kafka/pull/13615#discussion_r1171934110


##
streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java:
##
@@ -36,6 +37,7 @@
  * @param value The value to update, it can be {@code null};
  *  if the serialized bytes are also {@code null} it is 
interpreted as deletes
  * @throws NullPointerException If {@code null} is used for key.
+ * @throws InvalidStateStoreException if the store is not initialized

Review Comment:
   Went through and added this additional javadoc line to the methods where it 
seemed to be missing (as requested in 
https://github.com/apache/kafka/pull/13188#discussion_r1097993808). Looks like 
the usage throughout the codebase is very inconsistent though:
   * `RocksDBStore` enforces it on all methods even though it wasn't documented 
in `KeyValueStore`. The in-memory implementations do not enforce it.
   * `WindowStore` has docs about this but the actual implementations do not 
enforce it.
   * `VersionedKeyValueStore` has docs but the implementation was not enforcing 
it. I've reconciled this inconsistency in this PR, but the others are larger 
changes that I'd like to leave for later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13615: KAFKA-14834: [12/N] Minor code cleanups relating to versioned stores

2023-04-19 Thread via GitHub


vcrfxia commented on code in PR #13615:
URL: https://github.com/apache/kafka/pull/13615#discussion_r1171934110


##
streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java:
##
@@ -36,6 +37,7 @@
  * @param value The value to update, it can be {@code null};
  *  if the serialized bytes are also {@code null} it is 
interpreted as deletes
  * @throws NullPointerException If {@code null} is used for key.
+ * @throws InvalidStateStoreException if the store is not initialized

Review Comment:
   Went through and added this additional javadoc line to the methods where it 
seemed to be missing. Looks like the usage throughout the codebase is very 
inconsistent though:
   * `RocksDBStore` enforces it on all methods even though it wasn't documented 
in `KeyValueStore`. The in-memory implementations do not enforce it.
   * `WindowStore` has docs about this but the actual implementations do not 
enforce it.
   * `VersionedKeyValueStore` has docs but the implementation was not enforcing 
it. I've reconciled this inconsistency in this PR, but the others are larger 
changes that I'd like to leave for later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13188: KAFKA-14491: [5/N] Basic operations for RocksDB versioned store

2023-04-19 Thread via GitHub


vcrfxia commented on code in PR #13188:
URL: https://github.com/apache/kafka/pull/13188#discussion_r1171931840


##
streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStore.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStore;
+
+/**
+ * A key-value store that stores multiple record versions per key, and 
supports timestamp-based
+ * retrieval operations to return the latest record (per key) as of a 
specified timestamp.
+ * Only one record is stored per key and timestamp, i.e., a second call to
+ * {@link #put(Object, Object, long)} with the same key and timestamp will 
replace the first.
+ * 
+ * Each store instance has an associated, fixed-duration "history retention" 
which specifies
+ * how long old record versions should be kept for. In particular, a versioned 
store guarantees
+ * to return accurate results for calls to {@link #get(Object, long)} where 
the provided timestamp
+ * bound is within history retention of the current observed stream time. 
(Queries with timestamp
+ * bound older than the specified history retention are considered invalid.)
+ *
+ * @param  The key type
+ * @param  The value type
+ */
+public interface VersionedKeyValueStore extends StateStore {
+
+/**
+ * Add a new record version associated with this key.
+ *
+ * @param key   The key
+ * @param value The value, it can be {@code null};
+ *  if the serialized bytes are also {@code null} it is 
interpreted as a delete
+ * @param timestamp The timestamp for this record version
+ * @throws NullPointerException If {@code null} is used for key.
+ */
+void put(K key, V value, long timestamp);
+
+/**
+ * Delete the value associated with this key from the store, at the 
specified timestamp
+ * (if there is such a value), and return the deleted value.
+ * 
+ * This operation is semantically equivalent to {@link #get(Object, long)} 
#get(key, timestamp))}
+ * followed by {@link #put(Object, Object, long) #put(key, null, 
timestamp)}.
+ *
+ * @param key   The key
+ * @param timestamp The timestamp for this delete
+ * @return The value and timestamp of the latest record associated with 
this key
+ * as of the deletion timestamp (inclusive), or {@code null} if 
any of
+ * (1) the store contains no records for this key, (2) the latest 
record
+ * for this key as of the deletion timestamp is a tombstone, or
+ * (3) the deletion timestamp is older than this store's history 
retention
+ * (i.e., this store no longer contains data for the provided 
timestamp).
+ * @throws NullPointerException If {@code null} is used for key.
+ */
+VersionedRecord delete(K key, long timestamp);
+
+/**
+ * Get the latest (by timestamp) record associated with this key.
+ *
+ * @param key The key to fetch
+ * @return The value and timestamp of the latest record associated with 
this key, or
+ * {@code null} if either (1) the store contains no records for 
this key or (2) the
+ * latest record for this key is a tombstone.
+ * @throws NullPointerException   If null is used for key.
+ * @throws InvalidStateStoreException if the store is not initialized

Review Comment:
   Updated in https://github.com/apache/kafka/pull/13615.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13564: KAFKA-14834: [8/N] Propagate `isLatest` as part of `Change`

2023-04-19 Thread via GitHub


vcrfxia commented on code in PR #13564:
URL: https://github.com/apache/kafka/pull/13564#discussion_r1171930596


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java:
##
@@ -139,7 +139,7 @@ public void process(final Record> record) {
 oldValue = joiner.apply(record.value().oldValue, valueRight);
 }
 
-context().forward(record.withValue(new Change<>(newValue, 
oldValue)).withTimestamp(resultTimestamp));
+context().forward(record.withValue(new Change<>(newValue, 
oldValue, record.value().isLatest)).withTimestamp(resultTimestamp));

Review Comment:
   Coming back to your question on this PR: yes, `isLatest` will always be true in this join processor, but that's not necessarily the case in the join merge processor. It would not be true if a user materialized the join result as a versioned store (even though, semantically, they shouldn't) and hit this edge case with nulls.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java:
##
@@ -117,13 +120,16 @@ public void init(final ProcessorContext> 
context) {
 @Override
 public void process(final Record> record) {
 if (queryableName != null) {
-store.put(record.key(), record.value().newValue, 
record.timestamp());
-tupleForwarder.maybeForward(record);
+final long putReturnCode = store.put(record.key(), 
record.value().newValue, record.timestamp());
+// if not put to store, do not forward downstream either

Review Comment:
   Accidentally responded above in 
https://github.com/apache/kafka/pull/13564#discussion_r1171930596, instead of 
here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13564: KAFKA-14834: [8/N] Propagate `isLatest` as part of `Change`

2023-04-19 Thread via GitHub


vcrfxia commented on code in PR #13564:
URL: https://github.com/apache/kafka/pull/13564#discussion_r1171930805


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java:
##
@@ -154,27 +154,25 @@ public void process(final Record> record) {
 final KeyValue oldPair = 
record.value().oldValue == null ? null :
 mapper.apply(record.key(), record.value().oldValue);
 
+final boolean isLatest = record.value().isLatest;

Review Comment:
   Updated in https://github.com/apache/kafka/pull/13615.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java:
##
@@ -118,11 +120,14 @@ public void process(final Record> record) {
 
 if (queryableName == null) {

Review Comment:
   Updated in https://github.com/apache/kafka/pull/13615.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13564: KAFKA-14834: [8/N] Propagate `isLatest` as part of `Change`

2023-04-19 Thread via GitHub


vcrfxia commented on code in PR #13564:
URL: https://github.com/apache/kafka/pull/13564#discussion_r1171929315


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java:
##
@@ -139,7 +139,7 @@ public void process(final Record> record) {
 oldValue = joiner.apply(record.value().oldValue, valueRight);
 }
 
-context().forward(record.withValue(new Change<>(newValue, 
oldValue)).withTimestamp(resultTimestamp));
+context().forward(record.withValue(new Change<>(newValue, 
oldValue, record.value().isLatest)).withTimestamp(resultTimestamp));

Review Comment:
   I thought of an interesting edge case earlier; it has to do with nulls. It doesn't apply to inner joins, but suppose we have a left join:
   ```
   B: (k, b, ts=3)
   A: (k, a1, ts=1) --> emit join result (a1, b, ts=3)
   B: (k, null, ts=4) --> emit join result (a1, null, ts=4)
   A: (k, a2, ts=2) --> emit join result (a2, null, ts=2)
   ```
   The reason the last join result is emitted with timestamp 2 instead of 4 is that when the null is looked up as the latest value from the B store, there is no timestamp associated with it, and therefore the A record timestamp is used.
   
   This example is interesting because the final join result `(a2, null)` is 
the "most recent" join result in the sense that it is the join of the latest 
record from the A side (`a2`) with the latest record from the B side (`null`), 
but it does not have the latest timestamp of the join result records (because 
the previous join result has timestamp 4). So, which join result should be 
considered the latest? It should probably be the last one, right? In that case 
it would be extra wrong to materialize the join result as a versioned store, 
since a versioned store would think that the second-to-last join result is the 
latest.
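
   For anyone who wants to replay the trace above against the public API, here is a rough standalone sketch. The topic names, application id, and the simple string joiner are made up for illustration; it just prints each join result together with the timestamp the join assigned to it:
    ```java
    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.TestInputTopic;
    import org.apache.kafka.streams.TestOutputTopic;
    import org.apache.kafka.streams.TopologyTestDriver;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Produced;

    public class LeftJoinNullEdgeCase {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            final KTable<String, String> left = builder.table("A", Consumed.with(Serdes.String(), Serdes.String()));
            final KTable<String, String> right = builder.table("B", Consumed.with(Serdes.String(), Serdes.String()));
            left.leftJoin(right, (a, b) -> a + "," + b)
                .toStream()
                .to("out", Produced.with(Serdes.String(), Serdes.String()));

            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "left-join-null-edge-case");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

            try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
                final TestInputTopic<String, String> aIn = driver.createInputTopic("A", new StringSerializer(), new StringSerializer());
                final TestInputTopic<String, String> bIn = driver.createInputTopic("B", new StringSerializer(), new StringSerializer());
                final TestOutputTopic<String, String> out = driver.createOutputTopic("out", new StringDeserializer(), new StringDeserializer());

                // The trace from the comment above.
                bIn.pipeInput("k", "b", 3L);
                aIn.pipeInput("k", "a1", 1L);
                bIn.pipeInput("k", null, 4L);   // delete on the B side
                aIn.pipeInput("k", "a2", 2L);   // out-of-order update on the A side

                // Prints each join result with the timestamp the join assigned to it.
                out.readRecordsToList().forEach(r ->
                    System.out.println(r.key() + " -> " + r.value() + " @ " + r.timestamp()));
            }
        }
    }
    ```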



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13564: KAFKA-14834: [8/N] Propagate `isLatest` as part of `Change`

2023-04-19 Thread via GitHub


vcrfxia commented on code in PR #13564:
URL: https://github.com/apache/kafka/pull/13564#discussion_r1171926280


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java:
##
@@ -118,10 +120,13 @@ public void process(final Record record) {
 
 newAgg = aggregator.apply(record.key(), record.value(), oldAgg);
 
-store.put(record.key(), newAgg, newTimestamp);
-tupleForwarder.maybeForward(
-record.withValue(new Change<>(newAgg, sendOldValues ? oldAgg : 
null))
-.withTimestamp(newTimestamp));
+final long putReturnCode = store.put(record.key(), newAgg, 
newTimestamp);
+// if not put to store, do not forward downstream either
+if (putReturnCode != PUT_RETURN_CODE_NOT_PUT) {
+tupleForwarder.maybeForward(
+record.withValue(new Change<>(newAgg, sendOldValues ? 
oldAgg : null, putReturnCode == PUT_RETURN_CODE_IS_LATEST))

Review Comment:
   Same as above -- I'll simplify this if it's correct to do so.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13564: KAFKA-14834: [8/N] Propagate `isLatest` as part of `Change`

2023-04-19 Thread via GitHub


vcrfxia commented on code in PR #13564:
URL: https://github.com/apache/kafka/pull/13564#discussion_r1171926005


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java:
##
@@ -118,10 +120,13 @@ public void process(final Record record) {
 
 newAgg = aggregator.apply(record.key(), record.value(), oldAgg);
 
-store.put(record.key(), newAgg, newTimestamp);
-tupleForwarder.maybeForward(
-record.withValue(new Change<>(newAgg, sendOldValues ? oldAgg : 
null))
-.withTimestamp(newTimestamp));
+final long putReturnCode = store.put(record.key(), newAgg, 
newTimestamp);

Review Comment:
   Yes, we do not attempt to "fix" older aggregation results. I can add a note 
in the KIP. 
   
   See my comment above about whether we can actually guarantee that result 
timestamps are nondecreasing: 
https://github.com/apache/kafka/pull/13564#discussion_r1171925337 Same question 
applies for both table aggregations and stream aggregations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13564: KAFKA-14834: [8/N] Propagate `isLatest` as part of `Change`

2023-04-19 Thread via GitHub


vcrfxia commented on code in PR #13564:
URL: https://github.com/apache/kafka/pull/13564#discussion_r1171925337


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java:
##
@@ -116,10 +118,13 @@ public void process(final Record> 
record) {
 }
 
 // update the store with the new value
-store.put(record.key(), newAgg, newTimestamp);
-tupleForwarder.maybeForward(
-record.withValue(new Change<>(newAgg, sendOldValues ? oldAgg : 
null))
-.withTimestamp(newTimestamp));
+final long putReturnCode = store.put(record.key(), newAgg, 
newTimestamp);
+// if not put to store, do not forward downstream either
+if (putReturnCode != PUT_RETURN_CODE_NOT_PUT) {

Review Comment:
   Is it valid for a computed aggregation to be null? If it is, I think it's 
actually not guaranteed that aggregation result timestamps are always 
nondecreasing. If an out-of-order record arrives after the `newAgg` value is 
set to null, then the aggregation result will be re-initialized, and the 
timestamp will be the timestamp of the out-of-order record, which is earlier 
than the previous aggregation timestamp.
   
   If that scenario is possible, then it's a bit unclear which of the two should be considered the "latest" aggregation value, and we should choose whether to keep the current code or to always set `isLatest = true`, depending on what semantics we want.
   
   If that scenario is not possible, i.e., if it is not valid for an 
aggregation value to be null, then the two are equivalent and I can update this 
code to clean it up per your suggestion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13522: KAFKA-14834: [5/N] Drop out-of-order records from FK join with versioned tables

2023-04-19 Thread via GitHub


vcrfxia commented on code in PR #13522:
URL: https://github.com/apache/kafka/pull/13522#discussion_r1171919730


##
streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyVersionedJoinIntegrationTest.java:
##
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+@Category(IntegrationTest.class)
+public class KTableKTableForeignKeyVersionedJoinIntegrationTest extends 
KTableKTableForeignKeyJoinIntegrationTest {
+
+public KTableKTableForeignKeyVersionedJoinIntegrationTest(final boolean 
leftJoin,
+  final boolean 
materialized,
+  final boolean 
leftVersioned,
+  final boolean 
rightVersioned) {
+// optimizations and rejoin are disabled for these tests, as these 
tests focus on versioning.
+// see KTableKTableForeignKeyJoinIntegrationTest for test coverage for 
optimizations and rejoin
+super(leftJoin, StreamsConfig.NO_OPTIMIZATION, materialized, false, 
leftVersioned, rightVersioned);
+}
+
+@Parameterized.Parameters(name = "leftJoin={0}, materialized={1}, 
leftVersioned={2}, rightVersioned={3}")
+public static Collection data() {
+final List booleans = Arrays.asList(true, false);
+return buildParameters(booleans, booleans, booleans, booleans);
+}
+
+@Test
+public void shouldIgnoreOutOfOrderRecordsIffVersioned() {

Review Comment:
   Ah yeah I meant for this to mean "if and only if." It does look like a typo 
at first glance haha.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13522: KAFKA-14834: [5/N] Drop out-of-order records from FK join with versioned tables

2023-04-19 Thread via GitHub


vcrfxia commented on code in PR #13522:
URL: https://github.com/apache/kafka/pull/13522#discussion_r1171919367


##
streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##
@@ -249,62 +272,62 @@ public void doJoinFromLeftThenDeleteLeftEntity() {
 
 @Test
 public void doJoinFromRightThenDeleteRightEntity() {
-final Topology topology = getTopology(streamsConfig, materialized ? 
"store" : null, leftJoin, rejoin);
+final Topology topology = getTopology(streamsConfig, materialized ? 
"store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
 try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, streamsConfig)) {
 final TestInputTopic right = 
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new 
StringSerializer());
 final TestInputTopic left = 
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new 
StringSerializer());
 final TestOutputTopic outputTopic = 
driver.createOutputTopic(OUTPUT, new StringDeserializer(), new 
StringDeserializer());
 final KeyValueStore store = 
driver.getKeyValueStore("store");
 
 // Pre-populate the LHS records. This test is all about what 
happens when we add/remove RHS records
-left.pipeInput("lhs1", "lhsValue1|rhs1");
-left.pipeInput("lhs2", "lhsValue2|rhs2");
-left.pipeInput("lhs3", "lhsValue3|rhs1");
+left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
+left.pipeInput("lhs2", "lhsValue2|rhs2", baseTimestamp + 1);
+left.pipeInput("lhs3", "lhsValue3|rhs1", baseTimestamp + 2);
 
 assertThat(
 outputTopic.readKeyValuesToMap(),
 is(leftJoin
-   ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"),
-   mkEntry("lhs2", "(lhsValue2|rhs2,null)"),
-   mkEntry("lhs3", "(lhsValue3|rhs1,null)"))
-   : emptyMap()
+? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"),
+mkEntry("lhs2", "(lhsValue2|rhs2,null)"),
+mkEntry("lhs3", "(lhsValue3|rhs1,null)"))
+: emptyMap()

Review Comment:
   Ack, sorry about that. I tried looking to see if I have an auto-reformat 
setting on my IDE but everything looks to be disabled. I'll be on the lookout 
for this in the future; hopefully it won't be an issue again.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13522: KAFKA-14834: [5/N] Drop out-of-order records from FK join with versioned tables

2023-04-19 Thread via GitHub


vcrfxia commented on code in PR #13522:
URL: https://github.com/apache/kafka/pull/13522#discussion_r1171919015


##
streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##
@@ -140,9 +163,9 @@ public void doJoinFromLeftThenDeleteLeftEntity() {
 final KeyValueStore store = 
driver.getKeyValueStore("store");
 
 // Pre-populate the RHS records. This test is all about what 
happens when we add/remove LHS records
-right.pipeInput("rhs1", "rhsValue1");
-right.pipeInput("rhs2", "rhsValue2");
-right.pipeInput("rhs3", "rhsValue3"); // this unreferenced FK 
won't show up in any results
+right.pipeInput("rhs1", "rhsValue1", baseTimestamp);

Review Comment:
   I'll leave it as is for readability: I believe auto-advance is per-topic, and the test behavior is clearer this way because it's explicit that records from the two input topics do not overlap or interleave in their timestamps.
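
   For reference, a small sketch of the auto-advance alternative being discussed. The method, topic name, and timestamps are illustrative only; the point is that the start timestamp and auto-advance step are configured per input topic:
    ```java
    import java.time.Duration;
    import java.time.Instant;

    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.TestInputTopic;
    import org.apache.kafka.streams.TopologyTestDriver;

    public class AutoAdvanceExample {
        // The driver and topic name are assumed to come from the surrounding test setup.
        static void pipeRhsRecords(final TopologyTestDriver driver, final String rightTable) {
            final TestInputTopic<String, String> right = driver.createInputTopic(
                rightTable, new StringSerializer(), new StringSerializer(),
                Instant.ofEpochMilli(0L),        // start timestamp for this topic only
                Duration.ofMillis(1));           // each pipeInput advances the timestamp by 1 ms

            right.pipeInput("rhs1", "rhsValue1"); // timestamp 0
            right.pipeInput("rhs2", "rhsValue2"); // timestamp 1
            right.pipeInput("rhs3", "rhsValue3"); // timestamp 2
        }
    }
    ```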



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on pull request #13509: KAFKA-14834: [3/N] Timestamped lookups for stream-table joins

2023-04-19 Thread via GitHub


vcrfxia commented on PR #13509:
URL: https://github.com/apache/kafka/pull/13509#issuecomment-1515473590

   Closing the loop here: versioned stores are disallowed for GlobalKTables in 
https://github.com/apache/kafka/pull/13565, so we should be all set.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia opened a new pull request, #13615: KAFKA-14834: [12/N] Minor code cleanups relating to versioned stores

2023-04-19 Thread via GitHub


vcrfxia opened a new pull request, #13615:
URL: https://github.com/apache/kafka/pull/13615

   (This PR is stacked on https://github.com/apache/kafka/pull/13609. Only 
commits starting from `whitespace cleanup` need to be reviewed separately.)
   
   This PR contains various minor, non-functional refactors requested in previous PR reviews. The only change that is borderline functional is that the RocksDB versioned store implementation now throws `InvalidStateStoreException` if any of its public methods is accessed before the store is initialized/opened. (None of the methods would have worked before anyway, but now the error message is cleaner.)
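
   For illustration, a minimal sketch of the guard pattern described above. The class, field, and method names are made up and are not the actual RocksDB versioned store code:
    ```java
    import org.apache.kafka.streams.errors.InvalidStateStoreException;

    public class StoreOpenGuardExample {
        private final String name;
        private volatile boolean open;

        public StoreOpenGuardExample(final String name) {
            this.name = name;
        }

        public void markOpen() {
            open = true;
        }

        // Called at the top of each public store method so callers get a clear
        // error instead of an obscure failure when the store is not initialized.
        public void validateIsOpen() {
            if (!open) {
                throw new InvalidStateStoreException("Store " + name + " has not been initialized or is already closed");
            }
        }
    }
    ```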
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-19 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1171913729


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the 
range assignor and the sticky assignor.
+ * Properties are as follows:
+ * 
+ *  Each member must get at least one partition for every topic that it is 
subscribed to. The only exception is when
+ *  the number of subscribed members is greater than the number of 
partitions for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range) 
+ *This can only be done if every member is subscribed to the same topics 
and the topics are co-partitioned.
+ *Two streams are co-partitioned if the following conditions are met:
+ *
+ *  The keys must have the same schemas.
+ *  The topics involved must have the same number of partitions.
+ *
+ *  Members should retain as much as their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ * 
+ *
+ * The algorithm includes the following steps:
+ * 
+ *  Generate a map of membersPerTopic using the given member 
subscriptions.
+ *  Generate a list of members (potentiallyUnfilledMembers) 
that have not met the minimum required quota for assignment AND
+ * get a list of sticky partitions that we want to retain in the new 
assignment.
+ *  Add members from the potentiallyUnfilled list to the 
Unfilled list if they haven't met the total required quota i.e. 
minimum number of partitions per member + 1 (if member is designated to receive 
one of the excess partitions) 
+ *  Generate a list of unassigned partitions by calculating the difference 
between total partitions and already assigned (sticky) partitions 
+ *  Iterate through unfilled members and assign partitions from the 
unassigned partitions 
+ * 
+ * 
+ *
+ */
+public class RangeAssignor implements PartitionAssignor {
+
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+static class RemainingAssignmentsForMember {
+private final String memberId;
+private final Integer remaining;
+
+public RemainingAssignmentsForMember(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+
+public String memberId() {
+return memberId;
+}
+
+public Integer remaining() {
+return remaining;
+}
+
+}
+
+private Map> membersPerTopic(final AssignmentSpec 
assignmentSpec) {
+Map> membersPerTopic = new HashMap<>();
+Map membersData = 
assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics 
list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic.computeIfAbsent(topicId, k -> new 
ArrayList<>()).add(memberId);
+} else {
+log.info(memberId + " subscribed to topic " + topicId + " 
which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+private 

[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-19 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171913050


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##
@@ -2189,7 +2196,7 @@ public void testCancelInFlightRequestAfterFatalError() 
throws Exception {
 prepareAndReceiveInitProducerId(producerId, Errors.NONE);
 assertTrue(transactionManager.hasProducerId());
 
-// cluster authorization is a fatal error for the producer
+// expecting authorization error on send

Review Comment:
   ditto here -- should we change this test to use a fatal error?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-19 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171912717


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##
@@ -2175,7 +2182,7 @@ public void 
testClusterAuthorizationExceptionInProduceRequest() throws Exception
 sender.runOnce();
 assertFutureFailure(future, ClusterAuthorizationException.class);
 
-// cluster authorization errors are fatal, so we should continue 
seeing it on future sends
+// expecting to continue to see authorization error until user 
permission is fixed
 assertTrue(transactionManager.hasFatalError());

Review Comment:
   Should this still be true? Do we remove it from fatal error state?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-19 Thread via GitHub


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171904037


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1294,7 +1309,10 @@ public void handleResponse(AbstractResponse response) {
 reenqueue();
 } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
 error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
-fatalError(error.exception());
+log.info("Abortable authorization error: {}.  Transition the 
producer state to {}", error.message(), State.ABORTABLE_ERROR);
+lastError = error.exception();
+epochBumpRequired = true;

Review Comment:
   @jolshan - See the last commit. I cleaned up the comments because authorization isn't a fatal error after the patch. I also patched a test to demonstrate that resending after fixing the permission brings the producer back to normal, and verified the epoch there to ensure it is 0, not -1, after the permission is fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-8154) Buffer Overflow exceptions between brokers and with clients

2023-04-19 Thread keith.paulson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17714323#comment-17714323
 ] 

keith.paulson commented on KAFKA-8154:
--

I'm getting this error now, on Kafka 3.4 with BouncyCastle FIPS.

The BC libraries use a fixed 16 KB buffer size, as opposed to the Java SSL implementation, which calculates the size it needs.

The part that confuses me is that we use Utils.ensureCapacity to adjust 
buffers, and this uses the conditional 
{code:java}
if (newLength > existingBuffer.capacity()) {code}
But the overflow check is
{code:java}
netWriteBuffer.limit() >= currentNetWriteBufferSize {code}
In the case where the capacity is 16 KB and newLength is the same, the buffer is not resized, but the overflow condition is then hit immediately.

Should the second check be changed to '>'?
There should be no terminating character, so x bytes can be written to a buffer of size x, or is there a reason we want an extra byte?
of size x – or is there a reason we want an extra byte size?

 

> Buffer Overflow exceptions between brokers and with clients
> ---
>
> Key: KAFKA-8154
> URL: https://issues.apache.org/jira/browse/KAFKA-8154
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Rajesh Nataraja
>Priority: Major
> Attachments: server.properties.txt
>
>
> https://github.com/apache/kafka/pull/6495
> https://github.com/apache/kafka/pull/5785



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-19 Thread via GitHub


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171873787


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1294,7 +1309,10 @@ public void handleResponse(AbstractResponse response) {
 reenqueue();
 } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
 error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
-fatalError(error.exception());
+log.info("Abortable authorization error: {}.  Transition the 
producer state to {}", error.message(), State.ABORTABLE_ERROR);
+lastError = error.exception();
+epochBumpRequired = true;

Review Comment:
   Even though I think the initProducerId path in the sender has been tested, it's probably worth adding a sender test to make sure the epoch is bumped from -1 to 0. I think it should be an easy modification.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-19 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171872077


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1294,7 +1309,10 @@ public void handleResponse(AbstractResponse response) {
 reenqueue();
 } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
 error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
-fatalError(error.exception());
+log.info("Abortable authorization error: {}.  Transition the 
producer state to {}", error.message(), State.ABORTABLE_ERROR);
+lastError = error.exception();
+epochBumpRequired = true;

Review Comment:
   Was about to ask about this and realized we removed the bump. I guess we 
could still have a test unless we think the other testing covers this end to 
end.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-04-19 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1171868072


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1051,6 +1060,8 @@ class ReplicaManager(val config: KafkaConfig,
   } else {
 try {
   val partition = getPartitionOrException(topicPartition)
+  val producerId = records.firstBatch().producerId()
+  partition.compareAndSetVerificationState(producerId, 
ProducerStateEntry.VerificationState.VERIFYING, 
ProducerStateEntry.VerificationState.VERIFIED)

Review Comment:
   this will be removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-19 Thread via GitHub


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171855134


##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -618,7 +618,8 @@ private TransactionManager 
configureTransactionState(ProducerConfig config,
  * @throws org.apache.kafka.common.errors.UnsupportedVersionException 
fatal error indicating the broker
  * does not support transactions (i.e. if its version is lower 
than 0.11.0.0)
  * @throws org.apache.kafka.common.errors.AuthorizationException fatal 
error indicating that the configured
- * transactional.id is not authorized. See the exception for more 
details
+ * transactional.id is not authorized, or the idempotent producer 
id is unavailable. See the exception for

Review Comment:
   Sorry, and done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-19 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171818667


##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -618,7 +618,8 @@ private TransactionManager 
configureTransactionState(ProducerConfig config,
  * @throws org.apache.kafka.common.errors.UnsupportedVersionException 
fatal error indicating the broker
  * does not support transactions (i.e. if its version is lower 
than 0.11.0.0)
  * @throws org.apache.kafka.common.errors.AuthorizationException fatal 
error indicating that the configured
- * transactional.id is not authorized. See the exception for more 
details
+ * transactional.id is not authorized, or the idempotent producer 
id is unavailable. See the exception for

Review Comment:
   Can we remove fatal here still?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-19 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171816496


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -300,9 +301,13 @@ void runOnce() {
 try {
 transactionManager.maybeResolveSequences();
 
+RuntimeException lastError = transactionManager.lastError();
+if (transactionManager.hasAbortableError() && 
shouldHandleAuthorizationError(lastError)) {
+return;

Review Comment:
   Thanks Philip -- I think I forgot that this was the initProducerId call, so we don't really have an epoch yet. We set it to 0 after getting the producer ID.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-19 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1171783796


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the 
range assignor and the sticky assignor.
+ * Properties are as follows:
+ * 
+ *  Each member must get at least one partition for every topic that it is 
subscribed to. The only exception is when
+ *  the number of subscribed members is greater than the number of 
partitions for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range) 
+ *This can only be done if every member is subscribed to the same topics 
and the topics are co-partitioned.
+ *Two streams are co-partitioned if the following conditions are met:
+ *
+ *  The keys must have the same schemas.
+ *  The topics involved must have the same number of partitions.
+ *
+ *  Members should retain as much as their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ * 
+ *
+ * The algorithm includes the following steps:
+ * 
+ *  Generate a map of membersPerTopic using the given member 
subscriptions.
+ *  Generate a list of members (potentiallyUnfilledMembers) 
that have not met the minimum required quota for assignment AND
+ * get a list of sticky partitions that we want to retain in the new 
assignment.
+ *  Add members from the potentiallyUnfilled list to the 
Unfilled list if they haven't met the total required quota i.e. 
minimum number of partitions per member + 1 (if member is designated to receive 
one of the excess partitions) 
+ *  Generate a list of unassigned partitions by calculating the difference 
between total partitions and already assigned (sticky) partitions 
+ *  Iterate through unfilled members and assign partitions from the 
unassigned partitions 
+ * 
+ * 
+ *
+ */
+public class RangeAssignor implements PartitionAssignor {
+
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+static class RemainingAssignmentsForMember {
+private final String memberId;
+private final Integer remaining;
+
+public RemainingAssignmentsForMember(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+
+public String memberId() {
+return memberId;
+}
+
+public Integer remaining() {
+return remaining;
+}
+
+}
+
+private Map> membersPerTopic(final AssignmentSpec 
assignmentSpec) {
+Map> membersPerTopic = new HashMap<>();
+Map membersData = 
assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics 
list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic.computeIfAbsent(topicId, k -> new 
ArrayList<>()).add(memberId);
+} else {
+log.info(memberId + " subscribed to topic " + topicId + " 
which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+private 
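
The Javadoc above describes the per-topic quotas in words; as a rough 
illustration only (hypothetical names, not the PR's actual implementation), the 
range-style split works out as follows:

// Illustrative only: with N partitions and M subscribed members, every member
// gets at least floor(N / M) partitions and the first (N % M) members get one extra.
import java.util.ArrayList;
import java.util.List;

public class RangeQuotaSketch {
    public static List<Integer> quotas(int numPartitions, int numMembers) {
        int minQuota = numPartitions / numMembers;
        int numMembersWithExtraPartition = numPartitions % numMembers;
        List<Integer> result = new ArrayList<>(numMembers);
        for (int i = 0; i < numMembers; i++) {
            result.add(minQuota + (i < numMembersWithExtraPartition ? 1 : 0));
        }
        return result; // e.g. 7 partitions, 3 members -> [3, 2, 2]
    }
}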

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-19 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1171781883


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the 
range assignor and the sticky assignor.
+ * Properties are as follows:
+ * 
+ *  Each member must get at least one partition for every topic that it is 
subscribed to. The only exception is when
+ *  the number of subscribed members is greater than the number of 
partitions for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range) 
+ *This can only be done if every member is subscribed to the same topics 
and the topics are co-partitioned.
+ *Two streams are co-partitioned if the following conditions are met:
+ *
+ *  The keys must have the same schemas.
+ *  The topics involved must have the same number of partitions.
+ *
+ *  Members should retain as much as their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ * 
+ *
+ * The algorithm includes the following steps:
+ * 
+ *  Generate a map of membersPerTopic using the given member 
subscriptions.
+ *  Generate a list of members (potentiallyUnfilledMembers) 
that have not met the minimum required quota for assignment AND
+ * get a list of sticky partitions that we want to retain in the new 
assignment.
+ *  Add members from the potentiallyUnfilled list to the 
Unfilled list if they haven't met the total required quota i.e. 
minimum number of partitions per member + 1 (if member is designated to receive 
one of the excess partitions) 
+ *  Generate a list of unassigned partitions by calculating the difference 
between total partitions and already assigned (sticky) partitions 
+ *  Iterate through unfilled members and assign partitions from the 
unassigned partitions 
+ * 
+ * 
+ *
+ */
+public class RangeAssignor implements PartitionAssignor {
+
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+static class RemainingAssignmentsForMember {
+private final String memberId;
+private final Integer remaining;
+
+public RemainingAssignmentsForMember(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+
+public String memberId() {
+return memberId;
+}
+
+public Integer remaining() {
+return remaining;
+}
+
+}
+
+private Map> membersPerTopic(final AssignmentSpec 
assignmentSpec) {
+Map> membersPerTopic = new HashMap<>();
+Map membersData = 
assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics 
list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic.computeIfAbsent(topicId, k -> new 
ArrayList<>()).add(memberId);
+} else {
+log.info(memberId + " subscribed to topic " + topicId + " 
which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+private 

[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

2023-04-19 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1171780417


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -835,6 +839,8 @@ public void handle(SyncGroupResponse syncResponse,
 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
 log.info("SyncGroup failed: The group began another 
rebalance. Need to re-join the group. " +
  "Sent generation was {}", sentGeneration);
+savePartitionAndGenerationState();

Review Comment:
   I think the rationale for resetting the generation is to ensure the member's 
partitions get revoked during onJoinPrepare. I'm trying to think whether there's 
a better way to do this...
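
   For illustration, a minimal sketch of the "save before reset" idea being 
discussed; the holder class and names below are hypothetical, not the actual 
AbstractCoordinator code:

// Hypothetical sketch: snapshot the generation and owned partitions before the
// coordinator resets them, so the revocation path still sees what the member
// owned under the old generation.
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

class RebalanceStateSnapshot {
    private int lastGenerationId = -1;
    private Set<String> lastOwnedPartitions = Collections.emptySet();

    synchronized void saveBeforeReset(int generationId, Set<String> ownedPartitions) {
        this.lastGenerationId = generationId;
        this.lastOwnedPartitions = new HashSet<>(ownedPartitions);
    }

    synchronized int lastGenerationId() {
        return lastGenerationId;
    }

    synchronized Set<String> lastOwnedPartitions() {
        return Collections.unmodifiableSet(lastOwnedPartitions);
    }
}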



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-19 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1171778657


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the 
range assignor and the sticky assignor.

Review Comment:
   The formatting is really off without them; that's why we had to add them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-19 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1171777998


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RangeAssignorTest {
+private final RangeAssignor assignor = new RangeAssignor();
+private final Uuid topic1Uuid = Uuid.randomUuid();
+private final Uuid topic2Uuid = Uuid.randomUuid();
+private final Uuid topic3Uuid = Uuid.randomUuid();
+private final String consumerA = "A";
+private final String consumerB = "B";
+private final String consumerC = "C";
+
+@Test
+public void testOneConsumerNoTopic() {
+Map topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+Map members = new HashMap<>();
+List subscribedTopics = new ArrayList<>();
+members.computeIfAbsent(consumerA, k -> new 
AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new 
HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testOneConsumerNonExistentTopic() {
+Map topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+Map members = new HashMap<>();
+List subscribedTopics = new ArrayList<>();
+subscribedTopics.add(topic2Uuid);
+members.computeIfAbsent(consumerA, k -> new 
AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new 
HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() {
+// A -> T1, T3 // B -> T1, T3 // T1 -> 3 Partitions // T3 -> 2 
Partitions
+// Topics
+Map topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+topics.put(topic3Uuid, new AssignmentTopicMetadata(2));
+// Members
+Map members = new HashMap<>();
+// Consumer A
+List subscribedTopicsA = new 
ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid));

Review Comment:
   you're right, I've removed them!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

2023-04-19 Thread via GitHub


dajac commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1171774551


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -835,6 +839,8 @@ public void handle(SyncGroupResponse syncResponse,
 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
 log.info("SyncGroup failed: The group began another 
rebalance. Need to re-join the group. " +
  "Sent generation was {}", sentGeneration);
+savePartitionAndGenerationState();

Review Comment:
   Couldn't we just rejoin without resetting the generation? In other words, what 
is the advantage of resetting the generation here if we still want to use it to 
re-join afterwards?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13592: KAFKA-14862: Outer stream-stream join does not output all results with multiple input partitions

2023-04-19 Thread via GitHub


vcrfxia commented on code in PR #13592:
URL: https://github.com/apache/kafka/pull/13592#discussion_r1171766477


##
streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java:
##
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import kafka.utils.MockTime;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import static java.time.Duration.ofSeconds;
+import static java.util.Arrays.asList;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
+
+@Timeout(600)
+@Tag("integration")
+public class KStreamKStreamIntegrationTest {
+private final static int NUM_BROKERS = 1;
+
+public final static EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
+private final static MockTime MOCK_TIME = CLUSTER.time;
+private final static String STREAM_1 = "stream1";
+private final static String STREAM_2 = "stream2";
+private final static String OUTPUT = "output-";
+private Properties streamsConfig;
+private KafkaStreams streams;
+private final static Properties CONSUMER_CONFIG = new Properties();
+private final static Properties PRODUCER_CONFIG = new Properties();
+
+@BeforeAll
+public static void startCluster() throws Exception {
+CLUSTER.start();
+//Use multiple partitions to ensure distribution of keys.
+
+CLUSTER.createTopic(STREAM_1, 4, 1);
+CLUSTER.createTopic(STREAM_2, 4, 1);
+CLUSTER.createTopic(OUTPUT, 1, 1);
+
+CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "result-consumer");
+CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+}
+
+@AfterAll
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+@BeforeEach
+public void before(final TestInfo testInfo) throws IOException {
+final String stateDirBasePath = TestUtils.tempDirectory().getPath();
+final String safeTestName = safeUniqueTestName(getClass(), testInfo);
+streamsConfig = getStreamsConfig(safeTestName);
+streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath);
+}
+
+@AfterEach
+public void after() throws IOException {
+if (streams != null) {
+streams.close();
+ 

[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-19 Thread via GitHub


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171764467


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -300,9 +301,13 @@ void runOnce() {
 try {
 transactionManager.maybeResolveSequences();
 
+RuntimeException lastError = transactionManager.lastError();
+if (transactionManager.hasAbortableError() && 
shouldHandleAuthorizationError(lastError)) {
+return;

Review Comment:
   FWIW: I think in the context of this ticket, we are trying to avoid poisoning 
the client when the client is unable to re-authenticate upon startup. The fix is 
to keep retrying the request until the permissions are fixed, so the handler 
should handle the epoch bump.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-19 Thread via GitHub


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1171757259


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -300,9 +301,13 @@ void runOnce() {
 try {
 transactionManager.maybeResolveSequences();
 
+RuntimeException lastError = transactionManager.lastError();
+if (transactionManager.hasAbortableError() && 
shouldHandleAuthorizationError(lastError)) {
+return;

Review Comment:
   Hey @jolshan - I actually think we don't need to manually bump the epoch here; 
I think it is already handled by the existing logic. Here's the explanation: 
initProducerId happens in only two places: 1. `initializeTransactions`, and 2. in 
the sender loop for the idempotent producer, `bumpIdempotentEpochAndResetIdIfNeeded`.
   
   For 1, it bumps the epoch if the epoch != None, which means the producer has 
already been initialized and needs to bump the epoch upon re-requesting the id, 
per your comment.
   
   For 2, it is when we first initialize a producer (so it doesn't have an id at 
the beginning), and the `InitProducerIdRequest` should bump the epoch to 0 upon 
the first successful attempt. This is the case we are addressing in this PR, I 
think.
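
   For illustration, that distinction can be reduced to a tiny sketch; the 
constant and names here are made up, not the real TransactionManager API:

// Illustrative only. After a successful InitProducerId, a brand-new producer
// simply receives an id with epoch 0 from the broker, while an already-initialized
// producer expects its existing epoch to be bumped.
final class InitProducerIdSketch {
    // "No epoch assigned yet" sentinel; illustrative, not the client's constant.
    static final short NO_PRODUCER_EPOCH = -1;

    static boolean epochBumpRequired(short currentEpoch) {
        return currentEpoch != NO_PRODUCER_EPOCH;
    }
}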



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jlprat merged pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2023-04-19 Thread via GitHub


jlprat merged PR #11478:
URL: https://github.com/apache/kafka/pull/11478


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jlprat commented on pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2023-04-19 Thread via GitHub


jlprat commented on PR #11478:
URL: https://github.com/apache/kafka/pull/11478#issuecomment-1515213471

   To the best of my knowledge, the test failures all seem to be unrelated to the 
change in this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

2023-04-19 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1171712519


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -835,6 +839,8 @@ public void handle(SyncGroupResponse syncResponse,
 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
 log.info("SyncGroup failed: The group began another 
rebalance. Need to re-join the group. " +
  "Sent generation was {}", sentGeneration);
+savePartitionAndGenerationState();

Review Comment:
   saving the generation and partition state prior to resetting it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah merged pull request #13606: KAFKA-14918 Only send controller RPCs to migrating ZK brokers

2023-04-19 Thread via GitHub


mumrah merged PR #13606:
URL: https://github.com/apache/kafka/pull/13606


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

2023-04-19 Thread via GitHub


jolshan commented on PR #13607:
URL: https://github.com/apache/kafka/pull/13607#issuecomment-151533

   I believe the Connect mirror failures are known -- so no issues on the testing 
front.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13537: KAFKA-14462; [7/N] Add ClientAssignor, Assignment, TopicMetadata and VersionedMetadata

2023-04-19 Thread via GitHub


jeffkbkim commented on code in PR #13537:
URL: https://github.com/apache/kafka/pull/13537#discussion_r1171626106


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/VersionedMetadata.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * Immutable versioned metadata.
+ */
+public class VersionedMetadata {
+public static final VersionedMetadata EMPTY = new 
VersionedMetadata((short) 0, ByteBuffer.allocate(0));
+
+/**
+ * The version of the metadata encoded in {{@link 
VersionedMetadata#metadata}}.
+ */
+private final short version;
+
+/**
+ * The metadata bytes.
+ */
+private final ByteBuffer metadata;

Review Comment:
   Is this what's stored by the consumer?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/VersionedMetadata.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * Immutable versioned metadata.

Review Comment:
   Can we add more description of what is stored inside the metadata and how it's 
used?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/Assignment.java:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * An immutable assignment for a member.
+ */
+public class Assignment {
+public static final Assignment EMPTY = new Assignment(
+(byte) 0,
+Collections.emptyMap(),
+VersionedMetadata.EMPTY
+);
+
+/**
+ * The error assigned to the member.
+ */
+private final byte error;

Review Comment:
   What would be an example of an error?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ClientAssignor.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to 

[GitHub] [kafka] jolshan commented on a diff in pull request #13537: KAFKA-14462; [7/N] Add ClientAssignor, Assignment, TopicMetadata and VersionedMetadata

2023-04-19 Thread via GitHub


jolshan commented on code in PR #13537:
URL: https://github.com/apache/kafka/pull/13537#discussion_r1171634759


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/AssignmentTest.java:
##
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class AssignmentTest {
+
+@Test
+public void testPartitionsAndMetadataCannotBeNull() {
+assertThrows(NullPointerException.class, () -> new Assignment(
+(byte) 1,
+null,
+new VersionedMetadata(
+(short) 1,
+ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))
+)
+));
+
+assertThrows(NullPointerException.class, () -> new Assignment(
+(byte) 1,
+mkAssignment(
+mkTopicAssignment(Uuid.randomUuid(), 1, 2, 3)
+),
+null
+));
+}
+
+@Test
+public void testAttributes() {
+Map> partitions = mkAssignment(
+mkTopicAssignment(Uuid.randomUuid(), 1, 2, 3)
+);
+
+VersionedMetadata metadata = new VersionedMetadata(
+(short) 1,
+ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))
+);
+
+Assignment assignment = new Assignment(
+(byte) 1,
+partitions,
+metadata
+);
+
+assertEquals((byte) 1, assignment.error());
+assertEquals(partitions, assignment.partitions());
+assertEquals(metadata, assignment.metadata());
+}
+
+@Test
+public void testFromTargetAssignmentRecord() {
+Uuid topicId1 = Uuid.randomUuid();
+Uuid topicId2 = Uuid.randomUuid();
+
+List 
partitions = new ArrayList<>();
+partitions.add(new 
ConsumerGroupTargetAssignmentMemberValue.TopicPartition()
+.setTopicId(topicId1)
+.setPartitions(Arrays.asList(1, 2, 3)));
+partitions.add(new 
ConsumerGroupTargetAssignmentMemberValue.TopicPartition()
+.setTopicId(topicId2)
+.setPartitions(Arrays.asList(4, 5, 6)));
+
+ConsumerGroupTargetAssignmentMemberValue record = new 
ConsumerGroupTargetAssignmentMemberValue()
+.setError((byte) 1)
+.setTopicPartitions(partitions)
+.setMetadataVersion((short) 2)
+.setMetadataBytes("foo".getBytes(StandardCharsets.UTF_8));
+
+Assignment assignment = Assignment.fromRecord(record);
+
+assertEquals((short) 1, assignment.error());
+assertEquals(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3),
+mkTopicAssignment(topicId2, 4, 5, 6)
+), assignment.partitions());
+assertEquals(new VersionedMetadata(
+(short) 2,
+ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8))
+), assignment.metadata());
+}
+
+@Test
+public void testEquals() {

Review Comment:
   Did you want to have a notEquals for some of these testEquals?
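
   For illustration, the kind of negative case being suggested, written with the 
same helpers the test class already uses (assertNotEquals would need to be 
imported; the values are arbitrary):

@Test
public void testNotEquals() {
    Map<Uuid, Set<Integer>> partitions = mkAssignment(
        mkTopicAssignment(Uuid.randomUuid(), 1, 2, 3)
    );
    VersionedMetadata metadata = new VersionedMetadata(
        (short) 1,
        ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))
    );
    // Same partitions and metadata, but a different error byte: must not be equal.
    assertNotEquals(
        new Assignment((byte) 0, partitions, metadata),
        new Assignment((byte) 1, partitions, metadata)
    );
}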



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #13614: KAFKA-14586: Adding redirection for StreamsResetter

2023-04-19 Thread via GitHub


mimaison commented on PR #13614:
URL: https://github.com/apache/kafka/pull/13614#issuecomment-1515084652

   Not merging immediately, to give @mjsax a chance to review.
   Feel free to merge to trunk and 3.5; otherwise I'll do it when I come online 
tomorrow.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-4327) Move Reset Tool from core to streams

2023-04-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4327.

Fix Version/s: (was: 4.0.0)
   Resolution: Fixed

This was resolved via https://issues.apache.org/jira/browse/KAFKA-14586.

> Move Reset Tool from core to streams
> 
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Blocker
>  Labels: kip
>
> This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, Kafka Streams Application Reset Tool is part of {{core}} module 
> due to ZK dependency. After KIP-4 got merged, this dependency can be dropped 
> and the Reset Tool can be moved to {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}} that 
> refers to ResetTool in an exception message:
>  {{"Use 'kafka.tools.StreamsResetter' tool"}}
>  -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}
> Doing this JIRA also requires to update the docs with regard to broker 
> backward compatibility – not all broker support "topic delete request" and 
> thus, the reset tool will not be backward compatible to all broker versions.
> KIP-756: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-756%3A+Move+StreamsResetter+tool+outside+of+core]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on a diff in pull request #13537: KAFKA-14462; [7/N] Add ClientAssignor, Assignment, TopicMetadata and VersionedMetadata

2023-04-19 Thread via GitHub


jolshan commented on code in PR #13537:
URL: https://github.com/apache/kafka/pull/13537#discussion_r1171629591


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/Assignment.java:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * An immutable assignment for a member.
+ */
+public class Assignment {
+public static final Assignment EMPTY = new Assignment(
+(byte) 0,
+Collections.emptyMap(),
+VersionedMetadata.EMPTY
+);
+
+/**
+ * The error assigned to the member.
+ */
+private final byte error;
+
+/**
+ * The partitions assigned to the member.
+ */
+private final Map> partitions;
+
+/**
+ * The metadata assigned to the member.
+ */
+private final VersionedMetadata metadata;
+
+public Assignment(
+Map> partitions
+) {
+this(
+(byte) 0,
+partitions,
+VersionedMetadata.EMPTY
+);
+}
+
+public Assignment(
+byte error,
+Map> partitions,
+VersionedMetadata metadata
+) {
+this.error = error;
+this.partitions = 
Collections.unmodifiableMap(Objects.requireNonNull(partitions));

Review Comment:
   Are we concerned about the contents of the map being null?
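
   For illustration, one way to also guard against null keys/values while taking 
the defensive copy (a sketch only, not necessarily what the PR should do):

// Illustrative only: reject null entries while copying into an unmodifiable map.
private static <K, V> Map<K, V> copyWithoutNulls(Map<K, V> input) {
    Map<K, V> copy = new HashMap<>(input.size());
    input.forEach((k, v) -> {
        Objects.requireNonNull(k, "map key must not be null");
        Objects.requireNonNull(v, "map value must not be null");
        copy.put(k, v);
    });
    return Collections.unmodifiableMap(copy);
}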



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-4327) Move Reset Tool from core to streams

2023-04-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-4327:
--

Assignee: (was: Jorge Esteban Quilcate Otoya)

> Move Reset Tool from core to streams
> 
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Blocker
>  Labels: kip
> Fix For: 4.0.0
>
>
> This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, Kafka Streams Application Reset Tool is part of {{core}} module 
> due to ZK dependency. After KIP-4 got merged, this dependency can be dropped 
> and the Reset Tool can be moved to {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}} that 
> refers to ResetTool in an exception message:
>  {{"Use 'kafka.tools.StreamsResetter' tool"}}
>  -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}
> Doing this JIRA also requires to update the docs with regard to broker 
> backward compatibility – not all broker support "topic delete request" and 
> thus, the reset tool will not be backward compatible to all broker versions.
> KIP-756: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-756%3A+Move+StreamsResetter+tool+outside+of+core]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on a diff in pull request #13537: KAFKA-14462; [7/N] Add ClientAssignor, Assignment, TopicMetadata and VersionedMetadata

2023-04-19 Thread via GitHub


jolshan commented on code in PR #13537:
URL: https://github.com/apache/kafka/pull/13537#discussion_r1171628212


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ClientAssignor.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * An immutable representation of a client side assignor within a consumer 
group member.
+ */
+public class ClientAssignor {
+/**
+ * The name of the assignor.
+ */
+private final String name;
+
+/**
+ * The reason reported by the assignor.
+ */
+private final byte reason;
+
+/**
+ * The minimum metadata version supported by the assignor.
+ */
+private final short minimumVersion;
+
+/**
+ * The maximum metadata version supported by the assignor.
+ */
+private final short maximumVersion;
+
+/**
+ * The versioned metadata.
+ */
+private final VersionedMetadata metadata;
+
+public ClientAssignor(
+String name,
+byte reason,
+short minimumVersion,
+short maximumVersion,
+VersionedMetadata metadata
+) {
+this.name = Objects.requireNonNull(name);
+this.reason = reason;
+this.minimumVersion = minimumVersion;

Review Comment:
   For some of the version fields (maybe errors too), did we want to specify 
non-negative? It's probably not as crucial as specifying non-null, though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13537: KAFKA-14462; [7/N] Add ClientAssignor, Assignment, TopicMetadata and VersionedMetadata

2023-04-19 Thread via GitHub


jolshan commented on code in PR #13537:
URL: https://github.com/apache/kafka/pull/13537#discussion_r1171626917


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicMetadata.java:
##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+
+import java.util.Objects;
+
+public class TopicMetadata {

Review Comment:
   Did we want a descriptive comment? I think this is fairly self-explanatory, 
but all the other classes have them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13537: KAFKA-14462; [7/N] Add ClientAssignor, Assignment, TopicMetadata and VersionedMetadata

2023-04-19 Thread via GitHub


jolshan commented on code in PR #13537:
URL: https://github.com/apache/kafka/pull/13537#discussion_r1171626201


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicMetadata.java:
##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+
+import java.util.Objects;
+
+public class TopicMetadata {
+/**
+ * The topic id.
+ */
+private final Uuid id;
+
+/**
+ * The topic name.
+ */
+private final String name;
+
+/**
+ * The number of partitions.
+ */
+private final int numPartitions;
+
+public TopicMetadata(
+Uuid id,
+String name,
+int numPartitions
+) {
+this.id = Objects.requireNonNull(id);

Review Comment:
   Do we want to allow the Uuid to be the zero uuid?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14868) Remove some forgotten metrics when the replicaManager is closed

2023-04-19 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-14868:
---
Fix Version/s: 3.6.0
   (was: 3.5.0)

> Remove some forgotten metrics when the replicaManager is closed
> ---
>
> Key: KAFKA-14868
> URL: https://issues.apache.org/jira/browse/KAFKA-14868
> Project: Kafka
>  Issue Type: Improvement
>Reporter: hudeqi
>Assignee: hudeqi
>Priority: Major
> Fix For: 3.6.0
>
>
> In some current classes, metrics are registered when the class is loaded, but 
> it often happens that some or all of those metrics are not removed when the 
> class is closed. For example, in the ReplicaManager class, IsrExpandsPerSec, 
> IsrShrinksPerSec, and FailedIsrUpdatesPerSec were all missed on close.
> The problem of not closing these metrics in the ReplicaManager has been solved 
> in the patch. As for how to improve KafkaMetricsGroup to make the detection of 
> such issues more general, there is no conclusive solution yet.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mimaison merged pull request #13471: KAFKA-14868:Remove some forgotten metrics when the replicaManager is closed

2023-04-19 Thread via GitHub


mimaison merged PR #13471:
URL: https://github.com/apache/kafka/pull/13471


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

2023-04-19 Thread via GitHub


divijvaidya commented on PR #13312:
URL: https://github.com/apache/kafka/pull/13312#issuecomment-1515034487

   @ijuma I tried end-to-end benchmarks w/ and w/o this change but I didn't 
observe any noticeable change in end-to-end latency or throughput (because the 
bottleneck in end-to-end tests is somewhere else). Hence, I can't prove the 
impact of this change beyond microbenchmarking for different byte sizes. I have 
reverted the change for long. Please let me know if you are willing to merge 
this for varint32 based on the microbenchmark numbers.
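
   For context, the hot path being microbenchmarked is the classic unsigned 
varint32 loop; a generic sketch of it (not the exact ByteUtils code touched by 
the PR):

// Writes value as an unsigned varint: 7 bits per byte, MSB set on all bytes but the last.
static int writeUnsignedVarint(int value, byte[] dest, int offset) {
    int pos = offset;
    while ((value & 0xFFFFFF80) != 0) {
        dest[pos++] = (byte) ((value & 0x7F) | 0x80);
        value >>>= 7;
    }
    dest[pos++] = (byte) value;
    return pos - offset; // number of bytes written (1 to 5)
}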
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14669) Include MirrorMaker connector configurations in docs

2023-04-19 Thread Gantigmaa Selenge (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gantigmaa Selenge reassigned KAFKA-14669:
-

Assignee: Gantigmaa Selenge

> Include MirrorMaker connector configurations in docs
> 
>
> Key: KAFKA-14669
> URL: https://issues.apache.org/jira/browse/KAFKA-14669
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs
>Reporter: Mickael Maison
>Assignee: Gantigmaa Selenge
>Priority: Major
>
> In the https://kafka.apache.org/documentation/#georeplication-flow-configure 
> section we list some of the MirrorMaker connectors configurations. These are 
> hardcoded in the docs: 
> https://github.com/apache/kafka/blob/trunk/docs/ops.html#L768-L788
> Instead we should used the generated docs (added as part of 
> https://github.com/apache/kafka/commit/40af3a74507cce9155f4fb4fca317d3c68235d78)
>  like we do for the file connectors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on pull request #13537: KAFKA-14462; [7/N] Add ClientAssignor, Assignment, TopicMetadata and VersionedMetadata

2023-04-19 Thread via GitHub


dajac commented on PR #13537:
URL: https://github.com/apache/kafka/pull/13537#issuecomment-1515007003

   @dengziming @showuon I have a few PRs in flight for KIP-848, if you are 
interested in helping with reviews on that front.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication

2023-04-19 Thread via GitHub


gharris1727 commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1171513542


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -139,10 +171,103 @@ public void close() {
 protected void handleRecord(ConsumerRecord record) {
 OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
 TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-offsetSyncs.put(sourceTopicPartition, offsetSync);
+offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> 
createInitialSyncs(offsetSync));
+offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> 
updateExistingSyncs(syncs, offsetSync));
+}
+
+private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync 
offsetSync) {
+// Make a copy of the array before mutating it, so that readers do not 
see inconsistent data
+// TODO: batch updates so that this copy can be performed less often 
for high-volume sync topics.
+OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+updateSyncArray(mutableSyncs, offsetSync);
+if (log.isTraceEnabled()) {
+StringBuilder stateString = new StringBuilder();
+stateString.append("[");
+for (int i = 0; i < Long.SIZE; i++) {
+if (i != 0) {
+stateString.append(",");
+}
+if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != 
mutableSyncs[i - 1]) {
+// Print only if the sync is interesting, a series of 
repeated syncs will appear as ,
+stateString.append(mutableSyncs[i].upstreamOffset());
+stateString.append(":");
+stateString.append(mutableSyncs[i].downstreamOffset());
+}
+}
+stateString.append("]");
+log.trace("New sync {} applied, new state is {}", offsetSync, 
stateString);
+}
+return mutableSyncs;
+}
+
+private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+clearSyncArray(syncs, firstSync);
+return syncs;
+}
+
+private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+for (int i = 0; i < Long.SIZE; i++) {
+syncs[i] = offsetSync;
+}
+}
+
+private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+long upstreamOffset = offsetSync.upstreamOffset();
+// Old offsets are invalid, so overwrite them all.
+if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {

Review Comment:
   @urbandan I hadn't considered using the source offsets, and this is worth 
discussing more in a follow-up, as I think the current implementation with its 
limitations is all that we can afford for the upcoming release.
   
   Compared to re-reading the checkpoints topic, the advantages of adding 
checkpoint source offsets are:
   1. We can use the source offsets consumer in the framework
   2. The framework will compact the data for us.
   
   The disadvantages are:
   1. Deletion of the offset syncs loses monotonicity guarantees
   2. In non-EOS mode we lose monotonicity guarantees (source offsets arrive 
after checkpoints are written)
   3. New load to the global source offsets topic that was not there before, 
hosting duplicate data.
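   
   For context, the in-memory index discussed here is meant to back a lookup 
along the following lines (an illustrative sketch only, not the PR's code; the 
helper name is hypothetical and the real translation logic is more careful 
about its correctness guarantees):
   
   ```java
   // Assumes org.apache.kafka.connect.mirror.OffsetSync and java.util.Optional.
   // The array is ordered from the newest sync (index 0) to the oldest.
   static Optional<OffsetSync> nearestPrecedingSync(OffsetSync[] syncs, long upstreamOffset) {
       for (OffsetSync sync : syncs) {
           if (sync.upstreamOffset() <= upstreamOffset) {
               return Optional.of(sync);   // newest sync at or before the requested offset
           }
       }
       return Optional.empty();            // offset is older than anything retained
   }
   ```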



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dimitarndimitrov commented on pull request #13432: KAFKA-14821 Implement the listOffsets API with AdminApiDriver

2023-04-19 Thread via GitHub


dimitarndimitrov commented on PR #13432:
URL: https://github.com/apache/kafka/pull/13432#issuecomment-1514934648

   > please also change the description to include the modifications you made
   
   @dengziming I have updated the PR description a bit and added a "review 
modifications" section. I assume I shouldn't squash the commits now to preserve 
the commit history in the PR.
   Let me know if there's anything else I need to do regarding that.
   
   I'm now looking at the latest PR CI results.
   There seems to be a significant number of test failures, a big increase over 
the results from previous commits. I suspect they are related to the recent 
mailing list thread about test failures but am still verifying that.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dimitarndimitrov commented on a diff in pull request #13432: KAFKA-14821 Implement the listOffsets API with AdminApiDriver

2023-04-19 Thread via GitHub


dimitarndimitrov commented on code in PR #13432:
URL: https://github.com/apache/kafka/pull/13432#discussion_r1171490648


##
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java:
##
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.admin.ListOffsetsOptions;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import 
org.apache.kafka.clients.admin.internals.AdminApiFuture.SimpleAdminApiFuture;
+import org.apache.kafka.clients.admin.internals.AdminApiHandler.Batched;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import 
org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
+import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic;
+import 
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
+import 
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.CollectionUtils;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+public final class ListOffsetsHandler extends Batched<TopicPartition, ListOffsetsResultInfo> {
+
+private final Map<TopicPartition, Long> offsetTimestampsByPartition;
+private final ListOffsetsOptions options;
+private final Logger log;
+private final AdminApiLookupStrategy<TopicPartition> lookupStrategy;
+
+public ListOffsetsHandler(
+Map<TopicPartition, Long> offsetTimestampsByPartition,
+ListOffsetsOptions options,
+LogContext logContext
+) {
+this.offsetTimestampsByPartition = offsetTimestampsByPartition;
+this.options = options;
+this.log = logContext.logger(ListOffsetsHandler.class);
+this.lookupStrategy = new PartitionLeaderStrategy(logContext);
+}
+
+@Override
+public String apiName() {
+return "listOffsets";
+}
+
+@Override
+public AdminApiLookupStrategy<TopicPartition> lookupStrategy() {
+return this.lookupStrategy;
+}
+
+@Override
+ListOffsetsRequest.Builder buildBatchedRequest(int brokerId, 
Set<TopicPartition> keys) {
+Map<String, ListOffsetsTopic> topicsByName = 
CollectionUtils.groupPartitionsByTopic(
+keys,
+topicName -> new ListOffsetsTopic().setName(topicName),
+(listOffsetsTopic, partitionId) -> {
+TopicPartition topicPartition = new 
TopicPartition(listOffsetsTopic.name(), partitionId);
+long offsetTimestamp = 
offsetTimestampsByPartition.get(topicPartition);
+listOffsetsTopic.partitions().add(
+new ListOffsetsPartition()
+.setPartitionIndex(partitionId)
+.setTimestamp(offsetTimestamp));
+});
+boolean supportsMaxTimestamp = keys
+.stream()
+.anyMatch(key -> offsetTimestampsByPartition.get(key) == 
ListOffsetsRequest.MAX_TIMESTAMP);
+
+return ListOffsetsRequest.Builder
+.forConsumer(true, options.isolationLevel(), supportsMaxTimestamp)
+.setTargetTimes(new ArrayList<>(topicsByName.values()));
+}
+
+@Override
+public ApiResult<TopicPartition, ListOffsetsResultInfo> handleResponse(
+Node broker,
+Set<TopicPartition> keys,
+AbstractResponse abstractResponse
+) {
+ListOffsetsResponse response = (ListOffsetsResponse) abstractResponse;
+Map<TopicPartition, ListOffsetsResultInfo> completed 

[jira] [Commented] (KAFKA-14586) Move StreamsResetter to tools

2023-04-19 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17714140#comment-17714140
 ] 

Sagar Rao commented on KAFKA-14586:
---

Thanks [~fvaleri], I created the PR. 

> Move StreamsResetter to tools
> -
>
> Key: KAFKA-14586
> URL: https://issues.apache.org/jira/browse/KAFKA-14586
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Sagar Rao
>Priority: Major
> Fix For: 3.5.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vamossagar12 opened a new pull request, #13614: KAFKA-14586: Adding redirection for StreamsResetter

2023-04-19 Thread via GitHub


vamossagar12 opened a new pull request, #13614:
URL: https://github.com/apache/kafka/pull/13614

   Adding redirection for StreamsResetter. The erstwhile StreamsResetter 
resided in the core/src/main/scala/kafka/tools package, and that's where I added 
this class.
   I tried to run the main method but it fails with 
   
   ```
   Exception in thread "main" java.lang.ClassNotFoundException: 
org.apache.kafka.tools.StreamsResetter
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at kafka.tools.StreamsResetter.main(StreamsResetter.java:28)
   ``` 
   
   However, when I created another class and invoked the class added in this PR 
in the same way, it seemed to work. 
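   
   For reference, a redirection class along these lines might look like the 
following (a minimal sketch, assuming the relocated tool keeps a standard 
`public static void main(String[])` entry point and the tools jar is on the 
classpath; not necessarily the exact code in this PR):
   
   ```java
   package kafka.tools;
   
   /**
    * Deprecated shim kept for compatibility; it only delegates to the relocated tool.
    */
   @Deprecated
   public class StreamsResetter {
       public static void main(String[] args) throws Exception {
           System.err.println("WARNING: kafka.tools.StreamsResetter is deprecated; "
                   + "use org.apache.kafka.tools.StreamsResetter instead.");
           org.apache.kafka.tools.StreamsResetter.main(args);
       }
   }
   ```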
   
   cc @mjsax , @fvaleri , @mimaison 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14922) kafka-streams-application-reset deletes topics not belonging to specified application-id

2023-04-19 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jørgen updated KAFKA-14922:
---
Description: 
Slack-thread: 
[https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1681908267206849]

When running the command _kafka-streams-application-reset --bootstrap-servers 
$BOOTSTRAP --application-id foo_ all internal topics that _start with_ foo are 
deleted. This happens even if there's no application-id named foo.

Example:
{code:java}
Application IDs:
foo-v1
foo-v2

Internal topics:
foo-v1-repartition-topic-repartition
foo-v2-repartition-topic-repartition 

Application reset:
kafka-streams-application-reset --bootstrap-servers $BOOTSTRAP --application-id 
foo
> No input or intermediate topics specified. Skipping seek.
Deleting inferred internal topics [foo-v2-repartition-topic-repartition, 
foo-v1-repartition-topic-repartition]
Done.{code}
Expected behaviour is that the command fails, as there is no application-id 
named foo, instead of deleting all foo* topics. 

This is critical for typos or when application-ids share a prefix with 
others (for example if we had foo-v21 and wanted to reset foo-v2).

The bug should be located here: 
[https://github.com/apache/kafka/blob/c14f56b48461f01743146d58987bc8661ba0d459/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java#L693]

The tool should check that the topics match the application-id exactly instead of 
checking that they merely start with the application-id.

  was:
Slack-thread: 
[https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1681908267206849]

When running the command _kafka-streams-application-reset --bootstrap-servers 
$BOOTSTRAP --application-id foo_ all internal topics that _starts with_ foo is 
deleted. This happens even if there's no application-id named foo.

Example:
{code:java}
Application IDs:
foo-v1
foo-v2

Internal topics:
foo-v1-repartition-topic-repartition
foo-v2-repartition-topic-repartition 

Application reset:
kafka-streams-application-reset --bootstrap-servers $BOOTSTRAP --application-id 
foo
> No input or intermediate topics specified. Skipping seek.
Deleting inferred internal topics [foo-v2-repartition-topic-repartition, 
foo-v1-repartition-topic-repartition]
Done.{code}
Expected behaviour is that the command fails as there are no application-id's 
with the name foo instead of deleting all foo* topics. 

This is critical on typos or if application-ids starts with the same name as 
others (for example if we had foo-v21 and wanted to reset foo-v2)

The bug should be located here: 
[https://github.com/apache/kafka/blob/c14f56b48461f01743146d58987bc8661ba0d459/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java#L693]

Should check that the topics matches the application-id exactly instead of 
checking that it starts.


> kafka-streams-application-reset deletes topics not belonging to specified 
> application-id
> 
>
> Key: KAFKA-14922
> URL: https://issues.apache.org/jira/browse/KAFKA-14922
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.4.0
>Reporter: Jørgen
>Priority: Major
>
> Slack-thread: 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1681908267206849]
> When running the command _kafka-streams-application-reset --bootstrap-servers 
> $BOOTSTRAP --application-id foo_ all internal topics that _starts with_ foo 
> is deleted. This happens even if there's no application-id named foo.
> Example:
> {code:java}
> Application IDs:
> foo-v1
> foo-v2
> Internal topics:
> foo-v1-repartition-topic-repartition
> foo-v2-repartition-topic-repartition 
> Application reset:
> kafka-streams-application-reset --bootstrap-servers $BOOTSTRAP 
> --application-id foo
> > No input or intermediate topics specified. Skipping seek.
> Deleting inferred internal topics [foo-v2-repartition-topic-repartition, 
> foo-v1-repartition-topic-repartition]
> Done.{code}
> Expected behaviour is that the command fails as there are no application-id's 
> with the name foo instead of deleting all foo* topics. 
> This is critical on typos or if application-ids starts with the same name as 
> others (for example if we had foo-v21 and wanted to reset foo-v2)
> The bug should be located here: 
> [https://github.com/apache/kafka/blob/c14f56b48461f01743146d58987bc8661ba0d459/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java#L693]
> Should check that the topics matches the application-id exactly instead of 
> checking that it starts with the application-id.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14922) kafka-streams-application-reset deletes topics not belonging to specified application-id

2023-04-19 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jørgen updated KAFKA-14922:
---
Description: 
Slack-thread: 
[https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1681908267206849]

When running the command _kafka-streams-application-reset --bootstrap-servers 
$BOOTSTRAP --application-id foo_ all internal topics that _starts with_ foo is 
deleted. This happens even if there's no application-id named foo.

Example:
{code:java}
Application IDs:
foo-v1
foo-v2

Internal topics:
foo-v1-repartition-topic-repartition
foo-v2-repartition-topic-repartition 

Application reset:
kafka-streams-application-reset --bootstrap-servers $BOOTSTRAP --application-id 
foo
> No input or intermediate topics specified. Skipping seek.
Deleting inferred internal topics [foo-v2-repartition-topic-repartition, 
foo-v1-repartition-topic-repartition]
Done.{code}
Expected behaviour is that the command fails as there are no application-id's 
with the name foo instead of deleting all foo* topics. 

This is critical on typos or if application-ids starts with the same name as 
others (for example if we had foo-v21 and wanted to reset foo-v2)

The bug should be located here: 
[https://github.com/apache/kafka/blob/c14f56b48461f01743146d58987bc8661ba0d459/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java#L693]

Should check that the topics matches the application-id exactly instead of 
checking that it starts.

  was:
Slack-thread: 
[https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1681908267206849]

When running the command _kafka-streams-application-reset --bootstrap-servers 
$BOOTSTRAP --application-id foo_ all changelog and repartition topics that 
_starts with_ foo is deleted. This happens even if there's no application-id 
named foo.

Example:
{code:java}
Application IDs:
foo-v1
foo-v2

Internal topics:
foo-v1-repartition-topic-repartition
foo-v2-repartition-topic-repartition 

Application reset:
kafka-streams-application-reset --bootstrap-servers $BOOTSTRAP --application-id 
foo
> No input or intermediate topics specified. Skipping seek.
Deleting inferred internal topics [foo-v2-repartition-topic-repartition, 
foo-v1-repartition-topic-repartition]
Done.{code}
Expected behaviour is that the command fails as there are no application-id's 
with the name foo instead of deleting all foo* topics. 

This is critical on typos or if application-ids starts with the same name as 
others.

The bug should be located here: 
[https://github.com/apache/kafka/blob/c14f56b48461f01743146d58987bc8661ba0d459/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java#L693]

Should check that the topics matches the application-id exactly instead of 
checking that it starts.


> kafka-streams-application-reset deletes topics not belonging to specified 
> application-id
> 
>
> Key: KAFKA-14922
> URL: https://issues.apache.org/jira/browse/KAFKA-14922
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.4.0
>Reporter: Jørgen
>Priority: Major
>
> Slack-thread: 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1681908267206849]
> When running the command _kafka-streams-application-reset --bootstrap-servers 
> $BOOTSTRAP --application-id foo_ all internal topics that _starts with_ foo 
> is deleted. This happens even if there's no application-id named foo.
> Example:
> {code:java}
> Application IDs:
> foo-v1
> foo-v2
> Internal topics:
> foo-v1-repartition-topic-repartition
> foo-v2-repartition-topic-repartition 
> Application reset:
> kafka-streams-application-reset --bootstrap-servers $BOOTSTRAP 
> --application-id foo
> > No input or intermediate topics specified. Skipping seek.
> Deleting inferred internal topics [foo-v2-repartition-topic-repartition, 
> foo-v1-repartition-topic-repartition]
> Done.{code}
> Expected behaviour is that the command fails as there are no application-id's 
> with the name foo instead of deleting all foo* topics. 
> This is critical on typos or if application-ids starts with the same name as 
> others (for example if we had foo-v21 and wanted to reset foo-v2)
> The bug should be located here: 
> [https://github.com/apache/kafka/blob/c14f56b48461f01743146d58987bc8661ba0d459/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java#L693]
> Should check that the topics matches the application-id exactly instead of 
> checking that it starts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14922) kafka-streams-application-reset deletes topics not belonging to specified application-id

2023-04-19 Thread Jira
Jørgen created KAFKA-14922:
--

 Summary: kafka-streams-application-reset deletes topics not 
belonging to specified application-id
 Key: KAFKA-14922
 URL: https://issues.apache.org/jira/browse/KAFKA-14922
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.4.0
Reporter: Jørgen


Slack-thread: 
[https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1681908267206849]

When running the command _kafka-streams-application-reset --bootstrap-servers 
$BOOTSTRAP --application-id foo_ all changelog and repartition topics that 
_start with_ foo are deleted. This happens even if there's no application-id 
named foo.

Example:
{code:java}
Application IDs:
foo-v1
foo-v2

Internal topics:
foo-v1-repartition-topic-repartition
foo-v2-repartition-topic-repartition 

Application reset:
kafka-streams-application-reset --bootstrap-servers $BOOTSTRAP --application-id 
foo
> No input or intermediate topics specified. Skipping seek.
Deleting inferred internal topics [foo-v2-repartition-topic-repartition, 
foo-v1-repartition-topic-repartition]
Done.{code}
Expected behaviour is that the command fails, as there is no application-id 
named foo, instead of deleting all foo* topics. 

This is critical for typos or when application-ids share a prefix with 
others.

The bug should be located here: 
[https://github.com/apache/kafka/blob/c14f56b48461f01743146d58987bc8661ba0d459/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java#L693]

The tool should check that the topics match the application-id exactly instead of 
checking that they merely start with it.
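
A small illustration of the over-matching (hypothetical snippet, not the tool's code; assumes the usual java.util and java.util.stream imports):
{code:java}
String applicationId = "foo";
List<String> internalTopics = Arrays.asList(
        "foo-v1-repartition-topic-repartition",
        "foo-v2-repartition-topic-repartition");

// With a plain prefix check, both topics are inferred as internal topics of
// application id "foo", even though no application with that exact id exists,
// so both are deleted.
List<String> inferred = internalTopics.stream()
        .filter(topic -> topic.startsWith(applicationId))
        .collect(Collectors.toList());
{code}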



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14084) Support SCRAM when using KRaft mode

2023-04-19 Thread Proven Provenzano (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17714103#comment-17714103
 ] 

Proven Provenzano commented on KAFKA-14084:
---

PR added for migrating SCRAM records during ZK to KRaft migration. I don't have 
the dual-write support in the PR. I could probably add it by EOD tomorrow.

> Support SCRAM when using KRaft mode
> ---
>
> Key: KAFKA-14084
> URL: https://issues.apache.org/jira/browse/KAFKA-14084
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Proven Provenzano
>Priority: Major
>  Labels: kip-500
> Fix For: 3.5.0
>
>
> Support SCRAM when using KRaft mode, as specified in KIP-631



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] pprovenzano opened a new pull request, #13613: KAFKA-14859: SCRAM ZK to KRaft migration without dual write

2023-04-19 Thread via GitHub


pprovenzano opened a new pull request, #13613:
URL: https://github.com/apache/kafka/pull/13613

   Handle migrating SCRAM records in ZK when migrating from ZK to KRaft.
   
   This does not allow the user to change SCRAM records while migration is 
happening.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14807) MirrorMaker2 config source.consumer.auto.offset.reset=latest leading to the pause of replication of consumer groups

2023-04-19 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17714097#comment-17714097
 ] 

Daniel Urban commented on KAFKA-14807:
--

[~fisher91] do you use MM2 dedicated mode, or use the Connectors directly in a 
Connect cluster?

If the latter, you can fix this by passing 
source.consumer.auto.offset.reset=latest only to the MirrorSourceConnector, and 
not to the MirrorCheckpointConnector.

For MM2 dedicated mode, I'm not aware of any workarounds.
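
A sketch of that split when configuring the connectors directly (hypothetical config maps; only the class and property names come from the discussion above, and the usual java.util imports are assumed):
{code:java}
Map<String, String> sourceConfig = new HashMap<>();
sourceConfig.put("connector.class", "org.apache.kafka.connect.mirror.MirrorSourceConnector");
// Skip history on the replication side only.
sourceConfig.put("source.consumer.auto.offset.reset", "latest");

Map<String, String> checkpointConfig = new HashMap<>();
checkpointConfig.put("connector.class", "org.apache.kafka.connect.mirror.MirrorCheckpointConnector");
// No offset reset override here, so checkpointing still consumes the full offset-syncs history.
{code}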

> MirrorMaker2 config source.consumer.auto.offset.reset=latest leading to the 
> pause of replication of consumer groups
> ---
>
> Key: KAFKA-14807
> URL: https://issues.apache.org/jira/browse/KAFKA-14807
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.4.0, 3.3.1, 3.3.2
> Environment: centos7
>Reporter: Zhaoli
>Priority: Major
>
> We use MirrorMaker2 to replicate messages and consumer group offsets from the 
> Kafka cluster `source` to cluster `target`.
> To reduce the load on the source cluster, we add this configuration to mm2 to 
> avoid replicating the whole history messages:
> {code:java}
> source.consumer.auto.offset.reset=latest {code}
> After that, we found part of the consumer group offsets had stopped 
> replicating.
> The common characteristic of these consumer groups is their EMPTY status, 
> which means they have no active members at that moment. All the active 
> consumer groups' offset replication works as normal.
> After researching the source code, we found this is because the configuration 
> above also affects the consumption of the topic `mm2-offset-syncs`; therefore the 
> map `offsetSyncs` doesn't hold all of the topic partitions:
> {code:java}
> private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>(); 
> {code}
> And the lost topicPartitions lead to the pause of replication of the EMPTY 
> consumer groups, which is not expected.
> {code:java}
> OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long 
> upstreamOffset) {
> Optional offsetSync = latestOffsetSync(sourceTopicPartition);
> if (offsetSync.isPresent()) {
> if (offsetSync.get().upstreamOffset() > upstreamOffset) {
> // Offset is too far in the past to translate accurately
> return OptionalLong.of(-1L);
> }
> long upstreamStep = upstreamOffset - 
> offsetSync.get().upstreamOffset();
> return OptionalLong.of(offsetSync.get().downstreamOffset() + 
> upstreamStep);
> } else {
> return OptionalLong.empty();
> }
> }{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] urbandan commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication

2023-04-19 Thread via GitHub


urbandan commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1171304080


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -139,10 +171,103 @@ public void close() {
 protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
 OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
 TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-offsetSyncs.put(sourceTopicPartition, offsetSync);
+offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> 
createInitialSyncs(offsetSync));
+offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> 
updateExistingSyncs(syncs, offsetSync));
+}
+
+private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync 
offsetSync) {
+// Make a copy of the array before mutating it, so that readers do not 
see inconsistent data
+// TODO: batch updates so that this copy can be performed less often 
for high-volume sync topics.
+OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+updateSyncArray(mutableSyncs, offsetSync);
+if (log.isTraceEnabled()) {
+StringBuilder stateString = new StringBuilder();
+stateString.append("[");
+for (int i = 0; i < Long.SIZE; i++) {
+if (i != 0) {
+stateString.append(",");
+}
+if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != 
mutableSyncs[i - 1]) {
+// Print only if the sync is interesting, a series of 
repeated syncs will appear as ,
+stateString.append(mutableSyncs[i].upstreamOffset());
+stateString.append(":");
+stateString.append(mutableSyncs[i].downstreamOffset());
+}
+}
+stateString.append("]");
+log.trace("New sync {} applied, new state is {}", offsetSync, 
stateString);
+}
+return mutableSyncs;
+}
+
+private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+clearSyncArray(syncs, firstSync);
+return syncs;
+}
+
+private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+for (int i = 0; i < Long.SIZE; i++) {
+syncs[i] = offsetSync;
+}
+}
+
+private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+long upstreamOffset = offsetSync.upstreamOffset();
+// Old offsets are invalid, so overwrite them all.
+if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {

Review Comment:
   One idea to this:
   What if the checkpointing started using the source offset mechanism of 
Connect to keep track of the last offset-sync used for a specific consumer 
group?
   MirrorCheckpointTask could start emitting a source offset record like
   (group, topic, partition) -> (offsetOfLastUsedOffsetSync)
   
   This would allow
   1. The OffsetSyncStore to read compaction-eligible offset syncs at startup, 
since it will be able to detect that (going with the example from @gharris1727) 
it cannot use offset-sync B, as the last saved offset of the offset-sync topic 
was C (which has a higher offset than B).
   2. To rewind consumer group offsets if the source topic was recreated 
(offset sync record offsets will keep increasing regardless of the reset in the 
upstream offset)
   
   I understand that in complexity, this is pretty much the same as reading 
back checkpoints, but in terms of implementation, it should be simpler (?). One 
drawback is that the deletion of the offset-syncs topic messes up the offset 
translation logic - but I guess with KIP-875 (or with the followup of that KIP 
containing the "reset" functionality) that won't be a problem, as the 
checkpoint source offsets can be cleared to restore a clean state.
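   
   Concretely, the proposed source offsets might take a shape like this 
(hypothetical helpers and keys, just to make the idea explicit; nothing like 
this is implemented in the PR):
   
   ```java
   // Connect would persist and compact one such entry per (group, topic, partition).
   static Map<String, Object> checkpointSourcePartition(String group, TopicPartition tp) {
       Map<String, Object> sourcePartition = new HashMap<>();
       sourcePartition.put("group", group);
       sourcePartition.put("topic", tp.topic());
       sourcePartition.put("partition", tp.partition());
       return sourcePartition;
   }
   
   static Map<String, Object> checkpointSourceOffset(long lastUsedOffsetSyncRecordOffset) {
       return Collections.singletonMap("offsetOfLastUsedOffsetSync", lastUsedOffsetSyncRecordOffset);
   }
   ```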



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-04-19 Thread via GitHub


satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1171245421


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -600,6 +622,176 @@ public String toString() {
 }
 }
 
+public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) 
throws RemoteStorageException, IOException {
+int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+FetchRequest.PartitionData fetchInfo = 
remoteStorageFetchInfo.fetchInfo;
+
+boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == 
FetchIsolation.TXN_COMMITTED;
+
+long offset = fetchInfo.fetchOffset;
+int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+OptionalInt epoch = OptionalInt.empty();
+
+if (logOptional.isPresent()) {
+Option<LeaderEpochFileCache> leaderEpochCache = 
logOptional.get().leaderEpochCache();
+if (leaderEpochCache.isDefined()) {
+epoch = leaderEpochCache.get().epochForOffset(offset);
+}
+}
+
+Optional<RemoteLogSegmentMetadata> rlsMetadata = epoch.isPresent()
+? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+: Optional.empty();
+
+if (!rlsMetadata.isPresent()) {
+String epochStr = (epoch.isPresent()) ? 
Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+throw new OffsetOutOfRangeException("Received request for offset " 
+ offset + " for leader epoch "
++ epochStr + " and partition " + tp + " which does not 
exist in remote tier.");
+}
+
+int startPos = lookupPositionForOffset(rlsMetadata.get(), offset);
+InputStream remoteSegInputStream = null;
+try {
+// Search forward for the position of the last offset that is 
greater than or equal to the target offset
+remoteSegInputStream = 
remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos);
+RemoteLogInputStream remoteLogInputStream = new 
RemoteLogInputStream(remoteSegInputStream);
+
+RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, 
offset);
+
+if (firstBatch == null)
+return new FetchDataInfo(new LogOffsetMetadata(offset), 
MemoryRecords.EMPTY, false,
+includeAbortedTxns ? 
Optional.of(Collections.emptyList()) : Optional.empty());
+
+int updatedFetchSize =

Review Comment:
   Good point. There is no risk here, but it is good to be consistent with the 
local read pattern and return empty records for that case; I will update with the 
changes. 
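   
   One possible shape of that change (a sketch under the assumption that "that 
case" is the adjusted fetch size leaving no room for the first batch; the 
actual update may differ):
   
   ```java
   if (updatedFetchSize <= 0) {
       // Mirror the local read path: return empty records rather than a partial batch.
       return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
               includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
   }
   ```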



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-04-19 Thread via GitHub


satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1171250580


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1118,9 +1122,13 @@ class ReplicaManager(val config: KafkaConfig,
 responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
   ): Unit = {
 // check if this fetch request can be satisfied right away
-val logReadResults = readFromLocalLog(params, fetchInfos, quota, 
readFromPurgatory = false)
+val logReadResults = readFromLog(params, fetchInfos, quota, 
readFromPurgatory = false)
 var bytesReadable: Long = 0
 var errorReadingData = false
+
+// The 1st topic-partition that has to be read from remote storage
+var remoteFetchInfo: Optional[RemoteStorageFetchInfo] = Optional.empty()

Review Comment:
   As I already called out in this PR's description, this will be followed up with 
another PR. We will describe the config and its different options with the 
respective scenarios. The default value will be to fetch from multiple partitions, 
as is done with local log segments. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


