[GitHub] [kafka] jsancio commented on a change in pull request #11416: MINOR: Improve createTopics and incrementalAlterConfigs in KRaft

2021-11-05 Thread GitBox


jsancio commented on a change in pull request #11416:
URL: https://github.com/apache/kafka/pull/11416#discussion_r744007754



##
File path: 
metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
##
@@ -72,6 +73,14 @@
 define("ghi", ConfigDef.Type.BOOLEAN, true, 
ConfigDef.Importance.HIGH, "ghi"));
 }
 
+static class ExampleConfigurationValidator implements 
ConfigurationValidator {
+static final ExampleConfigurationValidator INSTANCE = new 
ExampleConfigurationValidator();
+
+@Override
+public void validate(ConfigResource resource, Map<String, String> config) {
+}

Review comment:
   Did you plan to implement this? If not, you can use the trick you are 
using in other places, e.g.
   ```java
   static ConfigurationValidator NOOP_VALIDATOR = (resource, config) -> { };
   ```

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -215,6 +216,11 @@ public Builder 
setAlterConfigPolicy(Optional<AlterConfigPolicy> alterConfigPolic
 return this;
 }
 
+public Builder setConfigurationValidator(ConfigurationValidator 
configurationValidator) {
+this.configurationValidator = configurationValidator;
+return this;
+}

Review comment:
   This method is never called in this PR. Are we missing changes?

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -388,7 +389,12 @@ public void replay(RemoveTopicRecord record) {
 Map successes = new HashMap<>();
 for (CreatableTopic topic : request.topics()) {
 if (topicErrors.containsKey(topic.name())) continue;
-ApiError error = createTopic(topic, records, successes);
+ApiError error;
+try {
+error = createTopic(topic, records, successes);

Review comment:
   I find it strange that `createTopic` can return an `ApiError` or throw 
an `ApiException`. I think it should do one or the other but not both. What is 
now throwing that requires a `try ... catch ...`?
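   For reference, folding an `ApiException` into an `ApiError` is a one-liner 
with `ApiError.fromThrowable`, so either side could own the conversion. A rough 
sketch (it reuses the names from the diff above and only illustrates the shape, 
not necessarily what the PR does):
   ```java
   import org.apache.kafka.common.errors.ApiException;
   import org.apache.kafka.common.requests.ApiError;

   // Convert the exception into an ApiError in exactly one place, so the
   // calling loop only ever deals with the ApiError return value.
   ApiError error;
   try {
       error = createTopic(topic, records, successes);
   } catch (ApiException e) {
       error = ApiError.fromThrowable(e);
   }
   ```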

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ConfigurationValidator.java
##
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.config.ConfigResource;
+
+import java.util.Map;
+
+
+public interface ConfigurationValidator {
+/**
+ * Throws an ApiException if a configuration is invalid for the given 
resource.
+ *
+ * @param resource  The configuration resource.
+ * @param config    The new configuration.
+ */
+void validate(ConfigResource resource, Map<String, String> config);

Review comment:
   I couldn't find a class in `src/main` that implements this interface. 
Are we missing files in this PR?
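   For illustration only, a minimal (hypothetical) implementation of the 
contract described in the javadoc - throw an `ApiException` subclass when a 
config is invalid - could look like this:
   ```java
   import java.util.Map;
   import org.apache.kafka.common.config.ConfigResource;
   import org.apache.kafka.common.errors.InvalidConfigurationException;

   // Hypothetical example: reject empty values for topic configs.
   public class NonEmptyValueValidator implements ConfigurationValidator {
       @Override
       public void validate(ConfigResource resource, Map<String, String> config) {
           if (resource.type() == ConfigResource.Type.TOPIC) {
               config.forEach((key, value) -> {
                   if (value == null || value.isEmpty()) {
                       throw new InvalidConfigurationException("Empty value for " + key);
                   }
               });
           }
       }
   }
   ```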




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-13388) Kafka Producer nodes stuck in CHECKING_API_VERSIONS

2021-11-05 Thread David Mao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17439526#comment-17439526
 ] 

David Mao edited comment on KAFKA-13388 at 11/5/21, 10:20 PM:
--

[~dhofftgt]

Why do we expect a connection in CHECKING_API_VERSIONS to have in-flight 
requests?

I would expect the opposite: 

if a connection is in CHECKING_API_VERSIONS, it will *not* be ready for 
requests (at this point, the client does not know what API versions the broker 
supports, so it can't serialize requests to the appropriate version), so it 
should not have any inflight requests.


was (Author: david.mao):
[~dhofftgt]

Why do we expect a connection in CHECKING_API_VERSIONS to have in-flight 
requests?

I would expect the opposite: 

if a connection is in CHECKING_API_VERSIONS, it will *not* be ready for 
requests (at this point, the client does not know what API versions the broker 
supports, so it can't serialize requests to the appropriate version).

> Kafka Producer nodes stuck in CHECKING_API_VERSIONS
> ---
>
> Key: KAFKA-13388
> URL: https://issues.apache.org/jira/browse/KAFKA-13388
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: David Hoffman
>Priority: Minor
> Attachments: Screen Shot 2021-10-25 at 10.28.48 AM.png, 
> image-2021-10-21-13-42-06-528.png
>
>
> I have been seeing expired batch errors in my app.
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Expiring 51 record(s) for 
> xxx-17:120002 ms has passed since batch creation
> {code}
>  I would have assumed a request timeout or connection timeout should have also 
> been logged. I could not find any other associated errors. 
> I added some instrumentation to my app and have traced this down to broker 
> connections hanging in the CHECKING_API_VERSIONS state. -It appears there is no 
> effective timeout for Kafka Producer broker connections in the 
> CHECKING_API_VERSIONS state.-
> In the code I see that after the NetworkClient connects to a broker node it 
> makes a request to check API versions; when it receives the response it marks 
> the node as ready. -I am seeing that sometimes a reply is not received for 
> the check API versions request and the connection just hangs in the 
> CHECKING_API_VERSIONS state until it is disposed, I assume after the idle 
> connection timeout.-
> Update: not actually sure what causes the connection to get stuck in 
> CHECKING_API_VERSIONS.
> -I am guessing the connection setup timeout should be still in play for this, 
> but it is not.- 
>  -There is a connectingNodes set that is consulted when checking timeouts and 
> the node is removed- 
>  -when ClusterConnectionStates.checkingApiVersions(String id) is called to 
> transition the node into CHECKING_API_VERSIONS-



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13388) Kafka Producer nodes stuck in CHECKING_API_VERSIONS

2021-11-05 Thread David Mao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17439526#comment-17439526
 ] 

David Mao commented on KAFKA-13388:
---

[~dhofftgt]

Why do we expect a connection in CHECKING_API_VERSIONS to have in-flight 
requests?

I would expect the opposite: 

if a connection is in CHECKING_API_VERSIONS, it will *not* be ready for 
requests (at this point, the client does not know what API versions the broker 
supports, so it can't serialize requests to the appropriate version).
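To illustrate, this is the general KafkaClient pattern (names like client, node, request and retryBackoffMs are just illustrative here, not a quote of the producer's code): nothing is handed to a node until ready() returns true, and ready() stays false while the ApiVersions exchange is still in progress.

{code:java}
long now = time.milliseconds();
if (client.ready(node, now)) {
    // API versions for this node are known, so requests can be serialized and sent
    client.send(request, now);
} else {
    // still connecting / CHECKING_API_VERSIONS: keep driving the client, send nothing
    client.poll(retryBackoffMs, now);
}
{code}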

> Kafka Producer nodes stuck in CHECKING_API_VERSIONS
> ---
>
> Key: KAFKA-13388
> URL: https://issues.apache.org/jira/browse/KAFKA-13388
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: David Hoffman
>Priority: Minor
> Attachments: Screen Shot 2021-10-25 at 10.28.48 AM.png, 
> image-2021-10-21-13-42-06-528.png
>
>
> I have been seeing expired batch errors in my app.
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Expiring 51 record(s) for 
> xxx-17:120002 ms has passed since batch creation
> {code}
>  I would have assumed a request timeout or connection timeout should have also 
> been logged. I could not find any other associated errors. 
> I added some instrumentation to my app and have traced this down to broker 
> connections hanging in the CHECKING_API_VERSIONS state. -It appears there is no 
> effective timeout for Kafka Producer broker connections in the 
> CHECKING_API_VERSIONS state.-
> In the code I see that after the NetworkClient connects to a broker node it 
> makes a request to check API versions; when it receives the response it marks 
> the node as ready. -I am seeing that sometimes a reply is not received for 
> the check API versions request and the connection just hangs in the 
> CHECKING_API_VERSIONS state until it is disposed, I assume after the idle 
> connection timeout.-
> Update: not actually sure what causes the connection to get stuck in 
> CHECKING_API_VERSIONS.
> -I am guessing the connection setup timeout should be still in play for this, 
> but it is not.- 
>  -There is a connectingNodes set that is consulted when checking timeouts and 
> the node is removed- 
>  -when ClusterConnectionStates.checkingApiVersions(String id) is called to 
> transition the node into CHECKING_API_VERSIONS-



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jsancio commented on a change in pull request #11457: MINOR: guard against calls to exit in QuorumTestHarness tests

2021-11-05 Thread GitBox


jsancio commented on a change in pull request #11457:
URL: https://github.com/apache/kafka/pull/11457#discussion_r744002667



##
File path: core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
##
@@ -163,6 +163,14 @@ abstract class QuorumTestHarness extends Logging {
   // That way you control the initialization order.
   @BeforeEach
   def setUp(testInfo: TestInfo): Unit = {
+Exit.setExitProcedure((code, message) => {
+  error(s"exit(${code}, ${message}) called!")
+  tearDown()
+})
+Exit.setHaltProcedure((code, message) => {
+  error(s"halt(${code}, ${message}) called!")
+  tearDown()

Review comment:
   I think this implementation of `setExitProcedure` and `setHaltProcedure` 
would allow the test that called `Exit.exit` or `Exit.halt` to continue. This is 
probably not what the user intended when they called those methods. If you 
agree, how about throwing an unchecked exception, maybe `AssertionError`?
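   Something along these lines (a Java sketch against 
`org.apache.kafka.common.utils.Exit`; the Scala wrapper in the test harness 
would look analogous):
   ```java
   import org.apache.kafka.common.utils.Exit;

   // Fail the calling test immediately instead of letting it continue.
   Exit.setExitProcedure((statusCode, message) -> {
       throw new AssertionError("exit(" + statusCode + ", " + message + ") called!");
   });
   Exit.setHaltProcedure((statusCode, message) -> {
       throw new AssertionError("halt(" + statusCode + ", " + message + ") called!");
   });
   ```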




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743997732



##
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##
@@ -659,88 +670,108 @@ class FetchSessionTest {
   }
 
   @Test
-  def testIncrementalFetchSessionWithIdsWhenSessionDoesNotUseIds() : Unit = {
+  def testFetchSessionWithUnknownId(): Unit = {
 val time = new MockTime()
 val cache = new FetchSessionCache(10, 1000)
 val fetchManager = new FetchManager(time, cache)
-val topicIds = new util.HashMap[String, Uuid]()
-val topicNames = new util.HashMap[Uuid, String]()
+val topicNames = Map(Uuid.randomUuid() -> "foo", Uuid.randomUuid() -> 
"bar").asJava

Review comment:
   Nice. This works well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743983833



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -262,11 +262,12 @@ public synchronized int sendFetches() {
 maxVersion = ApiKeys.FETCH.latestVersion();
 }
 final FetchRequest.Builder request = FetchRequest.Builder
-.forConsumer(maxVersion, this.maxWaitMs, this.minBytes, 
data.toSend(), data.topicIds())
+.forConsumer(maxVersion, this.maxWaitMs, this.minBytes, 
data.toSend())
 .isolationLevel(isolationLevel)
 .setMaxBytes(this.maxBytes)
 .metadata(data.metadata())
-.toForget(data.toForget())
+.removed(data.toForget())
+.replaced(data.toReplace())

Review comment:
   Sorry I'm still a bit confused. The request is sent in this method. We 
don't get access to the request. We have access to the data that is tested in 
FetchSessionHandler and that is passed into this method where the request is 
built and sent. 
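   (For what it's worth, one way tests usually get at the request that 
`sendFetches()` builds is through `MockClient`'s request matcher, roughly like 
the sketch below - `fetchResponse` is just a placeholder name for whatever 
response the test prepares.)
   ```java
   // The matcher sees the outgoing request before the prepared response is returned,
   // so assertions on toForget/toReplace can live here.
   client.prepareResponse(body -> {
       FetchRequest request = (FetchRequest) body;
       return request.data().forgottenTopicsData().isEmpty();
   }, fetchResponse);
   ```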




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743981159



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -93,27 +93,42 @@ class CachedPartition(val topic: String,
   def this(topic: String, partition: Int, topicId: Uuid) =
 this(topic, topicId, partition, -1, -1, -1, Optional.empty(), -1, -1, 
Optional.empty[Integer])
 
-  def this(part: TopicPartition, topicId: Uuid) =
-this(part.topic, part.partition, topicId)
+  def this(part: TopicIdPartition) = {
+this(part.topic, part.partition, part.topicId)
+  }
 
-  def this(part: TopicPartition, id: Uuid, reqData: 
FetchRequest.PartitionData) =
-this(part.topic, id, part.partition, reqData.maxBytes, 
reqData.fetchOffset, -1,
+  def this(part: TopicIdPartition, reqData: FetchRequest.PartitionData) =
+this(part.topic, part.topicId, part.partition, reqData.maxBytes, 
reqData.fetchOffset, -1,
   reqData.currentLeaderEpoch, reqData.logStartOffset, -1, 
reqData.lastFetchedEpoch)
 
-  def this(part: TopicPartition, id: Uuid, reqData: FetchRequest.PartitionData,
+  def this(part: TopicIdPartition, reqData: FetchRequest.PartitionData,
respData: FetchResponseData.PartitionData) =
-this(part.topic, id, part.partition, reqData.maxBytes, 
reqData.fetchOffset, respData.highWatermark,
+this(part.topic, part.topicId, part.partition, reqData.maxBytes, 
reqData.fetchOffset, respData.highWatermark,
   reqData.currentLeaderEpoch, reqData.logStartOffset, 
respData.logStartOffset, reqData.lastFetchedEpoch)
 
-  def reqData = new FetchRequest.PartitionData(fetchOffset, 
fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch)
+  def reqData = new FetchRequest.PartitionData(topicId, fetchOffset, 
fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch)
 
-  def updateRequestParams(reqData: FetchRequest.PartitionData): Unit = {
+  def maybeUpdateRequestParamsOrName(reqData: FetchRequest.PartitionData, 
name: String): Unit = {
 // Update our cached request parameters.
 maxBytes = reqData.maxBytes
 fetchOffset = reqData.fetchOffset
 fetcherLogStartOffset = reqData.logStartOffset
 leaderEpoch = reqData.currentLeaderEpoch
 lastFetchedEpoch = reqData.lastFetchedEpoch
+// Update name if needed
+maybeSetUnknownName(name)
+  }
+
+  def maybeSetUnknownName(name: String): Unit = {
+if (this.topic == null) {
+  this.topic = name
+}
+  }
+
+  def maybeResolveUnknownName(topicNames: FetchSession.TOPIC_NAME_MAP): Unit = 
{

Review comment:
   Ah I'm already doing this.   Ok. sounds good.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743979130



##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -285,52 +268,57 @@ public FetchRequestData build() {
 if (nextMetadata.isFull()) {
 if (log.isDebugEnabled()) {
 log.debug("Built full fetch {} for node {} with {}.",
-  nextMetadata, node, 
partitionsToLogString(next.keySet()));
+nextMetadata, node, 
topicPartitionsToLogString(next.keySet()));
 }
 sessionPartitions = next;
 next = null;
 // Only add topic IDs to the session if we are using topic IDs.
 if (canUseTopicIds) {
-sessionTopicIds = topicIds;
-sessionTopicNames = new HashMap<>(topicIds.size());
-topicIds.forEach((name, id) -> sessionTopicNames.put(id, 
name));
+Map<Uuid, Set<String>> newTopicNames = sessionPartitions.entrySet().stream().collect(Collectors.groupingByConcurrent(entry -> entry.getValue().topicId,
+Collectors.mapping(entry -> entry.getKey().topic(), Collectors.toSet())));
+
+sessionTopicNames = new HashMap<>(newTopicNames.size());
+// There should only be one topic name per topic ID.
+newTopicNames.forEach((topicId, topicNamesSet) -> 
topicNamesSet.forEach(topicName -> sessionTopicNames.put(topicId, topicName)));
 } else {
-sessionTopicIds = new HashMap<>();
-sessionTopicNames = new HashMap<>();
+sessionTopicNames = Collections.emptyMap();
 }
-topicIds = null;
 Map toSend =
-Collections.unmodifiableMap(new 
LinkedHashMap<>(sessionPartitions));
-Map toSendTopicIds =
-Collections.unmodifiableMap(new 
HashMap<>(sessionTopicIds));
-Map toSendTopicNames =
-Collections.unmodifiableMap(new 
HashMap<>(sessionTopicNames));
-return new FetchRequestData(toSend, Collections.emptyList(), 
toSend, toSendTopicIds, toSendTopicNames, nextMetadata, canUseTopicIds);
+Collections.unmodifiableMap(new 
LinkedHashMap<>(sessionPartitions));
+return new FetchRequestData(toSend, Collections.emptyList(), 
Collections.emptyList(), toSend, nextMetadata, canUseTopicIds);
 }
 
-List added = new ArrayList<>();
-List removed = new ArrayList<>();
-List altered = new ArrayList<>();
+List added = new ArrayList<>();
+List removed = new ArrayList<>();
+List altered = new ArrayList<>();
+List replaced = new ArrayList<>();
 for (Iterator> iter =
- sessionPartitions.entrySet().iterator(); iter.hasNext(); 
) {
+ sessionPartitions.entrySet().iterator(); iter.hasNext(); ) {
 Entry entry = iter.next();
 TopicPartition topicPartition = entry.getKey();
 PartitionData prevData = entry.getValue();
 PartitionData nextData = next.remove(topicPartition);
 if (nextData != null) {
-if (!prevData.equals(nextData)) {
+// We basically check if the new partition had the same 
topic ID. If not,
+// we add it to the "replaced" set.
+if (!prevData.topicId.equals(nextData.topicId) && 
!prevData.topicId.equals(Uuid.ZERO_UUID)) {
+// Re-add the replaced partition to the end of 'next'
+next.put(topicPartition, nextData);
+entry.setValue(nextData);
+replaced.add(new TopicIdPartition(prevData.topicId, 
topicPartition));
+} else if (!prevData.equals(nextData)) {
 // Re-add the altered partition to the end of 'next'
 next.put(topicPartition, nextData);
 entry.setValue(nextData);
-altered.add(topicPartition);
+altered.add(new TopicIdPartition(nextData.topicId, 
topicPartition));
 }
 } else {
 // Remove this partition from the session.
 iter.remove();
 // Indicate that we no longer want to listen to this 
partition.
-removed.add(topicPartition);
+removed.add(new TopicIdPartition(prevData.topicId, 
topicPartition));
 // If we do not have this topic ID in the builder or the 
session, we can not use topic IDs.
-if 

[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743977547



##
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##
@@ -305,9 +304,10 @@ class ReplicaFetcherThread(name: String,
 } else {
   val version: Short = if (fetchRequestVersion >= 13 && 
!fetchData.canUseTopicIds) 12 else fetchRequestVersion
   val requestBuilder = FetchRequest.Builder
-.forReplica(version, replicaId, maxWait, minBytes, fetchData.toSend, 
fetchData.topicIds)
+.forReplica(version, replicaId, maxWait, minBytes, fetchData.toSend)
 .setMaxBytes(maxBytes)
-.toForget(fetchData.toForget)
+.removed(fetchData.toForget)
+.replaced(fetchData.toReplace)

Review comment:
   I think I have the same confusion here as I do for the fetcher tests. I 
agree that changes should be tested, but I'm not really sure how to do this 
here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743976846



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -413,8 +413,20 @@ abstract class AbstractFetcherThread(name: String,
 
 case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
   warn(s"Received ${Errors.UNKNOWN_TOPIC_OR_PARTITION} from 
the leader for partition $topicPartition. " +
-   "This error may be returned transiently when the 
partition is being created or deleted, but it is not " +
-   "expected to persist.")
+"This error may be returned transiently when the partition 
is being created or deleted, but it is not " +
+"expected to persist.")
+  partitionsWithError += topicPartition
+
+case Errors.UNKNOWN_TOPIC_ID =>
+  warn(s"Received ${Errors.UNKNOWN_TOPIC_ID} from the leader 
for partition $topicPartition. " +
+"This error may be returned transiently when the partition 
is being created or deleted, but it is not " +
+"expected to persist.")
+  partitionsWithError += topicPartition
+
+case Errors.INCONSISTENT_TOPIC_ID =>
+  warn(s"Received ${Errors.INCONSISTENT_TOPIC_ID} from the 
leader for partition $topicPartition. " +
+"This error may be returned transiently when the partition 
is being created or deleted, but it is not " +
+"expected to persist.")

Review comment:
   I don't think processFetchRequest is tested anywhere. There are tests for 
the much higher-level method doWork, so I can try to write one like that and 
check whether that partition shows up in the partitions with errors?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743973792



##
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() {
 List partitions = Arrays.asList(0, 1);
 partitions.forEach(partition -> {
 String testType = partition == 0 ? "updating a partition" : 
"adding a new partition";
-Map topicIds = Collections.singletonMap("foo", 
Uuid.randomUuid());
+Uuid fooId = Uuid.randomUuid();
 FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 
1);
 FetchSessionHandler.Builder builder = handler.newBuilder();
-builder.add(new TopicPartition("foo", 0),  topicIds.get("foo"),
-new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
+builder.add(new TopicPartition("foo", 0),
+new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));
 FetchSessionHandler.FetchRequestData data = builder.build();
-assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
 data.toSend(), data.sessionPartitions());
 assertTrue(data.metadata().isFull());
 assertTrue(data.canUseTopicIds());
 
 FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
+respMap(new RespEntry("foo", 0, fooId, 10, 20)));
 handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
 // Try to remove a topic ID from an existing topic partition (0) 
or add a new topic partition (1) without an ID.
 FetchSessionHandler.Builder builder2 = handler.newBuilder();
-builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
-new FetchRequest.PartitionData(10, 110, 210, 
Optional.empty()));
+builder2.add(new TopicPartition("foo", partition),
+new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 
210, Optional.empty()));
 FetchSessionHandler.FetchRequestData data2 = builder2.build();
-// Should have the same session ID and next epoch, but can no 
longer use topic IDs.
-// The receiving broker will close the session if we were 
previously using topic IDs.
+// Should have the same session ID, and next epoch and can no 
longer use topic IDs.
+// The receiving broker will handle closing the session.
 assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session when " + testType);
 assertEquals(1, data2.metadata().epoch(), "Did not have correct 
epoch when " + testType);
 assertFalse(data2.canUseTopicIds());
 });
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testTopicIdReplaced(boolean fetchRequestUsesIds) {
+TopicPartition tp = new TopicPartition("foo", 0);
+FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+FetchSessionHandler.Builder builder = handler.newBuilder();
+Uuid topicId1 = Uuid.randomUuid();
+builder.add(tp,
+new FetchRequest.PartitionData(topicId1, 0, 100, 200, 
Optional.empty()));
+FetchSessionHandler.FetchRequestData data = builder.build();
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)),
+data.toSend(), data.sessionPartitions());
+assertTrue(data.metadata().isFull());
+assertTrue(data.canUseTopicIds());
+
+FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
+respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20)));
+handler.handleResponse(resp, (short) 12);
+
+// Try to add a new topic ID.
+FetchSessionHandler.Builder builder2 = handler.newBuilder();
+Uuid topicId2 = Uuid.randomUuid();
+// Use the same data besides the topic ID.
+FetchRequest.PartitionData partitionData = new 
FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty());
+builder2.add(tp, partitionData);
+FetchSessionHandler.FetchRequestData data2 = builder2.build();
+// The old topic ID partition should be in toReplace, and the new one 
should be in toSend.
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)),
+data2.toSend(), data2.sessionPartitions());
+assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, 
tp)), data2.toReplace());

Review comment:
   So #11459 doesn't touch the FetchSessionHandler code. But I can still 
add these cases.




-- 
This is an automated message from the 

[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743973388



##
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() {
 List partitions = Arrays.asList(0, 1);
 partitions.forEach(partition -> {
 String testType = partition == 0 ? "updating a partition" : 
"adding a new partition";
-Map topicIds = Collections.singletonMap("foo", 
Uuid.randomUuid());
+Uuid fooId = Uuid.randomUuid();
 FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 
1);
 FetchSessionHandler.Builder builder = handler.newBuilder();
-builder.add(new TopicPartition("foo", 0),  topicIds.get("foo"),
-new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
+builder.add(new TopicPartition("foo", 0),
+new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));
 FetchSessionHandler.FetchRequestData data = builder.build();
-assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
 data.toSend(), data.sessionPartitions());
 assertTrue(data.metadata().isFull());
 assertTrue(data.canUseTopicIds());
 
 FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
+respMap(new RespEntry("foo", 0, fooId, 10, 20)));
 handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
 // Try to remove a topic ID from an existing topic partition (0) 
or add a new topic partition (1) without an ID.
 FetchSessionHandler.Builder builder2 = handler.newBuilder();
-builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
-new FetchRequest.PartitionData(10, 110, 210, 
Optional.empty()));
+builder2.add(new TopicPartition("foo", partition),
+new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 
210, Optional.empty()));
 FetchSessionHandler.FetchRequestData data2 = builder2.build();
-// Should have the same session ID and next epoch, but can no 
longer use topic IDs.
-// The receiving broker will close the session if we were 
previously using topic IDs.
+// Should have the same session ID, and next epoch and can no 
longer use topic IDs.
+// The receiving broker will handle closing the session.
 assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session when " + testType);
 assertEquals(1, data2.metadata().epoch(), "Did not have correct 
epoch when " + testType);
 assertFalse(data2.canUseTopicIds());
 });
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testTopicIdReplaced(boolean fetchRequestUsesIds) {
+TopicPartition tp = new TopicPartition("foo", 0);
+FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+FetchSessionHandler.Builder builder = handler.newBuilder();
+Uuid topicId1 = Uuid.randomUuid();
+builder.add(tp,
+new FetchRequest.PartitionData(topicId1, 0, 100, 200, 
Optional.empty()));
+FetchSessionHandler.FetchRequestData data = builder.build();
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)),
+data.toSend(), data.sessionPartitions());
+assertTrue(data.metadata().isFull());
+assertTrue(data.canUseTopicIds());
+
+FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
+respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20)));
+handler.handleResponse(resp, (short) 12);
+
+// Try to add a new topic ID.
+FetchSessionHandler.Builder builder2 = handler.newBuilder();
+Uuid topicId2 = Uuid.randomUuid();
+// Use the same data besides the topic ID.
+FetchRequest.PartitionData partitionData = new 
FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty());
+builder2.add(tp, partitionData);
+FetchSessionHandler.FetchRequestData data2 = builder2.build();
+// The old topic ID partition should be in toReplace, and the new one 
should be in toSend.
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)),
+data2.toSend(), data2.sessionPartitions());
+assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, 
tp)), data2.toReplace());
+// Should have the same session ID, and next epoch and can use topic 
IDs.
+assertEquals(123, data2.metadata().sessionId(), "Did not use same 

[GitHub] [kafka] kirktrue commented on pull request #11465: OAuth updates 1

2021-11-05 Thread GitBox


kirktrue commented on pull request #11465:
URL: https://github.com/apache/kafka/pull/11465#issuecomment-962191924


   @junrao - perhaps you might have a chance to look at this? It's basically 
some clean up from the main OAuth merge.
   
   Not sure if this should go directly to `trunk` or the 3.1 branch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743949497



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -93,27 +93,42 @@ class CachedPartition(val topic: String,
   def this(topic: String, partition: Int, topicId: Uuid) =
 this(topic, topicId, partition, -1, -1, -1, Optional.empty(), -1, -1, 
Optional.empty[Integer])
 
-  def this(part: TopicPartition, topicId: Uuid) =
-this(part.topic, part.partition, topicId)
+  def this(part: TopicIdPartition) = {
+this(part.topic, part.partition, part.topicId)
+  }
 
-  def this(part: TopicPartition, id: Uuid, reqData: 
FetchRequest.PartitionData) =
-this(part.topic, id, part.partition, reqData.maxBytes, 
reqData.fetchOffset, -1,
+  def this(part: TopicIdPartition, reqData: FetchRequest.PartitionData) =
+this(part.topic, part.topicId, part.partition, reqData.maxBytes, 
reqData.fetchOffset, -1,
   reqData.currentLeaderEpoch, reqData.logStartOffset, -1, 
reqData.lastFetchedEpoch)
 
-  def this(part: TopicPartition, id: Uuid, reqData: FetchRequest.PartitionData,
+  def this(part: TopicIdPartition, reqData: FetchRequest.PartitionData,
respData: FetchResponseData.PartitionData) =
-this(part.topic, id, part.partition, reqData.maxBytes, 
reqData.fetchOffset, respData.highWatermark,
+this(part.topic, part.topicId, part.partition, reqData.maxBytes, 
reqData.fetchOffset, respData.highWatermark,
   reqData.currentLeaderEpoch, reqData.logStartOffset, 
respData.logStartOffset, reqData.lastFetchedEpoch)
 
-  def reqData = new FetchRequest.PartitionData(fetchOffset, 
fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch)
+  def reqData = new FetchRequest.PartitionData(topicId, fetchOffset, 
fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch)
 
-  def updateRequestParams(reqData: FetchRequest.PartitionData): Unit = {
+  def maybeUpdateRequestParamsOrName(reqData: FetchRequest.PartitionData, 
name: String): Unit = {
 // Update our cached request parameters.
 maxBytes = reqData.maxBytes
 fetchOffset = reqData.fetchOffset
 fetcherLogStartOffset = reqData.logStartOffset
 leaderEpoch = reqData.currentLeaderEpoch
 lastFetchedEpoch = reqData.lastFetchedEpoch
+// Update name if needed
+maybeSetUnknownName(name)
+  }
+
+  def maybeSetUnknownName(name: String): Unit = {
+if (this.topic == null) {
+  this.topic = name
+}
+  }
+
+  def maybeResolveUnknownName(topicNames: FetchSession.TOPIC_NAME_MAP): Unit = 
{

Review comment:
   Ok, so we'll pass a name and the reqData in that method.

##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -794,22 +793,23 @@ class ReplicaManagerTest {
 
   // We receive one valid request from the follower and replica state is 
updated
   var successfulFetch: Option[FetchPartitionData] = None
-  def callback(response: Seq[(TopicPartition, FetchPartitionData)]): Unit 
= {
-successfulFetch = response.headOption.filter { case (topicPartition, 
_) => topicPartition == tp }.map { case (_, data) => data }
+  def callback(response: Seq[(TopicIdPartition, FetchPartitionData)]): 
Unit = {
+// Check the topic partition only since we are reusing this callback 
on different TopicIdPartitions.
+successfulFetch = response.headOption.filter { case (topicIdPartition, 
_) => topicIdPartition.topicPartition == tidp.topicPartition }.map { case (_, 
data) => data }

Review comment:
   So I can write a separate callback for each one that checks the ID.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding commented on pull request #11345: Allow empty last segment to have missing offset index during recovery

2021-11-05 Thread GitBox


ccding commented on pull request #11345:
URL: https://github.com/apache/kafka/pull/11345#issuecomment-961961648


   @junrao I think this should solve the issue. Please take a look
   cc @kowshik 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

2021-11-05 Thread GitBox


RivenSun2 commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r743682843



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -692,10 +692,22 @@ private void validateCooperativeAssignment(final Map
+RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
+if (future == null)
+onJoinPrepareAsyncCommitSucceeded = true;
+else {
+if (future.succeeded()) {
+onJoinPrepareAsyncCommitSucceeded = true;
+} else if (future.failed() && !future.isRetriable()) {
+// consistent with async auto-commit failures, we do not 
propagate the exception
+log.warn("Asynchronous auto-commit offsets failed: {}", 
future.exception().getMessage());
+onJoinPrepareAsyncCommitSucceeded = true;
+}

Review comment:
   sure

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -692,10 +693,18 @@ private void validateCooperativeAssignment(final 
Map

[GitHub] [kafka] RivenSun2 commented on pull request #11461: KAFKA-13422: Add verification of duplicate configuration for each type of LoginModule in JaasConfigFile

2021-11-05 Thread GitBox


RivenSun2 commented on pull request #11461:
URL: https://github.com/apache/kafka/pull/11461#issuecomment-961575399


   @guozhangwang @ijuma 
   please help check this PR when available. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #11429: KAFKA-13396: allow create topic without partition/replicaFactor

2021-11-05 Thread GitBox


dajac commented on pull request #11429:
URL: https://github.com/apache/kafka/pull/11429#issuecomment-961343546


   @ijuma Should we pick this one to the 3.1 branch as well? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 commented on pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

2021-11-05 Thread GitBox


RivenSun2 commented on pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#issuecomment-961940047


   @showuon  Thanks again.
   Please review it when you have a chance.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongjinleekr commented on pull request #11430: KAFKA-13352: Kafka Client does not support passwords starting with number in jaas config

2021-11-05 Thread GitBox


dongjinleekr commented on pull request #11430:
URL: https://github.com/apache/kafka/pull/11430#issuecomment-961934705


   @rajinisivaram Here is the fix; I allowed the asterisks, following the base 
implementation, and commented on how to mix numbers & the other symbols in the 
string. (Oh, updating the Jira would also be needed, wouldn't it?)
   
   I also tried to find a way to assert that it works identically with the base 
implementation, but failed. Instead, I just added some additional tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11460: KAFKA-13425: Optimization of KafkaConsumer#pause semantics

2021-11-05 Thread GitBox


showuon commented on pull request #11460:
URL: https://github.com/apache/kafka/pull/11460#issuecomment-961821207


   No, only committers can merge the PR.  @hachikuji, could you help check this 
PR? Thanks. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #11467: MINOR: fix java doc in kafkaProducer

2021-11-05 Thread GitBox


dajac commented on pull request #11467:
URL: https://github.com/apache/kafka/pull/11467#issuecomment-961343975


   @showuon Thanks. I will take a look soon.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743574922



##
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() {
 List partitions = Arrays.asList(0, 1);
 partitions.forEach(partition -> {
 String testType = partition == 0 ? "updating a partition" : 
"adding a new partition";
-Map topicIds = Collections.singletonMap("foo", 
Uuid.randomUuid());
+Uuid fooId = Uuid.randomUuid();
 FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 
1);
 FetchSessionHandler.Builder builder = handler.newBuilder();
-builder.add(new TopicPartition("foo", 0),  topicIds.get("foo"),
-new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
+builder.add(new TopicPartition("foo", 0),
+new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));
 FetchSessionHandler.FetchRequestData data = builder.build();
-assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
 data.toSend(), data.sessionPartitions());
 assertTrue(data.metadata().isFull());
 assertTrue(data.canUseTopicIds());
 
 FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
+respMap(new RespEntry("foo", 0, fooId, 10, 20)));
 handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
 // Try to remove a topic ID from an existing topic partition (0) 
or add a new topic partition (1) without an ID.
 FetchSessionHandler.Builder builder2 = handler.newBuilder();
-builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
-new FetchRequest.PartitionData(10, 110, 210, 
Optional.empty()));
+builder2.add(new TopicPartition("foo", partition),
+new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 
210, Optional.empty()));
 FetchSessionHandler.FetchRequestData data2 = builder2.build();
-// Should have the same session ID and next epoch, but can no 
longer use topic IDs.
-// The receiving broker will close the session if we were 
previously using topic IDs.
+// Should have the same session ID, and next epoch and can no 
longer use topic IDs.
+// The receiving broker will handle closing the session.
 assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session when " + testType);
 assertEquals(1, data2.metadata().epoch(), "Did not have correct 
epoch when " + testType);
 assertFalse(data2.canUseTopicIds());
 });
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testTopicIdReplaced(boolean fetchRequestUsesIds) {
+TopicPartition tp = new TopicPartition("foo", 0);
+FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+FetchSessionHandler.Builder builder = handler.newBuilder();
+Uuid topicId1 = Uuid.randomUuid();
+builder.add(tp,
+new FetchRequest.PartitionData(topicId1, 0, 100, 200, 
Optional.empty()));
+FetchSessionHandler.FetchRequestData data = builder.build();
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)),
+data.toSend(), data.sessionPartitions());
+assertTrue(data.metadata().isFull());
+assertTrue(data.canUseTopicIds());
+
+FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
+respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20)));
+handler.handleResponse(resp, (short) 12);
+
+// Try to add a new topic ID.
+FetchSessionHandler.Builder builder2 = handler.newBuilder();
+Uuid topicId2 = Uuid.randomUuid();
+// Use the same data besides the topic ID.
+FetchRequest.PartitionData partitionData = new 
FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty());
+builder2.add(tp, partitionData);
+FetchSessionHandler.FetchRequestData data2 = builder2.build();
+// The old topic ID partition should be in toReplace, and the new one 
should be in toSend.
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)),
+data2.toSend(), data2.sessionPartitions());
+assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, 
tp)), data2.toReplace());
+// Should have the same session ID, and next epoch and can use topic 
IDs.
+assertEquals(123, data2.metadata().sessionId(), "Did not use same 

[GitHub] [kafka] RivenSun2 commented on pull request #11460: KAFKA-13425: Optimization of KafkaConsumer#pause semantics

2021-11-05 Thread GitBox


RivenSun2 commented on pull request #11460:
URL: https://github.com/apache/kafka/pull/11460#issuecomment-961805853


   @showuon , can you help to merge this PR?
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

2021-11-05 Thread GitBox


showuon commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r743635124



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -692,10 +692,22 @@ private void validateCooperativeAssignment(final Map
+RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
+if (future == null)
+onJoinPrepareAsyncCommitSucceeded = true;
+else {
+if (future.succeeded()) {
+onJoinPrepareAsyncCommitSucceeded = true;
+} else if (future.failed() && !future.isRetriable()) {
+// consistent with async auto-commit failures, we do not 
propagate the exception
+log.warn("Asynchronous auto-commit offsets failed: {}", 
future.exception().getMessage());
+onJoinPrepareAsyncCommitSucceeded = true;
+}

Review comment:
   I think we should also log the error in the `failed && isRetriable()` case.
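   Roughly like this (a sketch reusing the names from the diff above), so 
retriable failures are at least visible in the logs:
   ```java
   if (future.succeeded()) {
       onJoinPrepareAsyncCommitSucceeded = true;
   } else if (future.failed() && !future.isRetriable()) {
       // consistent with async auto-commit failures, we do not propagate the exception
       log.warn("Asynchronous auto-commit offsets failed: {}", future.exception().getMessage());
       onJoinPrepareAsyncCommitSucceeded = true;
   } else if (future.failed()) {
       // retriable failure: log it and let the next onJoinPrepare pass retry the commit
       log.debug("Asynchronous auto-commit offsets failed with retriable error: {}",
           future.exception().getMessage());
   }
   ```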

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -692,10 +693,18 @@ private void validateCooperativeAssignment(final 
Map

[GitHub] [kafka] showuon commented on a change in pull request #11467: MINOR: fix java doc in kafkaProducer

2021-11-05 Thread GitBox


showuon commented on a change in pull request #11467:
URL: https://github.com/apache/kafka/pull/11467#discussion_r742808430



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -100,8 +100,6 @@
  * {@code
  * Properties props = new Properties();
  * props.put("bootstrap.servers", "localhost:9092");
- * props.put("acks", "all");
- * props.put("retries", 0);

Review comment:
   In v3.0, `acks` defaults to `all` already, and `enable.idempotence` is 
set to true by default, which means `retries` cannot be set to 0 now.
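   In other words, with a 3.0 client the quick-start snippet only really needs 
the connection and serializer settings; for example:
   ```java
   import java.util.Properties;

   Properties props = new Properties();
   props.put("bootstrap.servers", "localhost:9092");
   // acks=all and enable.idempotence=true are already the 3.0 defaults,
   // and retries must not be set to 0 when idempotence is enabled.
   props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
   props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
   ```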

##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -117,14 +115,15 @@
  * as well as a background I/O thread that is responsible for turning these 
records into requests and transmitting them
  * to the cluster. Failure to close the producer after use will leak these 
resources.
  * 
- * The {@link #send(ProducerRecord) send()} method is asynchronous. When 
called it adds the record to a buffer of pending record sends
+ * The {@link #send(ProducerRecord) send()} method is asynchronous. When 
called, it adds the record to a buffer of pending sending record
  * and immediately returns. This allows the producer to batch together 
individual records for efficiency.
  * 
  * The acks config controls the criteria under which requests are 
considered complete. The "all" setting
  * we have specified will result in blocking on the full commit of the record, 
the slowest but most durable setting.
  * 
- * If the request fails, the producer can automatically retry, though since we 
have specified retries
- * as 0 it won't. Enabling retries also opens up the possibility of duplicates 
(see the documentation on
+ * If the request fails, the producer can automatically retry. We have 
specified retries default as Integer.MAX_VALUE.
+ * It's recommended to use delivery.timeout.ms to control retry 
behavior, instead of retries.
+ * Enabling retries also opens up the possibility of duplicates (see the 
documentation on

Review comment:
   After 
[KIP-91](https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer),
 we set `retries` to the max value by default, and it's recommended to use 
`delivery.timeout.ms` to control retry behavior instead of `retries`. Updating 
the doc accordingly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11413: KAFKA-13370: add errors when commit offsets failed and add tests

2021-11-05 Thread GitBox


showuon commented on pull request #11413:
URL: https://github.com/apache/kafka/pull/11413#issuecomment-961861164


   @UnityLung @chia7712  @rhauch , please help review this PR. thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #11429: KAFKA-13396: allow create topic without partition/replicaFactor

2021-11-05 Thread GitBox


ijuma commented on pull request #11429:
URL: https://github.com/apache/kafka/pull/11429#issuecomment-961367197


   @dajac Makes sense, pushed to trunk, 3.1 and 3.0.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] upsidedownsmile commented on pull request #11375: KAFKA-10865: Log transformed record in WorkerSinkTask

2021-11-05 Thread GitBox


upsidedownsmile commented on pull request #11375:
URL: https://github.com/apache/kafka/pull/11375#issuecomment-961878940






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongjinleekr commented on a change in pull request #11431: KAFKA-13397: Honor 'replication.policy.separator' configuration when creating MirrorMaker2 internal topics

2021-11-05 Thread GitBox


dongjinleekr commented on a change in pull request #11431:
URL: https://github.com/apache/kafka/pull/11431#discussion_r743692427



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
##
@@ -183,12 +187,18 @@ public MirrorClientConfig clientConfig(String cluster) {
 
 // fill in reasonable defaults
 props.putIfAbsent(GROUP_ID_CONFIG, sourceAndTarget.source() + "-mm2");
-props.putIfAbsent(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, 
"mm2-offsets."
-+ sourceAndTarget.source() + ".internal");
-props.putIfAbsent(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, 
"mm2-status."
-+ sourceAndTarget.source() + ".internal");
-props.putIfAbsent(DistributedConfig.CONFIG_TOPIC_CONFIG, "mm2-configs."
-+ sourceAndTarget.source() + ".internal");
+
+String separator = 
originalsStrings().getOrDefault(REPLICATION_POLICY_SEPARATOR, 
REPLICATION_POLICY_SEPARATOR_DEFAULT);
+if (separator.equals("-")) {
+throw new ConfigException("You should not use a single dash as a " 
+ REPLICATION_POLICY_SEPARATOR);
+}
+
+props.putIfAbsent(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, 
"mm2-offsets" + separator

Review comment:
   I have also thought over this issue again.
   
   The core of the problem seems to be, there is some gap in our configuration 
policy, and it causes the Kafka Connect internal topics may be mirrored into 
the other cluster. At first, I thought it could only happen in standalone mode, 
but it is not true - my apologies for the confusion.
   
   There are two cases this problem may happen:
   
   1. Using a custom separator in standalone mode after KIP-690 (i.e., AK 
3.1.0), what I and @OmniaGM discussed until now.
   2. Running MM2 in Kafka Connect with the default `[config, offset, 
status].storage.topic` configuration. (what @mimaison just pointed out)
   
   2 may be prevented by `topics` configuration override, and it seems like the 
users are already doing so. (Yes, it has been like this since day 1.) So, let's 
focus on 1 only.
   
   To prevent those topics from being mirrored with a custom separator, there 
are three approaches:
   
   a. Fix `ReplicationPolicy#isInternalTopic` only.
   b. Leave the users to override `topics` configuration like Kafka Connect 
mode.
   c. Make `mm2-[offsets, status, configs].{source}.internal` topics to use 
given separator.
   
   Approach a seems unworkable; to determine whether a given `mm2-[offsets, 
status, configs].{source}.internal` topic is an internal one, the 
`ReplicationPolicy#isInternalTopic` method would need to know the source cluster's 
alias, since those topics include `{source}` in their name. But it actually 
takes only a topic name as a parameter.
   
   Approach b is good, but it may not be very clear to users with little 
experience.
   
   Ruling out approaches a and b, only c survives - and that is why I thought 
applying the separator to those topic names is the only way to make it behave 
identically to the default separator case (which does not mirror those topics 
by default).
   
   What do you think?
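   For illustration, here is a minimal sketch of what approach c amounts to (the helper name is made up; only the `mm2-*` prefixes and the `internal` suffix come from the existing defaults). With the default `.` separator it still produces `mm2-offsets.{source}.internal`, so existing deployments keep the same topic names:
   
   ```java
   // Sketch only, not the actual MirrorMakerConfig code: build the internal topic name
   // from the configured replication.policy.separator instead of a hard-coded dot.
   static String internalTopicName(String prefix, String sourceAlias, String separator) {
       return prefix + separator + sourceAlias + separator + "internal";
   }
   
   // internalTopicName("mm2-offsets", "source", ".")  -> "mm2-offsets.source.internal"
   // internalTopicName("mm2-offsets", "source", "__") -> "mm2-offsets__source__internal"
   ```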

##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
##
@@ -183,12 +187,18 @@ public MirrorClientConfig clientConfig(String cluster) {
 
 // fill in reasonable defaults
 props.putIfAbsent(GROUP_ID_CONFIG, sourceAndTarget.source() + "-mm2");
-props.putIfAbsent(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, 
"mm2-offsets."
-+ sourceAndTarget.source() + ".internal");
-props.putIfAbsent(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, 
"mm2-status."
-+ sourceAndTarget.source() + ".internal");
-props.putIfAbsent(DistributedConfig.CONFIG_TOPIC_CONFIG, "mm2-configs."
-+ sourceAndTarget.source() + ".internal");
+
+String separator = 
originalsStrings().getOrDefault(REPLICATION_POLICY_SEPARATOR, 
REPLICATION_POLICY_SEPARATOR_DEFAULT);
+if (separator.equals("-")) {
+throw new ConfigException("You should not use a single dash as a " 
+ REPLICATION_POLICY_SEPARATOR);
+}
+
+props.putIfAbsent(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, 
"mm2-offsets" + separator

Review comment:
   I have also thought over this issue again.
   
   The core of the problem seems to be that there is a gap in our configuration 
policy, which can cause the Kafka Connect internal topics to be mirrored into 
the other cluster. At first, I thought it could only happen in standalone mode, 
but that is not true - my apologies for the confusion.
   
   There are two cases this problem may happen:
   
   1. Using a custom separator in standalone mode after KIP-690 (i.e., AK 
3.1.0), which @OmniaGM and I have discussed so far.
   2. Running MM2 in Kafka Connect with the default `[config, offset, 

[GitHub] [kafka] soceanainn commented on pull request #11470: MINOR: Update docs for producer callbacks to reflect current behaviour

2021-11-05 Thread GitBox


soceanainn commented on pull request #11470:
URL: https://github.com/apache/kafka/pull/11470#issuecomment-961908428






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding commented on pull request #11345: Allow empty last segment to have missing offset index during recovery

2021-11-05 Thread GitBox


ccding commented on pull request #11345:
URL: https://github.com/apache/kafka/pull/11345#issuecomment-961961648


   @junrao I think this should solve the issue. Please take a look.
   cc @kowshik 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bdesert commented on a change in pull request #11401: KAFKA-13255: use exclude filter for new topics

2021-11-05 Thread GitBox


bdesert commented on a change in pull request #11401:
URL: https://github.com/apache/kafka/pull/11401#discussion_r743062367



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
##
@@ -152,6 +155,48 @@ public void testConfigPropertyFiltering() {
 .anyMatch(x -> x.name().equals("min.insync.replicas")), "should 
not replicate excluded properties");
 }
 
+@Test
+public void testNewTopicConfigs() throws Exception {
+Map filterConfig = new HashMap<>();
+
filterConfig.put(DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_CONFIG, 
"follower\\.replication\\.throttled\\.replicas, "
++ "leader\\.replication\\.throttled\\.replicas, "
++ "message\\.timestamp\\.difference\\.max\\.ms, "
++ "message\\.timestamp\\.type, "
++ "unclean\\.leader\\.election\\.enable, "
++ "min\\.insync\\.replicas,"
++ "exclude_param.*");
+DefaultConfigPropertyFilter filter = new DefaultConfigPropertyFilter();
+filter.configure(filterConfig);
+
+MirrorSourceConnector connector = spy(new MirrorSourceConnector(new 
SourceAndTarget("source", "target"),
+new DefaultReplicationPolicy(), x -> true, filter));
+
+final String topic = "testtopic";
+List entries = new ArrayList<>();
+entries.add(new ConfigEntry("name-1", "value-1"));
+entries.add(new ConfigEntry("exclude_param.param1", "value-param1"));
+entries.add(new ConfigEntry("min.insync.replicas", "2"));
+Config config = new Config(entries);
+doReturn(Collections.singletonMap(topic, 
config)).when(connector).describeTopicConfigs(any());
+doAnswer(invocation -> {
+Map newTopics = invocation.getArgument(0);
+assertNotNull(newTopics.get("source." + topic));
+Map targetConfig = newTopics.get("source." + 
topic).configs();
+
+// property 'name-1' isn't defined in the exclude filter -> should 
be replicated
+assertNotNull(targetConfig.get("name-1"), "should replicate 
properties");
+
+// this property is in default list, just double check it:
+String prop1 = "min.insync.replicas";
+assertNull(targetConfig.get(prop1), "should not replicate excluded 
properties " + prop1);
+// this property is only in exclude filter custom parameter, also 
tests regex on the way:
+String prop2 = "exclude_param.param1";
+assertNull(targetConfig.get(prop2), "should not replicate excluded 
properties " + prop2);
+return null;
+}).when(connector).createNewTopics(any());
+connector.createNewTopics(Collections.singleton(topic), 
Collections.singletonMap(topic, 1L));
+}

Review comment:
   Agree, with a small correction: `createNewTopics(any(), any())`, since 
`targetConfig()` is being called from the overloaded method with two params.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

2021-11-05 Thread GitBox


RivenSun2 commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r743682843



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -692,10 +692,22 @@ private void validateCooperativeAssignment(final 
Map future = maybeAutoCommitOffsetsAsync();
+if (future == null)
+onJoinPrepareAsyncCommitSucceeded = true;
+else {
+if (future.succeeded()) {
+onJoinPrepareAsyncCommitSucceeded = true;
+} else if (future.failed() && !future.isRetriable()) {
+// consistent with async auto-commit failures, we do not 
propagate the exception
+log.warn("Asynchronous auto-commit offsets failed: {}", 
future.exception().getMessage());
+onJoinPrepareAsyncCommitSucceeded = true;
+}

Review comment:
   sure

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -692,10 +693,18 @@ private void validateCooperativeAssignment(final 
Map

[GitHub] [kafka] RivenSun2 commented on pull request #11461: KAFKA-13422: Add verification of duplicate configuration for each type of LoginModule in JaasConfigFile

2021-11-05 Thread GitBox


RivenSun2 commented on pull request #11461:
URL: https://github.com/apache/kafka/pull/11461#issuecomment-961575399


   @guozhangwang @ijuma 
   please help check this PR when you are available. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #11429: KAFKA-13396: allow create topic without partition/replicaFactor

2021-11-05 Thread GitBox


dajac commented on pull request #11429:
URL: https://github.com/apache/kafka/pull/11429#issuecomment-961343546


   @ijuma Should we cherry-pick this one to the 3.1 branch as well? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 commented on pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

2021-11-05 Thread GitBox


RivenSun2 commented on pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#issuecomment-961940047


   @showuon  Thanks again.
   Please review it when you are available.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13434) Add a public API for AbstractCoordinator

2021-11-05 Thread Hector Geraldino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hector Geraldino updated KAFKA-13434:
-
Summary: Add a public API for AbstractCoordinator  (was: Add a public API 
for AbstractCoordinatos)

> Add a public API for AbstractCoordinator
> 
>
> Key: KAFKA-13434
> URL: https://issues.apache.org/jira/browse/KAFKA-13434
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Hector Geraldino
>Assignee: Hector Geraldino
>Priority: Major
>
> KIP-784: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-784%3A+Add+public+APIs+for+AbstractCoordinator]
> The AbstractCoordinator should have a companion public interface that is part 
> of Kafka's public API, so backwards compatibility can be maintained in future 
> versions of the client libraries



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dongjinleekr commented on pull request #11430: KAFKA-13352: Kafka Client does not support passwords starting with number in jaas config

2021-11-05 Thread GitBox


dongjinleekr commented on pull request #11430:
URL: https://github.com/apache/kafka/pull/11430#issuecomment-961934705


   @rajinisivaram Here is the fix; I allowed the asterisks following the base 
implementation and commented on how to mix numbers & the other symbols in the 
string. (Oh, updating the Jira would also be needed, wouldn't it?)
   
   I also tried to find a way to assert that it works identically to the base 
implementation, but failed. Instead, I just added additional tests.
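   For context, a minimal illustration of the kind of value involved (the module, user, and password below are made up, not from the PR): a SASL JAAS configuration whose password starts with a digit and mixes in other symbols such as an asterisk.
   
   ```java
   // Illustrative only: a password that begins with a number and contains an asterisk,
   // the kind of input KAFKA-13352 is about.
   Properties props = new Properties();
   props.put(SaslConfigs.SASL_JAAS_CONFIG,
       "org.apache.kafka.common.security.plain.PlainLoginModule required "
           + "username=\"alice\" password=\"0secret*pass\";");
   ```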


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11460: KAFKA-13425: Optimization of KafkaConsumer#pause semantics

2021-11-05 Thread GitBox


showuon commented on pull request #11460:
URL: https://github.com/apache/kafka/pull/11460#issuecomment-961821207


   No, only committers can merge the PR.  @hachikuji, could you help check this 
PR? Thanks. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #11467: MINOR: fix java doc in kafkaProducer

2021-11-05 Thread GitBox


dajac commented on pull request #11467:
URL: https://github.com/apache/kafka/pull/11467#issuecomment-961343975


   @showuon Thanks. I will take a look soon.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 commented on pull request #11460: KAFKA-13425: Optimization of KafkaConsumer#pause semantics

2021-11-05 Thread GitBox


RivenSun2 commented on pull request #11460:
URL: https://github.com/apache/kafka/pull/11460#issuecomment-961805853


   @showuon , can you help to merge this PR?
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

2021-11-05 Thread GitBox


showuon commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r743635124



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -692,10 +692,22 @@ private void validateCooperativeAssignment(final 
Map future = maybeAutoCommitOffsetsAsync();
+if (future == null)
+onJoinPrepareAsyncCommitSucceeded = true;
+else {
+if (future.succeeded()) {
+onJoinPrepareAsyncCommitSucceeded = true;
+} else if (future.failed() && !future.isRetriable()) {
+// consistent with async auto-commit failures, we do not 
propagate the exception
+log.warn("Asynchronous auto-commit offsets failed: {}", 
future.exception().getMessage());
+onJoinPrepareAsyncCommitSucceeded = true;
+}

Review comment:
   I think we should also log the error in the `failed && isRetriable()` case
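   Something along these lines (a sketch using the names from the diff above; the log level is just a suggestion):
   
   ```java
   if (future.succeeded()) {
       onJoinPrepareAsyncCommitSucceeded = true;
   } else if (future.failed() && !future.isRetriable()) {
       // consistent with async auto-commit failures, we do not propagate the exception
       log.warn("Asynchronous auto-commit offsets failed: {}", future.exception().getMessage());
       onJoinPrepareAsyncCommitSucceeded = true;
   } else if (future.failed() && future.isRetriable()) {
       // also surface retriable failures instead of dropping them silently; we will retry
       log.debug("Asynchronous auto-commit offsets failed with a retriable error: {}, will retry",
           future.exception().getMessage());
   }
   ```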

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -692,10 +693,18 @@ private void validateCooperativeAssignment(final 
Map

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r742933244



##
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() {
 List partitions = Arrays.asList(0, 1);
 partitions.forEach(partition -> {
 String testType = partition == 0 ? "updating a partition" : 
"adding a new partition";
-Map topicIds = Collections.singletonMap("foo", 
Uuid.randomUuid());
+Uuid fooId = Uuid.randomUuid();
 FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 
1);
 FetchSessionHandler.Builder builder = handler.newBuilder();
-builder.add(new TopicPartition("foo", 0),  topicIds.get("foo"),
-new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
+builder.add(new TopicPartition("foo", 0),
+new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));
 FetchSessionHandler.FetchRequestData data = builder.build();
-assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
 data.toSend(), data.sessionPartitions());
 assertTrue(data.metadata().isFull());
 assertTrue(data.canUseTopicIds());
 
 FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
+respMap(new RespEntry("foo", 0, fooId, 10, 20)));
 handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
 // Try to remove a topic ID from an existing topic partition (0) 
or add a new topic partition (1) without an ID.
 FetchSessionHandler.Builder builder2 = handler.newBuilder();
-builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
-new FetchRequest.PartitionData(10, 110, 210, 
Optional.empty()));
+builder2.add(new TopicPartition("foo", partition),
+new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 
210, Optional.empty()));
 FetchSessionHandler.FetchRequestData data2 = builder2.build();
-// Should have the same session ID and next epoch, but can no 
longer use topic IDs.
-// The receiving broker will close the session if we were 
previously using topic IDs.
+// Should have the same session ID, and next epoch and can no 
longer use topic IDs.
+// The receiving broker will handle closing the session.
 assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session when " + testType);
 assertEquals(1, data2.metadata().epoch(), "Did not have correct 
epoch when " + testType);
 assertFalse(data2.canUseTopicIds());
 });
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testTopicIdReplaced(boolean fetchRequestUsesIds) {
+TopicPartition tp = new TopicPartition("foo", 0);
+FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+FetchSessionHandler.Builder builder = handler.newBuilder();
+Uuid topicId1 = Uuid.randomUuid();
+builder.add(tp,
+new FetchRequest.PartitionData(topicId1, 0, 100, 200, 
Optional.empty()));
+FetchSessionHandler.FetchRequestData data = builder.build();
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)),
+data.toSend(), data.sessionPartitions());
+assertTrue(data.metadata().isFull());
+assertTrue(data.canUseTopicIds());
+
+FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
+respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20)));
+handler.handleResponse(resp, (short) 12);
+
+// Try to add a new topic ID.
+FetchSessionHandler.Builder builder2 = handler.newBuilder();
+Uuid topicId2 = Uuid.randomUuid();
+// Use the same data besides the topic ID.
+FetchRequest.PartitionData partitionData = new 
FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty());
+builder2.add(tp, partitionData);
+FetchSessionHandler.FetchRequestData data2 = builder2.build();
+// The old topic ID partition should be in toReplace, and the new one 
should be in toSend.
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)),
+data2.toSend(), data2.sessionPartitions());
+assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, 
tp)), data2.toReplace());
+// Should have the same session ID, and next epoch and can use topic 
IDs.
+assertEquals(123, data2.metadata().sessionId(), "Did not use same 

[GitHub] [kafka] showuon commented on a change in pull request #11467: MINOR: fix java doc in kafkaProducer

2021-11-05 Thread GitBox


showuon commented on a change in pull request #11467:
URL: https://github.com/apache/kafka/pull/11467#discussion_r742808430



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -100,8 +100,6 @@
  * {@code
  * Properties props = new Properties();
  * props.put("bootstrap.servers", "localhost:9092");
- * props.put("acks", "all");
- * props.put("retries", 0);

Review comment:
   In v3.0, `acks` already defaults to `all`. And `enable.idempotence` defaults 
to true in v3.0, which means `retries` cannot be set to 0 now.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -117,14 +115,15 @@
  * as well as a background I/O thread that is responsible for turning these 
records into requests and transmitting them
  * to the cluster. Failure to close the producer after use will leak these 
resources.
  * 
- * The {@link #send(ProducerRecord) send()} method is asynchronous. When 
called it adds the record to a buffer of pending record sends
+ * The {@link #send(ProducerRecord) send()} method is asynchronous. When 
called, it adds the record to a buffer of pending sending record
  * and immediately returns. This allows the producer to batch together 
individual records for efficiency.
  * 
  * The acks config controls the criteria under which requests are 
considered complete. The "all" setting
  * we have specified will result in blocking on the full commit of the record, 
the slowest but most durable setting.
  * 
- * If the request fails, the producer can automatically retry, though since we 
have specified retries
- * as 0 it won't. Enabling retries also opens up the possibility of duplicates 
(see the documentation on
+ * If the request fails, the producer can automatically retry. We have 
specified retries default as Integer.MAX_VALUE.
+ * It's recommended to use delivery.timeout.ms to control retry 
behavior, instead of retries.
+ * Enabling retries also opens up the possibility of duplicates (see the 
documentation on

Review comment:
   After 
[KIP-91](https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer),
 we set `retries` to the max value by default, and it's recommended to use 
`delivery.timeout.ms` to control retry behavior. Update it accordingly.
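   For reference, a minimal sketch of the producer setup the updated javadoc implies (the bootstrap address, topic, and serializers below are illustrative and only included to keep the snippet self-contained):
   
   ```java
   Properties props = new Properties();
   props.put("bootstrap.servers", "localhost:9092");
   // acks defaults to "all" and enable.idempotence defaults to true in 3.0, so neither is set here.
   // retries stays at its Integer.MAX_VALUE default; delivery.timeout.ms bounds the retrying instead.
   props.put("delivery.timeout.ms", "120000");
   props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
   props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
   
   try (Producer<String, String> producer = new KafkaProducer<>(props)) {
       producer.send(new ProducerRecord<>("my-topic", "key", "value"));
   }
   ```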




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11413: KAFKA-13370: add errors when commit offsets failed and add tests

2021-11-05 Thread GitBox


showuon commented on pull request #11413:
URL: https://github.com/apache/kafka/pull/11413#issuecomment-961861164


   @UnityLung @chia7712 @rhauch, please help review this PR. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#issuecomment-961259072


   @jolshan It seems that there are a few compilation errors, at least for `JDK 
8 and Scala 2.12`. Could you check?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #11429: KAFKA-13396: allow create topic without partition/replicaFactor

2021-11-05 Thread GitBox


ijuma commented on pull request #11429:
URL: https://github.com/apache/kafka/pull/11429#issuecomment-961367197


   @dajac Makes sense, pushed to trunk, 3.1 and 3.0.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13435) Group won't consume partitions added after static member restart

2021-11-05 Thread Ryan Leslie (Jira)
Ryan Leslie created KAFKA-13435:
---

 Summary: Group won't consume partitions added after static member 
restart
 Key: KAFKA-13435
 URL: https://issues.apache.org/jira/browse/KAFKA-13435
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 2.7.0
Reporter: Ryan Leslie


When using consumer groups with static membership, if the consumer marked as 
leader has restarted, then metadata changes such as a partition increase do not 
trigger the expected rebalances.

To reproduce this issue, simply:
 # Create a static consumer subscribed to a single topic
 # Close the consumer and create a new one with the same group instance id
 # Increase partitions for the topic
 # Observe that no rebalance occurs and the new partitions are not assigned

I have only tested this in 2.7, but it may apply to newer versions as well.
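
A rough sketch of the reproduction (topic name, instance id, and deserializers are illustrative):
{code:java}
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "ryan_test");
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "instance-1"); // static membership
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

// 1. static consumer subscribed to a single topic
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singleton("test-topic"));
consumer.poll(Duration.ofSeconds(1));

// 2. close it and create a new one with the same group.instance.id
consumer.close();
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singleton("test-topic"));

// 3. increase partitions for "test-topic" (e.g. with Admin#createPartitions or kafka-topics.sh)
// 4. keep polling: no rebalance is triggered and the new partitions are never assigned
while (true) {
    consumer.poll(Duration.ofSeconds(1));
}
{code}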
h3. Analysis

In _ConsumerCoordinator_, one responsibility of the leader consumer is to track 
metadata and trigger a rebalance if there are changes such as new partitions 
added:

[https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L793]
{code:java}
if (assignmentSnapshot != null && 
!assignmentSnapshot.matches(metadataSnapshot)) {
...
requestRejoinIfNecessary(reason);
return true;
}
{code}
Note that _assignmentSnapshot_ is currently only set if the consumer is the 
leader:

[https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L353]
{code:java}
// Only the leader is responsible for monitoring for metadata changes (i.e. 
partition changes)
if (!isLeader)
assignmentSnapshot = null;
{code}
And _isLeader_ is only true after an assignment is performed during a rebalance:

[https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L634]

That is, when a consumer group forms, exactly one consumer in the group should 
have _isLeader == True_ and be responsible for triggering rebalances on 
metadata changes.

However, in the case of static membership, if the leader has been restarted and 
rejoined the group, the group essentially no longer has a current leader. Even 
though the metadata changes are fetched, no rebalance will be triggered. That 
is, _isLeader_ will be false for all members.

This issue does not resolve until after an actual group change that causes a 
proper rebalance. In order to safely make a partition increase when using 
static membership, consumers must be stopped and have timed out, or forcibly 
removed with _AdminClient.removeMembersFromConsumerGroup()_.
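
For reference, a sketch of the forced-removal workaround (the admin properties, group id, and instance id are illustrative):
{code:java}
try (Admin admin = Admin.create(adminProps)) {
    admin.removeMembersFromConsumerGroup(
        "ryan_test",
        new RemoveMembersFromConsumerGroupOptions(
            Collections.singleton(new MemberToRemove("instance-1"))))
        .all()
        .get();
}
{code}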

Correcting this in the client probably also requires help from the broker. 
Currently, when a static consumer that is the leader is restarted, the 
coordinator does recognize the change:

e.g. leader _bbfcb930-61a3-4d21-945c-85f4576490ff_ was restarted
{noformat}
[2021-11-04 13:53:13,487] INFO [GroupCoordinator 4]: Static member 
Some(1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0) of group ryan_test with 
unknown member id rejoins, assigning new member id 
353WV-1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-
6ebf-47da-95ef-c54fef17ab74, while old member id 
1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff
 will be removed. (
kafka.coordinator.group.GroupCoordinator){noformat}
However, it does not attempt to update the leader id since this isn't a new 
rebalance, and JOIN_GROUP will continue returning the now stale member id as 
leader:
{noformat}
2021-11-04 13:53:13,490 DEBUG o.a.k.c.c.i.AbstractCoordinator [Consumer 
instanceId=353WV-1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0, 
clientId=1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0, groupId=ryan_test] 
Received successful JoinGroup response: JoinGroupResponseData(throttleTimeMs=0, 
errorCode=0, generationId=40, protocolType='consumer', protocolName='range', 
leader='1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff',
 
memberId='1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-6ebf-47da-95ef-c54fef17ab74',
 members=[]){noformat}
This means that it's not easy for any particular restarted member to identify 
that it should consider itself leader and handle metadata changes.

There is reference to the difficulty of leader restarts in KAFKA-7728 but the 
focus seemed mainly on avoiding needless rebalances for static members. That 
goal was accomplished, but this issue seems to be a side effect of both not 
rebalancing AND not having the rejoined member re-claim its leadership status.

Also, I have not verified if it's strictly related or valid, but noticed this 
ticket has been opened too: 

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743909554



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -93,27 +93,42 @@ class CachedPartition(val topic: String,
   def this(topic: String, partition: Int, topicId: Uuid) =
 this(topic, topicId, partition, -1, -1, -1, Optional.empty(), -1, -1, 
Optional.empty[Integer])
 
-  def this(part: TopicPartition, topicId: Uuid) =
-this(part.topic, part.partition, topicId)
+  def this(part: TopicIdPartition) = {
+this(part.topic, part.partition, part.topicId)
+  }
 
-  def this(part: TopicPartition, id: Uuid, reqData: 
FetchRequest.PartitionData) =
-this(part.topic, id, part.partition, reqData.maxBytes, 
reqData.fetchOffset, -1,
+  def this(part: TopicIdPartition, reqData: FetchRequest.PartitionData) =
+this(part.topic, part.topicId, part.partition, reqData.maxBytes, 
reqData.fetchOffset, -1,
   reqData.currentLeaderEpoch, reqData.logStartOffset, -1, 
reqData.lastFetchedEpoch)
 
-  def this(part: TopicPartition, id: Uuid, reqData: FetchRequest.PartitionData,
+  def this(part: TopicIdPartition, reqData: FetchRequest.PartitionData,
respData: FetchResponseData.PartitionData) =
-this(part.topic, id, part.partition, reqData.maxBytes, 
reqData.fetchOffset, respData.highWatermark,
+this(part.topic, part.topicId, part.partition, reqData.maxBytes, 
reqData.fetchOffset, respData.highWatermark,
   reqData.currentLeaderEpoch, reqData.logStartOffset, 
respData.logStartOffset, reqData.lastFetchedEpoch)
 
-  def reqData = new FetchRequest.PartitionData(fetchOffset, 
fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch)
+  def reqData = new FetchRequest.PartitionData(topicId, fetchOffset, 
fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch)
 
-  def updateRequestParams(reqData: FetchRequest.PartitionData): Unit = {
+  def maybeUpdateRequestParamsOrName(reqData: FetchRequest.PartitionData, 
name: String): Unit = {
 // Update our cached request parameters.
 maxBytes = reqData.maxBytes
 fetchOffset = reqData.fetchOffset
 fetcherLogStartOffset = reqData.logStartOffset
 leaderEpoch = reqData.currentLeaderEpoch
 lastFetchedEpoch = reqData.lastFetchedEpoch
+// Update name if needed
+maybeSetUnknownName(name)
+  }
+
+  def maybeSetUnknownName(name: String): Unit = {
+if (this.topic == null) {
+  this.topic = name
+}
+  }
+
+  def maybeResolveUnknownName(topicNames: FetchSession.TOPIC_NAME_MAP): Unit = 
{

Review comment:
   Yeah, I agree with you. Perhaps we could just remove the 
maybeSetTopicName method and move its logic into the update request params method.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743908454



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -262,11 +262,12 @@ public synchronized int sendFetches() {
 maxVersion = ApiKeys.FETCH.latestVersion();
 }
 final FetchRequest.Builder request = FetchRequest.Builder
-.forConsumer(maxVersion, this.maxWaitMs, this.minBytes, 
data.toSend(), data.topicIds())
+.forConsumer(maxVersion, this.maxWaitMs, this.minBytes, 
data.toSend())
 .isolationLevel(isolationLevel)
 .setMaxBytes(this.maxBytes)
 .metadata(data.metadata())
-.toForget(data.toForget())
+.removed(data.toForget())
+.replaced(data.toReplace())

Review comment:
   Right. You might have to assert on the request in the fetcher as well. 
As you said, we can't really get the data out from the builder otherwise.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13434) Add a public API for AbstractCoordinatos

2021-11-05 Thread Hector G (Jira)
Hector G created KAFKA-13434:


 Summary: Add a public API for AbstractCoordinatos
 Key: KAFKA-13434
 URL: https://issues.apache.org/jira/browse/KAFKA-13434
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: Hector G
Assignee: Hector G


KIP-784: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-784%3A+Add+public+APIs+for+AbstractCoordinator]

The AbstractCoordinator should have a companion public interface that is part 
of Kafka's public API, so backwards compatibility can be maintained in future 
versions of the client libraries



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743841128



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -93,27 +93,42 @@ class CachedPartition(val topic: String,
   def this(topic: String, partition: Int, topicId: Uuid) =
 this(topic, topicId, partition, -1, -1, -1, Optional.empty(), -1, -1, 
Optional.empty[Integer])
 
-  def this(part: TopicPartition, topicId: Uuid) =
-this(part.topic, part.partition, topicId)
+  def this(part: TopicIdPartition) = {
+this(part.topic, part.partition, part.topicId)
+  }
 
-  def this(part: TopicPartition, id: Uuid, reqData: 
FetchRequest.PartitionData) =
-this(part.topic, id, part.partition, reqData.maxBytes, 
reqData.fetchOffset, -1,
+  def this(part: TopicIdPartition, reqData: FetchRequest.PartitionData) =
+this(part.topic, part.topicId, part.partition, reqData.maxBytes, 
reqData.fetchOffset, -1,
   reqData.currentLeaderEpoch, reqData.logStartOffset, -1, 
reqData.lastFetchedEpoch)
 
-  def this(part: TopicPartition, id: Uuid, reqData: FetchRequest.PartitionData,
+  def this(part: TopicIdPartition, reqData: FetchRequest.PartitionData,
respData: FetchResponseData.PartitionData) =
-this(part.topic, id, part.partition, reqData.maxBytes, 
reqData.fetchOffset, respData.highWatermark,
+this(part.topic, part.topicId, part.partition, reqData.maxBytes, 
reqData.fetchOffset, respData.highWatermark,
   reqData.currentLeaderEpoch, reqData.logStartOffset, 
respData.logStartOffset, reqData.lastFetchedEpoch)
 
-  def reqData = new FetchRequest.PartitionData(fetchOffset, 
fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch)
+  def reqData = new FetchRequest.PartitionData(topicId, fetchOffset, 
fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch)
 
-  def updateRequestParams(reqData: FetchRequest.PartitionData): Unit = {
+  def maybeUpdateRequestParamsOrName(reqData: FetchRequest.PartitionData, 
name: String): Unit = {
 // Update our cached request parameters.
 maxBytes = reqData.maxBytes
 fetchOffset = reqData.fetchOffset
 fetcherLogStartOffset = reqData.logStartOffset
 leaderEpoch = reqData.currentLeaderEpoch
 lastFetchedEpoch = reqData.lastFetchedEpoch
+// Update name if needed
+maybeSetUnknownName(name)
+  }
+
+  def maybeSetUnknownName(name: String): Unit = {
+if (this.topic == null) {
+  this.topic = name
+}
+  }
+
+  def maybeResolveUnknownName(topicNames: FetchSession.TOPIC_NAME_MAP): Unit = 
{

Review comment:
   I thought about using the same name, but it felt like a slightly different 
approach: looking the name up in a map where it may or may not be present vs. 
supplying the name directly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743839587



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -262,11 +262,12 @@ public synchronized int sendFetches() {
 maxVersion = ApiKeys.FETCH.latestVersion();
 }
 final FetchRequest.Builder request = FetchRequest.Builder
-.forConsumer(maxVersion, this.maxWaitMs, this.minBytes, 
data.toSend(), data.topicIds())
+.forConsumer(maxVersion, this.maxWaitMs, this.minBytes, 
data.toSend())
 .isolationLevel(isolationLevel)
 .setMaxBytes(this.maxBytes)
 .metadata(data.metadata())
-.toForget(data.toForget())
+.removed(data.toForget())
+.replaced(data.toReplace())

Review comment:
   The part I don't understand is that this building happens in a method that 
sends the requests. I'm not sure how to pull that out and test specifically 
that the fetcher is getting the correct info. The fetcher simply pulls 
from the FetchSessionHandler's built FetchRequestData, so I feel like that is 
sufficient unless I'm missing something.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743839587



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -262,11 +262,12 @@ public synchronized int sendFetches() {
 maxVersion = ApiKeys.FETCH.latestVersion();
 }
 final FetchRequest.Builder request = FetchRequest.Builder
-.forConsumer(maxVersion, this.maxWaitMs, this.minBytes, 
data.toSend(), data.topicIds())
+.forConsumer(maxVersion, this.maxWaitMs, this.minBytes, 
data.toSend())
 .isolationLevel(isolationLevel)
 .setMaxBytes(this.maxBytes)
 .metadata(data.metadata())
-.toForget(data.toForget())
+.removed(data.toForget())
+.replaced(data.toReplace())

Review comment:
   The part I don't understand is that this building happens in a method that 
sends the requests. I'm not sure how to pull that out and test specifically 
that the fetcher is getting the correct info. The fetcher simply pulls 
from the FetchSessionHandler's built FetchRequestData.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743836225



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -262,11 +262,12 @@ public synchronized int sendFetches() {
 maxVersion = ApiKeys.FETCH.latestVersion();
 }
 final FetchRequest.Builder request = FetchRequest.Builder
-.forConsumer(maxVersion, this.maxWaitMs, this.minBytes, 
data.toSend(), data.topicIds())
+.forConsumer(maxVersion, this.maxWaitMs, this.minBytes, 
data.toSend())
 .isolationLevel(isolationLevel)
 .setMaxBytes(this.maxBytes)
 .metadata(data.metadata())
-.toForget(data.toForget())
+.removed(data.toForget())
+.replaced(data.toReplace())

Review comment:
   We should have a test in the Fetcher which ensures that the builder 
receives the correct information. Then we could have one for the request which 
ensures that the builder does its job correctly as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743830862



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##
@@ -66,16 +69,28 @@ public PartitionData(
 int maxBytes,
 Optional currentLeaderEpoch
 ) {
-this(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, 
Optional.empty());
+this(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, 
currentLeaderEpoch, Optional.empty());

Review comment:
   I can do that but it will take some time. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743830587



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -262,11 +262,12 @@ public synchronized int sendFetches() {
 maxVersion = ApiKeys.FETCH.latestVersion();
 }
 final FetchRequest.Builder request = FetchRequest.Builder
-.forConsumer(maxVersion, this.maxWaitMs, this.minBytes, 
data.toSend(), data.topicIds())
+.forConsumer(maxVersion, this.maxWaitMs, this.minBytes, 
data.toSend())
 .isolationLevel(isolationLevel)
 .setMaxBytes(this.maxBytes)
 .metadata(data.metadata())
-.toForget(data.toForget())
+.removed(data.toForget())
+.replaced(data.toReplace())

Review comment:
   So you are asking for a test that checks the fetcher builds the 
request correctly? Is this a test for the fetcher or the builder? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12886) Enable request forwarding by default in 3.1

2021-11-05 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-12886:

Fix Version/s: 3.2.0  (was: 3.1.0)

> Enable request forwarding by default in 3.1
> ---
>
> Key: KAFKA-12886
> URL: https://issues.apache.org/jira/browse/KAFKA-12886
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Ryan Dielhenn
>Priority: Major
> Fix For: 3.2.0
>
>
> KIP-590 documents that request forwarding will be enabled in 3.0 by default: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller.
>  This makes it a requirement for users with custom principal implementations 
> to provide a `KafkaPrincipalSerde` implementation. We waited until 3.0 
> because we saw this as a compatibility break. 
> The KIP documents that use of forwarding will be controlled by the IBP. So 
> once the IBP has been configured to 3.0 or above, then the brokers will begin 
> forwarding.
> (Note that forwarding has always been a requirement for kraft.)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12886) Enable request forwarding by default in 3.1

2021-11-05 Thread David Jacot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17439340#comment-17439340
 ] 

David Jacot commented on KAFKA-12886:
-

Moved it to 3.2.0.

> Enable request forwarding by default in 3.1
> ---
>
> Key: KAFKA-12886
> URL: https://issues.apache.org/jira/browse/KAFKA-12886
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Ryan Dielhenn
>Priority: Major
> Fix For: 3.2.0
>
>
> KIP-590 documents that request forwarding will be enabled in 3.0 by default: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller.
>  This makes it a requirement for users with custom principal implementations 
> to provide a `KafkaPrincipalSerde` implementation. We waited until 3.0 
> because we saw this as a compatibility break. 
> The KIP documents that use of forwarding will be controlled by the IBP. So 
> once the IBP has been configured to 3.0 or above, then the brokers will begin 
> forwarding.
> (Note that forwarding has always been a requirement for kraft.)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743764793



##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -285,52 +268,57 @@ public FetchRequestData build() {
 if (nextMetadata.isFull()) {
 if (log.isDebugEnabled()) {
 log.debug("Built full fetch {} for node {} with {}.",
-  nextMetadata, node, 
partitionsToLogString(next.keySet()));
+nextMetadata, node, 
topicPartitionsToLogString(next.keySet()));
 }
 sessionPartitions = next;
 next = null;
 // Only add topic IDs to the session if we are using topic IDs.
 if (canUseTopicIds) {
-sessionTopicIds = topicIds;
-sessionTopicNames = new HashMap<>(topicIds.size());
-topicIds.forEach((name, id) -> sessionTopicNames.put(id, 
name));
+Map> newTopicNames = 
sessionPartitions.entrySet().stream().collect(Collectors.groupingByConcurrent(entry
 -> entry.getValue().topicId,
+Collectors.mapping(entry -> 
entry.getKey().topic(), Collectors.toSet(;
+
+sessionTopicNames = new HashMap<>(newTopicNames.size());
+// There should only be one topic name per topic ID.
+newTopicNames.forEach((topicId, topicNamesSet) -> 
topicNamesSet.forEach(topicName -> sessionTopicNames.put(topicId, topicName)));
 } else {
-sessionTopicIds = new HashMap<>();
-sessionTopicNames = new HashMap<>();
+sessionTopicNames = Collections.emptyMap();
 }
-topicIds = null;
 Map toSend =
-Collections.unmodifiableMap(new 
LinkedHashMap<>(sessionPartitions));
-Map toSendTopicIds =
-Collections.unmodifiableMap(new 
HashMap<>(sessionTopicIds));
-Map toSendTopicNames =
-Collections.unmodifiableMap(new 
HashMap<>(sessionTopicNames));
-return new FetchRequestData(toSend, Collections.emptyList(), 
toSend, toSendTopicIds, toSendTopicNames, nextMetadata, canUseTopicIds);
+Collections.unmodifiableMap(new 
LinkedHashMap<>(sessionPartitions));
+return new FetchRequestData(toSend, Collections.emptyList(), 
Collections.emptyList(), toSend, nextMetadata, canUseTopicIds);
 }
 
-List added = new ArrayList<>();
-List removed = new ArrayList<>();
-List altered = new ArrayList<>();
+List added = new ArrayList<>();
+List removed = new ArrayList<>();
+List altered = new ArrayList<>();
+List replaced = new ArrayList<>();
 for (Iterator> iter =
- sessionPartitions.entrySet().iterator(); iter.hasNext(); 
) {
+ sessionPartitions.entrySet().iterator(); iter.hasNext(); ) {
 Entry entry = iter.next();
 TopicPartition topicPartition = entry.getKey();
 PartitionData prevData = entry.getValue();
 PartitionData nextData = next.remove(topicPartition);
 if (nextData != null) {
-if (!prevData.equals(nextData)) {
+// We basically check if the new partition had the same 
topic ID. If not,
+// we add it to the "replaced" set.
+if (!prevData.topicId.equals(nextData.topicId) && 
!prevData.topicId.equals(Uuid.ZERO_UUID)) {
+// Re-add the replaced partition to the end of 'next'
+next.put(topicPartition, nextData);
+entry.setValue(nextData);
+replaced.add(new TopicIdPartition(prevData.topicId, 
topicPartition));
+} else if (!prevData.equals(nextData)) {
 // Re-add the altered partition to the end of 'next'
 next.put(topicPartition, nextData);
 entry.setValue(nextData);
-altered.add(topicPartition);
+altered.add(new TopicIdPartition(nextData.topicId, 
topicPartition));
 }
 } else {
 // Remove this partition from the session.
 iter.remove();
 // Indicate that we no longer want to listen to this 
partition.
-removed.add(topicPartition);
+removed.add(new TopicIdPartition(prevData.topicId, 
topicPartition));
 // If we do not have this topic ID in the builder or the 
session, we can not use topic IDs.
-if 

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743763229



##
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##
@@ -305,9 +304,10 @@ class ReplicaFetcherThread(name: String,
 } else {
   val version: Short = if (fetchRequestVersion >= 13 && 
!fetchData.canUseTopicIds) 12 else fetchRequestVersion
   val requestBuilder = FetchRequest.Builder
-.forReplica(version, replicaId, maxWait, minBytes, fetchData.toSend, 
fetchData.topicIds)
+.forReplica(version, replicaId, maxWait, minBytes, fetchData.toSend)
 .setMaxBytes(maxBytes)
-.toForget(fetchData.toForget)
+.removed(fetchData.toForget)
+.replaced(fetchData.toReplace)

Review comment:
   Do we have tests verifying this change?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743759128



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -93,27 +93,42 @@ class CachedPartition(val topic: String,
   def this(topic: String, partition: Int, topicId: Uuid) =
 this(topic, topicId, partition, -1, -1, -1, Optional.empty(), -1, -1, 
Optional.empty[Integer])
 
-  def this(part: TopicPartition, topicId: Uuid) =
-this(part.topic, part.partition, topicId)
+  def this(part: TopicIdPartition) = {
+this(part.topic, part.partition, part.topicId)
+  }
 
-  def this(part: TopicPartition, id: Uuid, reqData: 
FetchRequest.PartitionData) =
-this(part.topic, id, part.partition, reqData.maxBytes, 
reqData.fetchOffset, -1,
+  def this(part: TopicIdPartition, reqData: FetchRequest.PartitionData) =
+this(part.topic, part.topicId, part.partition, reqData.maxBytes, 
reqData.fetchOffset, -1,
   reqData.currentLeaderEpoch, reqData.logStartOffset, -1, 
reqData.lastFetchedEpoch)
 
-  def this(part: TopicPartition, id: Uuid, reqData: FetchRequest.PartitionData,
+  def this(part: TopicIdPartition, reqData: FetchRequest.PartitionData,
respData: FetchResponseData.PartitionData) =
-this(part.topic, id, part.partition, reqData.maxBytes, 
reqData.fetchOffset, respData.highWatermark,
+this(part.topic, part.topicId, part.partition, reqData.maxBytes, 
reqData.fetchOffset, respData.highWatermark,
   reqData.currentLeaderEpoch, reqData.logStartOffset, 
respData.logStartOffset, reqData.lastFetchedEpoch)
 
-  def reqData = new FetchRequest.PartitionData(fetchOffset, 
fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch)
+  def reqData = new FetchRequest.PartitionData(topicId, fetchOffset, 
fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch)
 
-  def updateRequestParams(reqData: FetchRequest.PartitionData): Unit = {
+  def maybeUpdateRequestParamsOrName(reqData: FetchRequest.PartitionData, 
name: String): Unit = {
 // Update our cached request parameters.
 maxBytes = reqData.maxBytes
 fetchOffset = reqData.fetchOffset
 fetcherLogStartOffset = reqData.logStartOffset
 leaderEpoch = reqData.currentLeaderEpoch
 lastFetchedEpoch = reqData.lastFetchedEpoch
+// Update name if needed
+maybeSetUnknownName(name)
+  }
+
+  def maybeSetUnknownName(name: String): Unit = {
+if (this.topic == null) {
+  this.topic = name
+}
+  }
+
+  def maybeResolveUnknownName(topicNames: FetchSession.TOPIC_NAME_MAP): Unit = 
{

Review comment:
   Should we use the same name for both `maybeSetUnknownName` and 
`maybeResolveUnknownName`? I guess they could be differentiated by their arguments.
   
   If we add unit tests for other methods of this class, should we cover all 
the methods that we have changed or added as well?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743759128



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -93,27 +93,42 @@ class CachedPartition(val topic: String,
   def this(topic: String, partition: Int, topicId: Uuid) =
 this(topic, topicId, partition, -1, -1, -1, Optional.empty(), -1, -1, 
Optional.empty[Integer])
 
-  def this(part: TopicPartition, topicId: Uuid) =
-this(part.topic, part.partition, topicId)
+  def this(part: TopicIdPartition) = {
+this(part.topic, part.partition, part.topicId)
+  }
 
-  def this(part: TopicPartition, id: Uuid, reqData: 
FetchRequest.PartitionData) =
-this(part.topic, id, part.partition, reqData.maxBytes, 
reqData.fetchOffset, -1,
+  def this(part: TopicIdPartition, reqData: FetchRequest.PartitionData) =
+this(part.topic, part.topicId, part.partition, reqData.maxBytes, 
reqData.fetchOffset, -1,
   reqData.currentLeaderEpoch, reqData.logStartOffset, -1, 
reqData.lastFetchedEpoch)
 
-  def this(part: TopicPartition, id: Uuid, reqData: FetchRequest.PartitionData,
+  def this(part: TopicIdPartition, reqData: FetchRequest.PartitionData,
respData: FetchResponseData.PartitionData) =
-this(part.topic, id, part.partition, reqData.maxBytes, 
reqData.fetchOffset, respData.highWatermark,
+this(part.topic, part.topicId, part.partition, reqData.maxBytes, 
reqData.fetchOffset, respData.highWatermark,
   reqData.currentLeaderEpoch, reqData.logStartOffset, 
respData.logStartOffset, reqData.lastFetchedEpoch)
 
-  def reqData = new FetchRequest.PartitionData(fetchOffset, 
fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch)
+  def reqData = new FetchRequest.PartitionData(topicId, fetchOffset, 
fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch)
 
-  def updateRequestParams(reqData: FetchRequest.PartitionData): Unit = {
+  def maybeUpdateRequestParamsOrName(reqData: FetchRequest.PartitionData, 
name: String): Unit = {
 // Update our cached request parameters.
 maxBytes = reqData.maxBytes
 fetchOffset = reqData.fetchOffset
 fetcherLogStartOffset = reqData.logStartOffset
 leaderEpoch = reqData.currentLeaderEpoch
 lastFetchedEpoch = reqData.lastFetchedEpoch
+// Update name if needed
+maybeSetUnknownName(name)
+  }
+
+  def maybeSetUnknownName(name: String): Unit = {
+if (this.topic == null) {
+  this.topic = name
+}
+  }
+
+  def maybeResolveUnknownName(topicNames: FetchSession.TOPIC_NAME_MAP): Unit = 
{

Review comment:
   Should we use the same name for both `maybeSetUnknownName` and `maybeResolveUnknownName`? I guess they could be distinguished by their arguments.
   
   If we add unit tests for other methods of this class, should we cover them 
as well?
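   
   A minimal sketch of the single-name overload this hints at, assuming the null-topic convention and the id-to-name map from the quoted diff (toy Java rendering; the real `CachedPartition` is Scala):
   ```java
   import java.util.Map;
   import org.apache.kafka.common.Uuid;

   // Toy stand-in for CachedPartition, only to illustrate the overload idea.
   class PartitionName {
       private String topic;        // null while only the topic ID is known
       private final Uuid topicId;

       PartitionName(String topic, Uuid topicId) {
           this.topic = topic;
           this.topicId = topicId;
       }

       // Same name for both operations, distinguished by the argument type.
       void maybeResolveUnknownName(String name) {
           if (topic == null) topic = name;
       }

       void maybeResolveUnknownName(Map<Uuid, String> topicNames) {
           if (topic == null) topic = topicNames.get(topicId);
       }
   }
   ```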




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743759128



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -93,27 +93,42 @@ class CachedPartition(val topic: String,
   def this(topic: String, partition: Int, topicId: Uuid) =
 this(topic, topicId, partition, -1, -1, -1, Optional.empty(), -1, -1, 
Optional.empty[Integer])
 
-  def this(part: TopicPartition, topicId: Uuid) =
-this(part.topic, part.partition, topicId)
+  def this(part: TopicIdPartition) = {
+this(part.topic, part.partition, part.topicId)
+  }
 
-  def this(part: TopicPartition, id: Uuid, reqData: 
FetchRequest.PartitionData) =
-this(part.topic, id, part.partition, reqData.maxBytes, 
reqData.fetchOffset, -1,
+  def this(part: TopicIdPartition, reqData: FetchRequest.PartitionData) =
+this(part.topic, part.topicId, part.partition, reqData.maxBytes, 
reqData.fetchOffset, -1,
   reqData.currentLeaderEpoch, reqData.logStartOffset, -1, 
reqData.lastFetchedEpoch)
 
-  def this(part: TopicPartition, id: Uuid, reqData: FetchRequest.PartitionData,
+  def this(part: TopicIdPartition, reqData: FetchRequest.PartitionData,
respData: FetchResponseData.PartitionData) =
-this(part.topic, id, part.partition, reqData.maxBytes, 
reqData.fetchOffset, respData.highWatermark,
+this(part.topic, part.topicId, part.partition, reqData.maxBytes, 
reqData.fetchOffset, respData.highWatermark,
   reqData.currentLeaderEpoch, reqData.logStartOffset, 
respData.logStartOffset, reqData.lastFetchedEpoch)
 
-  def reqData = new FetchRequest.PartitionData(fetchOffset, 
fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch)
+  def reqData = new FetchRequest.PartitionData(topicId, fetchOffset, 
fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch)
 
-  def updateRequestParams(reqData: FetchRequest.PartitionData): Unit = {
+  def maybeUpdateRequestParamsOrName(reqData: FetchRequest.PartitionData, 
name: String): Unit = {
 // Update our cached request parameters.
 maxBytes = reqData.maxBytes
 fetchOffset = reqData.fetchOffset
 fetcherLogStartOffset = reqData.logStartOffset
 leaderEpoch = reqData.currentLeaderEpoch
 lastFetchedEpoch = reqData.lastFetchedEpoch
+// Update name if needed
+maybeSetUnknownName(name)
+  }
+
+  def maybeSetUnknownName(name: String): Unit = {
+if (this.topic == null) {
+  this.topic = name
+}
+  }
+
+  def maybeResolveUnknownName(topicNames: FetchSession.TOPIC_NAME_MAP): Unit = 
{

Review comment:
   nit: Should we use the same name for both `maybeSetUnknownName` and `maybeResolveUnknownName`? I guess they could be distinguished by their arguments.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743756690



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -163,18 +178,37 @@ class CachedPartition(val topic: String,
 mustRespond
   }
 
-  override def hashCode: Int = Objects.hash(new TopicPartition(topic, 
partition), topicId)
+  /**
+   * We have different equality checks depending on whether topic IDs are used.
+   * This means we need a different hash function as well. We use name to 
calculate the hash if the ID is zero and unused.
+   * Otherwise, we use the topic ID in the hash calculation.
+   *
+   * @return the hash code for the CachedPartition depending on what request 
version we are using.
+   */
+  override def hashCode: Int =
+if (topicId != Uuid.ZERO_UUID)
+  (31 * partition) + topicId.hashCode
+else
+  (31 * partition) + topic.hashCode
 
   def canEqual(that: Any): Boolean = that.isInstanceOf[CachedPartition]

Review comment:
   I guess that we could remove it now.
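   
   For context, a hedged sketch of an `equals` that stays consistent with the dual `hashCode` above without the `canEqual` helper; this is a toy Java model, not the actual Scala `CachedPartition`:
   ```java
   import java.util.Objects;
   import org.apache.kafka.common.Uuid;

   // Toy model of the dual identity rule: compare by topic ID when one is set,
   // otherwise fall back to the topic name, mirroring the hashCode above.
   final class PartitionKey {
       final String topic;
       final Uuid topicId;
       final int partition;

       PartitionKey(String topic, Uuid topicId, int partition) {
           this.topic = topic;
           this.topicId = topicId;
           this.partition = partition;
       }

       @Override
       public int hashCode() {
           return Uuid.ZERO_UUID.equals(topicId)
               ? 31 * partition + topic.hashCode()
               : 31 * partition + topicId.hashCode();
       }

       @Override
       public boolean equals(Object o) {
           if (!(o instanceof PartitionKey)) return false;
           PartitionKey other = (PartitionKey) o;
           if (partition != other.partition) return false;
           return Uuid.ZERO_UUID.equals(topicId)
               ? Objects.equals(topic, other.topic)
               : topicId.equals(other.topicId);
       }
   }
   ```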




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743755936



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -413,8 +413,20 @@ abstract class AbstractFetcherThread(name: String,
 
 case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
   warn(s"Received ${Errors.UNKNOWN_TOPIC_OR_PARTITION} from 
the leader for partition $topicPartition. " +
-   "This error may be returned transiently when the 
partition is being created or deleted, but it is not " +
-   "expected to persist.")
+"This error may be returned transiently when the partition 
is being created or deleted, but it is not " +
+"expected to persist.")
+  partitionsWithError += topicPartition
+
+case Errors.UNKNOWN_TOPIC_ID =>
+  warn(s"Received ${Errors.UNKNOWN_TOPIC_ID} from the leader 
for partition $topicPartition. " +
+"This error may be returned transiently when the partition 
is being created or deleted, but it is not " +
+"expected to persist.")
+  partitionsWithError += topicPartition
+
+case Errors.INCONSISTENT_TOPIC_ID =>
+  warn(s"Received ${Errors.INCONSISTENT_TOPIC_ID} from the 
leader for partition $topicPartition. " +
+"This error may be returned transiently when the partition 
is being created or deleted, but it is not " +
+"expected to persist.")

Review comment:
   Do we have unit tests covering those cases? There are almost no changes 
in `AbstractFetcherThreadTest` so it seems that we don't. Are they somewhere 
else perhaps?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743753874



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -374,7 +374,7 @@ abstract class AbstractFetcherThread(name: String,
   }
 }
   } catch {
-case ime@( _: CorruptRecordException | _: 
InvalidRecordException) =>
+case ime@(_: CorruptRecordException | _: 
InvalidRecordException) =>

Review comment:
   Anyway, we don't need to address this in this PR. I just wanted to point out that there is an opportunity for improvement.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743753319



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -374,7 +374,7 @@ abstract class AbstractFetcherThread(name: String,
   }
 }
   } catch {
-case ime@( _: CorruptRecordException | _: 
InvalidRecordException) =>
+case ime@(_: CorruptRecordException | _: 
InvalidRecordException) =>

Review comment:
   Sorry, I wanted to say happen.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743044369



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -374,7 +374,7 @@ abstract class AbstractFetcherThread(name: String,
   }
 }
   } catch {
-case ime@( _: CorruptRecordException | _: 
InvalidRecordException) =>
+case ime@(_: CorruptRecordException | _: 
InvalidRecordException) =>

Review comment:
   I think that would, for instance, happen when the controller fails over to an older IBP during an upgrade. This should remove the topic IDs, which means that v12 will be used for the next fetch request and trigger a FETCH_SESSION_TOPIC_ID_ERROR. In this particular case, retrying directly would be the optimal way for a follower to proceed. I wonder if there are other cases to consider here.
   
   For the consumer, it is definitely different.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743751246



##
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -242,65 +244,68 @@ public void testIncrementals() {
 FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 
1);
 FetchSessionHandler.Builder builder = handler.newBuilder();
 addTopicId(topicIds, topicNames, "foo", version);
-builder.add(new TopicPartition("foo", 0), 
topicIds.getOrDefault("foo", Uuid.ZERO_UUID),
-new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
-builder.add(new TopicPartition("foo", 1), 
topicIds.getOrDefault("foo", Uuid.ZERO_UUID),
-new FetchRequest.PartitionData(10, 110, 210, 
Optional.empty()));
+Uuid fooId = topicIds.getOrDefault("foo", Uuid.ZERO_UUID);
+TopicPartition foo0 = new TopicPartition("foo", 0);
+TopicPartition foo1 = new TopicPartition("foo", 1);
+builder.add(foo0, new FetchRequest.PartitionData(fooId, 0, 100, 
200, Optional.empty()));
+builder.add(foo1, new FetchRequest.PartitionData(fooId, 10, 110, 
210, Optional.empty()));
 FetchSessionHandler.FetchRequestData data = builder.build();
-assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200),
-new ReqEntry("foo", 1, 10, 110, 210)),
+assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200),
+new ReqEntry("foo", fooId, 1, 10, 110, 210)),
 data.toSend(), data.sessionPartitions());
 assertEquals(INVALID_SESSION_ID, data.metadata().sessionId());
 assertEquals(INITIAL_EPOCH, data.metadata().epoch());
 
 FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-respMap(new RespEntry("foo", 0, 10, 20),
-new RespEntry("foo", 1, 10, 20)), topicIds);
+respMap(new RespEntry("foo", 0, fooId, 10, 20),
+new RespEntry("foo", 1, fooId, 10, 20)));
 handler.handleResponse(resp, version);
 
 // Test an incremental fetch request which adds one partition and 
modifies another.
 FetchSessionHandler.Builder builder2 = handler.newBuilder();
 addTopicId(topicIds, topicNames, "bar", version);
-builder2.add(new TopicPartition("foo", 0), 
topicIds.getOrDefault("foo", Uuid.ZERO_UUID),
-new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
-builder2.add(new TopicPartition("foo", 1), 
topicIds.getOrDefault("foo", Uuid.ZERO_UUID),
-new FetchRequest.PartitionData(10, 120, 210, 
Optional.empty()));
-builder2.add(new TopicPartition("bar", 0), 
topicIds.getOrDefault("bar", Uuid.ZERO_UUID),
-new FetchRequest.PartitionData(20, 200, 200, 
Optional.empty()));
+Uuid barId = topicIds.getOrDefault("bar", Uuid.ZERO_UUID);
+TopicPartition bar0 = new TopicPartition("bar", 0);
+builder2.add(foo0,
+new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));

Review comment:
   There are a few more cases where we could put the partition data back on 
the previous line in this file.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743750508



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##
@@ -314,8 +356,7 @@ public int maxBytes() {
 
 // For versions < 13, builds the partitionData map using only the 
FetchRequestData.
 // For versions 13+, builds the partitionData map using both the 
FetchRequestData and a mapping of topic IDs to names.
-// Throws UnknownTopicIdException for versions 13+ if the topic ID was 
unknown to the server.
-public Map fetchData(Map 
topicNames) throws UnknownTopicIdException {
+public Map fetchData(Map 
topicNames) throws UnknownTopicIdException {

Review comment:
   Do we have a unit test for this one and for `forgottenTopics`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743749915



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##
@@ -199,26 +235,31 @@ public FetchRequest build(short version) {
 fetchRequestData.setMaxBytes(maxBytes);
 fetchRequestData.setIsolationLevel(isolationLevel.id());
 fetchRequestData.setForgottenTopicsData(new ArrayList<>());
-toForget.stream()
-.collect(Collectors.groupingBy(TopicPartition::topic, 
LinkedHashMap::new, Collectors.toList()))
-.forEach((topic, partitions) ->
-fetchRequestData.forgottenTopicsData().add(new 
FetchRequestData.ForgottenTopic()
-.setTopic(topic)
-.setTopicId(topicIds.getOrDefault(topic, 
Uuid.ZERO_UUID))
-
.setPartitions(partitions.stream().map(TopicPartition::partition).collect(Collectors.toList(
-);
-fetchRequestData.setTopics(new ArrayList<>());
+
+Map forgottenTopicMap = 
new LinkedHashMap<>();
+addToForgottenTopicMap(removed, forgottenTopicMap);
+
+// If a version older than v13 is used, topic-partition which were 
replaced
+// by a topic-partition with the same name but a different topic 
ID are not
+// sent out in the "forget" set in order to not remove the newly 
added
+// partition in the "fetch" set.
+if (version >= 13) {
+addToForgottenTopicMap(replaced, forgottenTopicMap);
+}

Review comment:
   Should we add a few unit tests to validate the changes that we have made in this class? We could add a few to FetchRequestTest (not sure if it already exists though).
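   
   A hedged sketch of the kind of FetchRequestTest case this could be; it only uses the builder and accessor names visible in the quoted diffs, and the exact signatures, return types, and imports are assumptions rather than the final API:
   ```java
   @Test
   public void testReplacedPartitionsAreOnlyForgottenFromV13() {
       Uuid oldId = Uuid.randomUuid();
       TopicIdPartition replaced =
           new TopicIdPartition(oldId, new TopicPartition("foo", 0));

       for (short version : new short[]{12, 13}) {
           FetchRequest request = FetchRequest.Builder
               .forConsumer(version, 500, 1, Collections.emptyMap())
               .removed(Collections.emptyList())
               .replaced(Collections.singletonList(replaced))
               .build(version);

           // Older versions must not forget the replaced partition, or the
           // newly added partition with the same name would be dropped too.
           List<TopicIdPartition> forgotten =
               request.forgottenTopics(Collections.singletonMap(oldId, "foo"));
           if (version >= 13)
               assertEquals(Collections.singletonList(replaced), forgotten);
           else
               assertTrue(forgotten.isEmpty());
       }
   }
   ```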




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743745314



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -262,11 +262,12 @@ public synchronized int sendFetches() {
 maxVersion = ApiKeys.FETCH.latestVersion();
 }
 final FetchRequest.Builder request = FetchRequest.Builder
-.forConsumer(maxVersion, this.maxWaitMs, this.minBytes, 
data.toSend(), data.topicIds())
+.forConsumer(maxVersion, this.maxWaitMs, this.minBytes, 
data.toSend())
 .isolationLevel(isolationLevel)
 .setMaxBytes(this.maxBytes)
 .metadata(data.metadata())
-.toForget(data.toForget())
+.removed(data.toForget())
+.replaced(data.toReplace())

Review comment:
   Should we add or extend a test in `FetcherTest` to cover this change? I would like to have one which ensures that the request sent by the fetcher, based on the session handler, is populated correctly (especially the replaced part). It seems that we don't have such a test in the suite at the moment.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding commented on pull request #11345: Allow empty last segment to have missing offset index during recovery

2021-11-05 Thread GitBox


ccding commented on pull request #11345:
URL: https://github.com/apache/kafka/pull/11345#issuecomment-961961648


   @junrao I think this should solve the issue. Please take a look.
   cc @kowshik 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 commented on pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

2021-11-05 Thread GitBox


RivenSun2 commented on pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#issuecomment-961940047


   @showuon Thanks again.
   Please take a look when you get a chance.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongjinleekr commented on pull request #11430: KAFKA-13352: Kafka Client does not support passwords starting with number in jaas config

2021-11-05 Thread GitBox


dongjinleekr commented on pull request #11430:
URL: https://github.com/apache/kafka/pull/11430#issuecomment-961934705


   @rajinisivaram Here is the fix; I allowed the asterisks, following the base implementation, and commented on how to mix numbers and the other symbols in the string. (Oh, updating the Jira would also be needed, wouldn't it?)
   
   I also tried to find a way to assert that it works identically to the base implementation, but failed. Instead, I just added the additional tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming opened a new pull request #11471: MINOR: Replace EasyMock with Mockito in connect:file

2021-11-05 Thread GitBox


dengziming opened a new pull request #11471:
URL: https://github.com/apache/kafka/pull/11471


   *More detailed description of your change*
   Replace EasyMock with Mockito in connect:file
   
   *Summary of testing strategy (including rationale)*
   QA
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongjinleekr commented on a change in pull request #11431: KAFKA-13397: Honor 'replication.policy.separator' configuration when creating MirrorMaker2 internal topics

2021-11-05 Thread GitBox


dongjinleekr commented on a change in pull request #11431:
URL: https://github.com/apache/kafka/pull/11431#discussion_r743692427



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
##
@@ -183,12 +187,18 @@ public MirrorClientConfig clientConfig(String cluster) {
 
 // fill in reasonable defaults
 props.putIfAbsent(GROUP_ID_CONFIG, sourceAndTarget.source() + "-mm2");
-props.putIfAbsent(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, 
"mm2-offsets."
-+ sourceAndTarget.source() + ".internal");
-props.putIfAbsent(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, 
"mm2-status."
-+ sourceAndTarget.source() + ".internal");
-props.putIfAbsent(DistributedConfig.CONFIG_TOPIC_CONFIG, "mm2-configs."
-+ sourceAndTarget.source() + ".internal");
+
+String separator = 
originalsStrings().getOrDefault(REPLICATION_POLICY_SEPARATOR, 
REPLICATION_POLICY_SEPARATOR_DEFAULT);
+if (separator.equals("-")) {
+throw new ConfigException("You should not use a single dash as a " 
+ REPLICATION_POLICY_SEPARATOR);
+}
+
+props.putIfAbsent(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, 
"mm2-offsets" + separator

Review comment:
   I have also thought over this issue again.
   
   The core of the problem seems to be that there is a gap in our configuration policy, which can cause the Kafka Connect internal topics to be mirrored into the other cluster. At first, I thought it could only happen in standalone mode, but that is not true; my apologies for the confusion.
   
   There are two cases this problem may happen:
   
   1. Using a custom separator in standalone mode after KIP-690 (i.e., AK 3.1.0), which @OmniaGM and I have been discussing until now.
   2. Running MM2 in Kafka Connect with the default `[config, offset, 
status].storage.topic` configuration. (what @mimaison just pointed out)
   
   Case 2 may be prevented by overriding the `topics` configuration, and it seems the users are already doing so. (Yes, it has been like this since day 1.) So, let's focus on case 1 only.
   
   To prevent those topics from being mirrored with a custom separator, there 
are three approaches:
   
   a. Fix `ReplicationPolicy#isInternalTopic` only.
   b. Leave the users to override `topics` configuration like Kafka Connect 
mode.
   c. Make `mm2-[offsets, status, configs].{source}.internal` topics to use 
given separator.
   
   Approach a seems unworkable; to determine whether a given `mm2-[offsets, status, configs].{source}.internal` topic is an internal one, the `ReplicationPolicy#isInternalTopic` method would need to know the source cluster's alias, since those topics include `{source}` in their name. But it actually takes only a topic name as a parameter.
   
   Approach b is workable, but it may not be very clear to users with little experience.
   
   Ruling out approaches a and b, only c survives, which is why I thought applying the separator to those topic names is the only way to make it behave identically to the default separator case (which does not mirror those topics by default).
   
   What do you think?
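   
   To make approach c concrete, a rough sketch of the topic-name construction it implies; the helper name and call site are hypothetical, not the actual patch:
   ```java
   // Build the MM2 internal topic names with the configured separator instead of
   // hard-coded dots, so the custom-separator case behaves like the default one.
   static String internalTopicName(String prefix, String sourceAlias, String separator) {
       return prefix + separator + sourceAlias + separator + "internal";
   }

   // e.g. internalTopicName("mm2-offsets", "primary", "__") -> "mm2-offsets__primary__internal"
   ```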




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

2021-11-05 Thread GitBox


RivenSun2 commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r743683585



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -692,10 +693,18 @@ private void validateCooperativeAssignment(final 
Map

[GitHub] [kafka] RivenSun2 commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

2021-11-05 Thread GitBox


RivenSun2 commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r743682843



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -692,10 +692,22 @@ private void validateCooperativeAssignment(final 
Map future = maybeAutoCommitOffsetsAsync();
+if (future == null)
+onJoinPrepareAsyncCommitSucceeded = true;
+else {
+if (future.succeeded()) {
+onJoinPrepareAsyncCommitSucceeded = true;
+} else if (future.failed() && !future.isRetriable()) {
+// consistent with async auto-commit failures, we do not 
propagate the exception
+log.warn("Asynchronous auto-commit offsets failed: {}", 
future.exception().getMessage());
+onJoinPrepareAsyncCommitSucceeded = true;
+}

Review comment:
   sure




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] soceanainn commented on pull request #11470: MINOR: Update docs for producer callbacks to reflect current behaviour

2021-11-05 Thread GitBox


soceanainn commented on pull request #11470:
URL: https://github.com/apache/kafka/pull/11470#issuecomment-961908428


   cc @junrao as you were involved in earlier PRs / threads on this topic


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] soceanainn opened a new pull request #11470: MINOR: Update docs for producer callbacks to reflect current behaviour

2021-11-05 Thread GitBox


soceanainn opened a new pull request #11470:
URL: https://github.com/apache/kafka/pull/11470


   Originally, Callback would return a null metadata value when an error 
occurred.
   
   This was partially changed by [KAFKA-3303](https://issues.apache.org/jira/browse/KAFKA-3303), where in some cases Callback would return an 'empty' metadata. In this empty metadata, the TopicPartition is set correctly but all other fields are set to `-1`.
   
   The docs were later updated by [KAFKA-7412](https://issues.apache.org/jira/browse/KAFKA-7412), but they incorrectly state that Callback will always return this 'empty' metadata when an error occurs. However, in the case of any exception that is a subclass of ApiException, Callback will still return a null value (see [here](https://github.com/apache/kafka/blob/3.1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1002)).
   
   This change aims to clarify the docs to accurately reflect the behaviour of 
producer callbacks.
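   
   For illustration only (not part of this PR), a send callback that tolerates both shapes of the behaviour described above, assuming a configured `KafkaProducer<String, String>` named `producer`:
   ```java
   producer.send(new ProducerRecord<>("my-topic", "key", "value"), (metadata, exception) -> {
       if (exception == null) {
           System.out.println("Send succeeded at offset " + metadata.offset());
       } else if (metadata == null) {
           // ApiException subclasses: no metadata is provided at all.
           System.err.println("Send failed before metadata was available: " + exception);
       } else {
           // Other errors: 'empty' metadata with the partition set and -1 elsewhere.
           System.err.println("Send failed for " + metadata.topic() + "-"
               + metadata.partition() + ": " + exception);
       }
   });
   ```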
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13093) KIP-724: Log compaction should write new segments with record version v2

2021-11-05 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-13093:

Fix Version/s: (was: 3.1.0)
   3.2.0

> KIP-724: Log compaction should write new segments with record version v2
> 
>
> Key: KAFKA-13093
> URL: https://issues.apache.org/jira/browse/KAFKA-13093
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 3.2.0
>
>
> If IBP is 3.0 or higher. Currently, log compaction retains the record format 
> of the record batch that was retained.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] upsidedownsmile commented on pull request #11375: KAFKA-10865: Log transformed record in WorkerSinkTask

2021-11-05 Thread GitBox


upsidedownsmile commented on pull request #11375:
URL: https://github.com/apache/kafka/pull/11375#issuecomment-961878940


   @cadonna  could you review this please?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

2021-11-05 Thread GitBox


showuon commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r743634380



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -692,10 +693,18 @@ private void validateCooperativeAssignment(final 
Map

[GitHub] [kafka] showuon commented on a change in pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

2021-11-05 Thread GitBox


showuon commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r743635124



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -692,10 +692,22 @@ private void validateCooperativeAssignment(final 
Map future = maybeAutoCommitOffsetsAsync();
+if (future == null)
+onJoinPrepareAsyncCommitSucceeded = true;
+else {
+if (future.succeeded()) {
+onJoinPrepareAsyncCommitSucceeded = true;
+} else if (future.failed() && !future.isRetriable()) {
+// consistent with async auto-commit failures, we do not 
propagate the exception
+log.warn("Asynchronous auto-commit offsets failed: {}", 
future.exception().getMessage());
+onJoinPrepareAsyncCommitSucceeded = true;
+}

Review comment:
   I think we should also log the error in `failed && isRetriable()` case

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -692,10 +693,18 @@ private void validateCooperativeAssignment(final 
Map

[GitHub] [kafka] showuon commented on pull request #11413: KAFKA-13370: add errors when commit offsets failed and add tests

2021-11-05 Thread GitBox


showuon commented on pull request #11413:
URL: https://github.com/apache/kafka/pull/11413#issuecomment-961861164


   @UnityLung @chia7712 @rhauch, please help review this PR. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 opened a new pull request #11469: MINOR: disable zookeeper.sasl.client to avoid false error

2021-11-05 Thread GitBox


chia7712 opened a new pull request #11469:
URL: https://github.com/apache/kafka/pull/11469


   The ZooKeeper connection currently always performs SASL checks. That behavior produces spurious warnings in the Kafka log. For example:
   
   ### using PLAINTEXT
   ```
   [2021-11-05 11:39:33,738] INFO Opening socket connection to server 
caijiapngdeiMac/192.168.50.178:19516. Will not attempt to authenticate using 
SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
   ```
   
   ### using SASL_PLAINTEXT with `java.security.auth.login.config`
   ```
   [2021-11-05 11:52:37,130] ERROR [ZooKeeperClient Kafka server] Auth failed. 
(kafka.zookeeper.ZooKeeperClient)
   ```
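   
   For reference, a hedged illustration (not part of this PR) of silencing the client-side SASL attempt through the standard ZooKeeper client property, which suppresses the messages above:
   ```java
   // The ZooKeeper client consults the "zookeeper.sasl.client" system property
   // (default "true"); setting it to "false" before the client is created
   // disables the SASL attempt entirely.
   System.setProperty("zookeeper.sasl.client", "false");
   ```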
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11460: KAFKA-13425: Optimization of KafkaConsumer#pause semantics

2021-11-05 Thread GitBox


showuon commented on pull request #11460:
URL: https://github.com/apache/kafka/pull/11460#issuecomment-961821207


   No, only committers can merge the PR.  @hachikuji, could you help check this 
PR? Thanks. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743578530



##
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##
@@ -3530,37 +3534,37 @@ class KafkaApisTest {
   def testSizeOfThrottledPartitions(): Unit = {
 val topicNames = new util.HashMap[Uuid, String]
 val topicIds = new util.HashMap[String, Uuid]()
-def fetchResponse(data: Map[TopicPartition, String]): FetchResponse = {
-  val responseData = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData](
+def fetchResponse(data: Map[TopicIdPartition, String]): FetchResponse = {
+  val responseData = new util.LinkedHashMap[TopicIdPartition, 
FetchResponseData.PartitionData](
 data.map { case (tp, raw) =>
   tp -> new FetchResponseData.PartitionData()
-.setPartitionIndex(tp.partition)
+.setPartitionIndex(tp.topicPartition.partition)
 .setHighWatermark(105)
 .setLastStableOffset(105)
 .setLogStartOffset(0)
 .setRecords(MemoryRecords.withRecords(CompressionType.NONE, new 
SimpleRecord(100, raw.getBytes(StandardCharsets.UTF_8
   }.toMap.asJava)
 
   data.foreach{case (tp, _) =>
-val id = Uuid.randomUuid()
-topicIds.put(tp.topic(), id)
-topicNames.put(id, tp.topic())
+topicIds.put(tp.topicPartition.topic, tp.topicId)
+topicNames.put(tp.topicId, tp.topicPartition.topic)
   }
-  FetchResponse.of(Errors.NONE, 100, 100, responseData, topicIds)
+  FetchResponse.of(Errors.NONE, 100, 100, responseData)
 }
 
-val throttledPartition = new TopicPartition("throttledData", 0)
+val throttledPartition = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("throttledData", 0))
 val throttledData = Map(throttledPartition -> "throttledData")
 val expectedSize = 
FetchResponse.sizeOf(FetchResponseData.HIGHEST_SUPPORTED_VERSION,
-  fetchResponse(throttledData).responseData(topicNames, 
FetchResponseData.HIGHEST_SUPPORTED_VERSION).entrySet.iterator, topicIds)
+  fetchResponse(throttledData).responseData(topicNames, 
FetchResponseData.HIGHEST_SUPPORTED_VERSION).entrySet.asScala.map( entry =>
+  (new TopicIdPartition(Uuid.ZERO_UUID, entry.getKey), 
entry.getValue)).toMap.asJava.entrySet.iterator)
 
-val response = fetchResponse(throttledData ++ Map(new 
TopicPartition("nonThrottledData", 0) -> "nonThrottledData"))
+val response = fetchResponse(throttledData ++ Map(new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("nonThrottledData", 0)) 
-> "nonThrottledData"))
 
 val quota = Mockito.mock(classOf[ReplicationQuotaManager])
 
Mockito.when(quota.isThrottled(ArgumentMatchers.any(classOf[TopicPartition])))
-  .thenAnswer(invocation => throttledPartition == 
invocation.getArgument(0).asInstanceOf[TopicPartition])
+  .thenAnswer(invocation => throttledPartition.topicPartition == 
invocation.getArgument(0).asInstanceOf[TopicPartition])
 
-assertEquals(expectedSize, 
KafkaApis.sizeOfThrottledPartitions(FetchResponseData.HIGHEST_SUPPORTED_VERSION,
 response, quota, topicIds))
+assertEquals(expectedSize, 
KafkaApis.sizeOfThrottledPartitions(FetchResponseData.HIGHEST_SUPPORTED_VERSION,
 response, quota))
   }
 
   @Test

Review comment:
   That is right.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743578382



##
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##
@@ -659,88 +670,108 @@ class FetchSessionTest {
   }
 
   @Test
-  def testIncrementalFetchSessionWithIdsWhenSessionDoesNotUseIds() : Unit = {
+  def testFetchSessionWithUnknownId(): Unit = {
 val time = new MockTime()
 val cache = new FetchSessionCache(10, 1000)
 val fetchManager = new FetchManager(time, cache)
-val topicIds = new util.HashMap[String, Uuid]()
-val topicNames = new util.HashMap[Uuid, String]()
+val topicNames = Map(Uuid.randomUuid() -> "foo", Uuid.randomUuid() -> 
"bar").asJava

Review comment:
   You could use `assertPartitionsOrder` helper here as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743578252



##
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##
@@ -659,88 +670,108 @@ class FetchSessionTest {
   }
 
   @Test
-  def testIncrementalFetchSessionWithIdsWhenSessionDoesNotUseIds() : Unit = {
+  def testFetchSessionWithUnknownId(): Unit = {
 val time = new MockTime()
 val cache = new FetchSessionCache(10, 1000)
 val fetchManager = new FetchManager(time, cache)
-val topicIds = new util.HashMap[String, Uuid]()
-val topicNames = new util.HashMap[Uuid, String]()
+val topicNames = Map(Uuid.randomUuid() -> "foo", Uuid.randomUuid() -> 
"bar").asJava
+val topicIds = topicNames.asScala.map(_.swap).asJava
+val foo0 = new TopicIdPartition(topicIds.getOrDefault("foo", 
Uuid.ZERO_UUID), new TopicPartition("foo", 0))
+val foo1 = new TopicIdPartition(topicIds.getOrDefault("foo", 
Uuid.ZERO_UUID), new TopicPartition("foo", 1))
+val emptyFoo0 = new TopicIdPartition(topicIds.getOrDefault("foo", 
Uuid.ZERO_UUID), new TopicPartition(null, 0))
+val emptyFoo1 = new TopicIdPartition(topicIds.getOrDefault("foo", 
Uuid.ZERO_UUID), new TopicPartition(null, 1))
 
-// Create a new fetch session with foo-0
+// Create a new fetch session with foo-0 and foo-1
 val reqData1 = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
-reqData1.put(new TopicPartition("foo", 0), new 
FetchRequest.PartitionData(0, 0, 100,
+reqData1.put(foo0.topicPartition, new 
FetchRequest.PartitionData(foo0.topicId, 0, 0, 100,
   Optional.empty()))
-val request1 = createRequestWithoutTopicIds(JFetchMetadata.INITIAL, 
reqData1, topicIds, EMPTY_PART_LIST, false)
-// Start a fetch session using a request version that does not use topic 
IDs.
+reqData1.put(foo1.topicPartition, new 
FetchRequest.PartitionData(foo1.topicId,10, 0, 100,
+  Optional.empty()))
+val request1 = createRequest(JFetchMetadata.INITIAL, reqData1, 
EMPTY_PART_LIST, false)
+// Simulate unknown topic ID for foo.
+val topicNamesOnlyBar = Collections.singletonMap(topicIds.get("bar"), 
"bar")
+// We should not throw error since we have an older request version.
 val context1 = fetchManager.newContext(
   request1.version,
   request1.metadata,
   request1.isFromFollower,
-  request1.fetchData(topicNames),
-  request1.forgottenTopics(topicNames),
-  topicIds
+  request1.fetchData(topicNamesOnlyBar),
+  request1.forgottenTopics(topicNamesOnlyBar),
+  topicNamesOnlyBar
 )
 assertEquals(classOf[FullFetchContext], context1.getClass)
-val respData1 = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData]
-respData1.put(new TopicPartition("foo", 0), new 
FetchResponseData.PartitionData()
+val respData1 = new util.LinkedHashMap[TopicIdPartition, 
FetchResponseData.PartitionData]

Review comment:
   Yeah, I meant exactly that. How about using `assertPartitionsOrder` 
helper? The assertion would be more complete.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743577292



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##
@@ -4814,6 +4842,7 @@ private void buildDependencies(MetricConfig metricConfig,
 metadata = new ConsumerMetadata(0, metadataExpireMs, false, false,
 subscriptions, logContext, new ClusterResourceListeners());
 client = new MockClient(time, metadata);
+client.setNodeApiVersions(NodeApiVersions.create());
 metrics = new Metrics(metricConfig, time);
 consumerClient = new ConsumerNetworkClient(logContext, client, 
metadata, time,
 100, 1000, Integer.MAX_VALUE);

Review comment:
   Yes, I was referring to those. Ack, I missed them during my first read.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743576928



##
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() {
 List partitions = Arrays.asList(0, 1);
 partitions.forEach(partition -> {
 String testType = partition == 0 ? "updating a partition" : 
"adding a new partition";
-Map topicIds = Collections.singletonMap("foo", 
Uuid.randomUuid());
+Uuid fooId = Uuid.randomUuid();
 FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 
1);
 FetchSessionHandler.Builder builder = handler.newBuilder();
-builder.add(new TopicPartition("foo", 0),  topicIds.get("foo"),
-new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
+builder.add(new TopicPartition("foo", 0),
+new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));
 FetchSessionHandler.FetchRequestData data = builder.build();
-assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
 data.toSend(), data.sessionPartitions());
 assertTrue(data.metadata().isFull());
 assertTrue(data.canUseTopicIds());
 
 FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
+respMap(new RespEntry("foo", 0, fooId, 10, 20)));
 handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
 // Try to remove a topic ID from an existing topic partition (0) 
or add a new topic partition (1) without an ID.
 FetchSessionHandler.Builder builder2 = handler.newBuilder();
-builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
-new FetchRequest.PartitionData(10, 110, 210, 
Optional.empty()));
+builder2.add(new TopicPartition("foo", partition),
+new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 
210, Optional.empty()));
 FetchSessionHandler.FetchRequestData data2 = builder2.build();
-// Should have the same session ID and next epoch, but can no 
longer use topic IDs.
-// The receiving broker will close the session if we were 
previously using topic IDs.
+// Should have the same session ID, and next epoch and can no 
longer use topic IDs.
+// The receiving broker will handle closing the session.
 assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session when " + testType);
 assertEquals(1, data2.metadata().epoch(), "Did not have correct 
epoch when " + testType);
 assertFalse(data2.canUseTopicIds());
 });
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testTopicIdReplaced(boolean fetchRequestUsesIds) {
+TopicPartition tp = new TopicPartition("foo", 0);
+FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+FetchSessionHandler.Builder builder = handler.newBuilder();
+Uuid topicId1 = Uuid.randomUuid();
+builder.add(tp,
+new FetchRequest.PartitionData(topicId1, 0, 100, 200, 
Optional.empty()));
+FetchSessionHandler.FetchRequestData data = builder.build();
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)),
+data.toSend(), data.sessionPartitions());
+assertTrue(data.metadata().isFull());
+assertTrue(data.canUseTopicIds());
+
+FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
+respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20)));
+handler.handleResponse(resp, (short) 12);
+
+// Try to add a new topic ID.
+FetchSessionHandler.Builder builder2 = handler.newBuilder();
+Uuid topicId2 = Uuid.randomUuid();
+// Use the same data besides the topic ID.
+FetchRequest.PartitionData partitionData = new 
FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty());
+builder2.add(tp, partitionData);
+FetchSessionHandler.FetchRequestData data2 = builder2.build();
+// The old topic ID partition should be in toReplace, and the new one 
should be in toSend.
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)),
+data2.toSend(), data2.sessionPartitions());
+assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, 
tp)), data2.toReplace());
+// Should have the same session ID, and next epoch and can use topic 
IDs.
+assertEquals(123, data2.metadata().sessionId(), "Did not use same 

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743575635



##
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() {
 List partitions = Arrays.asList(0, 1);
 partitions.forEach(partition -> {
 String testType = partition == 0 ? "updating a partition" : 
"adding a new partition";
-Map topicIds = Collections.singletonMap("foo", 
Uuid.randomUuid());
+Uuid fooId = Uuid.randomUuid();
 FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 
1);
 FetchSessionHandler.Builder builder = handler.newBuilder();
-builder.add(new TopicPartition("foo", 0),  topicIds.get("foo"),
-new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
+builder.add(new TopicPartition("foo", 0),
+new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));
 FetchSessionHandler.FetchRequestData data = builder.build();
-assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
 data.toSend(), data.sessionPartitions());
 assertTrue(data.metadata().isFull());
 assertTrue(data.canUseTopicIds());
 
 FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
+respMap(new RespEntry("foo", 0, fooId, 10, 20)));
 handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
 // Try to remove a topic ID from an existing topic partition (0) 
or add a new topic partition (1) without an ID.
 FetchSessionHandler.Builder builder2 = handler.newBuilder();
-builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
-new FetchRequest.PartitionData(10, 110, 210, 
Optional.empty()));
+builder2.add(new TopicPartition("foo", partition),
+new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 
210, Optional.empty()));
 FetchSessionHandler.FetchRequestData data2 = builder2.build();
-// Should have the same session ID and next epoch, but can no 
longer use topic IDs.
-// The receiving broker will close the session if we were 
previously using topic IDs.
+// Should have the same session ID, and next epoch and can no 
longer use topic IDs.
+// The receiving broker will handle closing the session.
 assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session when " + testType);
 assertEquals(1, data2.metadata().epoch(), "Did not have correct 
epoch when " + testType);
 assertFalse(data2.canUseTopicIds());
 });
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testTopicIdReplaced(boolean fetchRequestUsesIds) {
+TopicPartition tp = new TopicPartition("foo", 0);
+FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+FetchSessionHandler.Builder builder = handler.newBuilder();
+Uuid topicId1 = Uuid.randomUuid();
+builder.add(tp,
+new FetchRequest.PartitionData(topicId1, 0, 100, 200, 
Optional.empty()));
+FetchSessionHandler.FetchRequestData data = builder.build();
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)),
+data.toSend(), data.sessionPartitions());
+assertTrue(data.metadata().isFull());
+assertTrue(data.canUseTopicIds());
+
+FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
+respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20)));
+handler.handleResponse(resp, (short) 12);
+
+// Try to add a new topic ID.
+FetchSessionHandler.Builder builder2 = handler.newBuilder();
+Uuid topicId2 = Uuid.randomUuid();
+// Use the same data besides the topic ID.
+FetchRequest.PartitionData partitionData = new 
FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty());
+builder2.add(tp, partitionData);
+FetchSessionHandler.FetchRequestData data2 = builder2.build();
+// The old topic ID partition should be in toReplace, and the new one 
should be in toSend.
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)),
+data2.toSend(), data2.sessionPartitions());
+assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, 
tp)), data2.toReplace());

Review comment:
   Correct. I was referring to the upgrade case. We might need to handle 
the downgrade case for 

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

2021-11-05 Thread GitBox


dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743574922



##
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() {
 List partitions = Arrays.asList(0, 1);
 partitions.forEach(partition -> {
 String testType = partition == 0 ? "updating a partition" : 
"adding a new partition";
-Map topicIds = Collections.singletonMap("foo", 
Uuid.randomUuid());
+Uuid fooId = Uuid.randomUuid();
 FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 
1);
 FetchSessionHandler.Builder builder = handler.newBuilder();
-builder.add(new TopicPartition("foo", 0),  topicIds.get("foo"),
-new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
+builder.add(new TopicPartition("foo", 0),
+new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));
 FetchSessionHandler.FetchRequestData data = builder.build();
-assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
 data.toSend(), data.sessionPartitions());
 assertTrue(data.metadata().isFull());
 assertTrue(data.canUseTopicIds());
 
 FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
+respMap(new RespEntry("foo", 0, fooId, 10, 20)));
 handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
 // Try to remove a topic ID from an existing topic partition (0) 
or add a new topic partition (1) without an ID.
 FetchSessionHandler.Builder builder2 = handler.newBuilder();
-builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
-new FetchRequest.PartitionData(10, 110, 210, 
Optional.empty()));
+builder2.add(new TopicPartition("foo", partition),
+new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 
210, Optional.empty()));
 FetchSessionHandler.FetchRequestData data2 = builder2.build();
-// Should have the same session ID and next epoch, but can no 
longer use topic IDs.
-// The receiving broker will close the session if we were previously using topic IDs.
+// Should have the same session ID, and next epoch and can no longer use topic IDs.
+// The receiving broker will handle closing the session.
 assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType);
 assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType);
 assertFalse(data2.canUseTopicIds());
 });
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testTopicIdReplaced(boolean fetchRequestUsesIds) {
+TopicPartition tp = new TopicPartition("foo", 0);
+FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+FetchSessionHandler.Builder builder = handler.newBuilder();
+Uuid topicId1 = Uuid.randomUuid();
+builder.add(tp,
+new FetchRequest.PartitionData(topicId1, 0, 100, 200, Optional.empty()));
+FetchSessionHandler.FetchRequestData data = builder.build();
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)),
+data.toSend(), data.sessionPartitions());
+assertTrue(data.metadata().isFull());
+assertTrue(data.canUseTopicIds());
+
+FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
+respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20)));
+handler.handleResponse(resp, (short) 12);
+
+// Try to add a new topic ID.
+FetchSessionHandler.Builder builder2 = handler.newBuilder();
+Uuid topicId2 = Uuid.randomUuid();
+// Use the same data besides the topic ID.
+FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty());
+builder2.add(tp, partitionData);
+FetchSessionHandler.FetchRequestData data2 = builder2.build();
+// The old topic ID partition should be in toReplace, and the new one should be in toSend.
+assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)),
+data2.toSend(), data2.sessionPartitions());
+assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, tp)), data2.toReplace());
+// Should have the same session ID, and next epoch and can use topic IDs.
+assertEquals(123, data2.metadata().sessionId(), "Did not use same 

[GitHub] [kafka] RivenSun2 commented on pull request #11460: KAFKA-13425: Optimization of KafkaConsumer#pause semantics

2021-11-05 Thread GitBox


RivenSun2 commented on pull request #11460:
URL: https://github.com/apache/kafka/pull/11460#issuecomment-961805853


   @showuon, can you help to merge this PR?
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12291) Fix Ignored Upgrade Tests in streams_upgrade_test.py: test_upgrade_downgrade_brokers

2021-11-05 Thread Bruno Cadonna (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-12291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17439095#comment-17439095 ]

Bruno Cadonna commented on KAFKA-12291:
---

[~dajac] I downgraded it to Critical. Sorry for the inconvenience!

> Fix Ignored Upgrade Tests in streams_upgrade_test.py: 
> test_upgrade_downgrade_brokers
> 
>
> Key: KAFKA-12291
> URL: https://issues.apache.org/jira/browse/KAFKA-12291
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, system tests
>Reporter: Bruno Cadonna
>Priority: Critical
> Fix For: 3.1.0
>
>
> Fix in the oldest branch that ignores the test and cherry-pick forward.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12291) Fix Ignored Upgrade Tests in streams_upgrade_test.py: test_upgrade_downgrade_brokers

2021-11-05 Thread Bruno Cadonna (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-12291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bruno Cadonna updated KAFKA-12291:
--
Priority: Critical  (was: Blocker)

> Fix Ignored Upgrade Tests in streams_upgrade_test.py: 
> test_upgrade_downgrade_brokers
> 
>
> Key: KAFKA-12291
> URL: https://issues.apache.org/jira/browse/KAFKA-12291
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, system tests
>Reporter: Bruno Cadonna
>Priority: Critical
> Fix For: 3.1.0
>
>
> Fix in the oldest branch that ignores the test and cherry-pick forward.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

