[GitHub] [kafka] jlprat commented on a diff in pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2023-04-18 Thread via GitHub


jlprat commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r1170881445


##
docs/upgrade.html:
##
@@ -26,6 +26,8 @@ Notable changes in 3
 trying to create an already existing metric. (See https://cwiki.apache.org/confluence/display/KAFKA/KIP-843%3A+Adding+addMetricIfAbsent+method+to+Metrics";>KIP-843
 for more details).
 
+Apache Kafka now supports having both an IPv4 and an IPv6 listener on the same port. This change only applies to
+non advertised listeners (advertised listeners already have this feature)

Review Comment:
   Can you move this section to a new `3.6` area?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jlprat commented on pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2023-04-18 Thread via GitHub


jlprat commented on PR #11478:
URL: https://github.com/apache/kafka/pull/11478#issuecomment-1514217231

   Hi @mimaison, the current unit tests present in the PR seem like they will 
cover the case of a potential involuntary regression for this feature, so I'm 
fine with adding the change as it currently stands.
   
   @mdedetrich Could you update the documentation bit? Currently, it is under 
`3.3`, but it should be moved under a new 3.6 section.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #12545: KIP-863: Reduce CompletedFetch#parseRecord() memory copy

2023-04-18 Thread via GitHub


showuon commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r1170874370


##
clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java:
##
@@ -35,4 +41,22 @@ public Double deserialize(String topic, byte[] data) {
 }
 return Double.longBitsToDouble(value);
 }
+
+@Override
+public Double deserialize(String topic, Headers headers, ByteBuffer data) {
+if (data == null) {
+return null;
+}
+
+if (data.remaining() != 8) {
+throw new SerializationException("Size of data received by DoubleDeserializer is not 8");
+}
+
+final ByteOrder srcOrder = data.order();
+data.order(BIG_ENDIAN);
+
+final double value = data.getDouble(data.position());

Review Comment:
   Also, the `data.position()` could be removed because, by default, it'll read 
from the current position of the byte buffer.
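
   For illustration, a minimal standalone sketch of the two read styles (plain `java.nio`, independent of the PR's deserializer code):

```java
import java.nio.ByteBuffer;

public class GetDoubleSketch {
    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putDouble(42.5);
        buf.flip();

        // Absolute read: takes an explicit index and leaves the position untouched.
        double absolute = buf.getDouble(buf.position());
        // Relative read: starts at the current position by default and advances it by 8.
        double relative = buf.getDouble();

        System.out.println(absolute + " " + relative); // prints: 42.5 42.5
    }
}
```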



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #12545: KIP-863: Reduce CompletedFetch#parseRecord() memory copy

2023-04-18 Thread via GitHub


showuon commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r1170872007


##
clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java:
##
@@ -35,4 +41,22 @@ public Double deserialize(String topic, byte[] data) {
 }
 return Double.longBitsToDouble(value);
 }
+
+@Override
+public Double deserialize(String topic, Headers headers, ByteBuffer data) {
+if (data == null) {
+return null;
+}
+
+if (data.remaining() != 8) {
+throw new SerializationException("Size of data received by DoubleDeserializer is not 8");
+}
+
+final ByteOrder srcOrder = data.order();
+data.order(BIG_ENDIAN);
+
+final double value = data.getDouble(data.position());

Review Comment:
   @LinShunKang , why do we need to set the order to `BIG_ENDIAN` before reading it? From the 
[javadoc](https://docs.oracle.com/javase/8/docs/api/java/nio/ByteBuffer.html#getDouble--):
   
   > Reads the next eight bytes at this buffer's current position, composing them into a double value according to the current byte order, and then increments the position by eight. 
   
   It looks like the byte order is already taken into account when reading from the byte 
buffer. I had a quick look at the JDK source code, and it does check (and convert) 
the byte order if needed. Did I miss anything?
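
   As a standalone illustration of that javadoc behaviour (plain `java.nio`, not the PR's code):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class ByteOrderSketch {
    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.order(ByteOrder.LITTLE_ENDIAN);
        buf.putDouble(42.5); // written using the buffer's current (little-endian) order
        buf.flip();

        // getDouble composes the bytes according to the buffer's current byte order,
        // so reading back with the same order yields the original value...
        System.out.println(buf.getDouble(0)); // 42.5

        // ...while forcing a different order reinterprets the same bytes.
        buf.order(ByteOrder.BIG_ENDIAN);
        System.out.println(buf.getDouble(0)); // some other double, not 42.5
    }
}
```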



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #12545: KIP-863: Reduce CompletedFetch#parseRecord() memory copy

2023-04-18 Thread via GitHub


showuon commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r1170872007


##
clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java:
##
@@ -35,4 +41,22 @@ public Double deserialize(String topic, byte[] data) {
 }
 return Double.longBitsToDouble(value);
 }
+
+@Override
+public Double deserialize(String topic, Headers headers, ByteBuffer data) {
+if (data == null) {
+return null;
+}
+
+if (data.remaining() != 8) {
+throw new SerializationException("Size of data received by DoubleDeserializer is not 8");
+}
+
+final ByteOrder srcOrder = data.order();
+data.order(BIG_ENDIAN);
+
+final double value = data.getDouble(data.position());

Review Comment:
   @LinShunKang , why do we need to set the order to `BIG_ENDIAN` before reading it? From the 
[javadoc](https://docs.oracle.com/javase/8/docs/api/java/nio/ByteBuffer.html#getDouble--):
   
   > Reads the next eight bytes at this buffer's current position, composing them into a double value according to the current byte order, and then increments the position by eight. 
   
   It looks like the byte order is already taken into account when reading from the byte 
buffer. I had a quick look at the JDK source code, and it does check (and convert) 
the byte order if needed. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13609: KAFKA-14834: [11/N] Update table joins to identify out-of-order records with `isLatest`

2023-04-18 Thread via GitHub


vcrfxia commented on code in PR #13609:
URL: https://github.com/apache/kafka/pull/13609#discussion_r1170869917


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java:
##
@@ -97,6 +95,27 @@ public KTableKTableJoinMerger joinMerger() {
 return (KTableKTableJoinMerger) kChangeProcessorSupplier;
 }
 
+@Override
+public void enableVersionedSemantics(final boolean useVersionedSemantics, final String parentNodeName) {
+enableVersionedSemantics(thisProcessorParameters(), useVersionedSemantics, parentNodeName);
+enableVersionedSemantics(otherProcessorParameters(), useVersionedSemantics, parentNodeName);
+}

+@SuppressWarnings("unchecked")
+private void enableVersionedSemantics(final ProcessorParameters processorParameters,
+  final boolean useVersionedSemantics,
+  final String parentNodeName) {
+final ProcessorSupplier processorSupplier = processorParameters.processorSupplier();
+if (!(processorSupplier instanceof KTableKTableAbstractJoin)) {
+throw new IllegalStateException("Unexpected processor type for table-table join: " + processorSupplier.getClass().getName());
+}
+final KTableKTableAbstractJoin tableJoin = (KTableKTableAbstractJoin) processorSupplier;
+
+if (parentNodeName.equals(tableJoin.joinThisParentNodeName())) {

Review Comment:
   Good question -- the answer is that this join node has multiple parents, 
while all the other `VersionedSemanticsGraphNode` implementations only have a 
single parent. Because this join node has multiple parents and writes two 
separate processors (one for each side of the join) to the topology, when 
enabling versioned semantics we need a way to distinguish which side of the 
join we're enabling versioned semantics for. In other words, it's possible that 
only one side of the join is "versioned," in which case one of the two join 
processors should have versioned semantics enabled while the other should not. 
The way that we determine which side of the join to enable versioned semantics 
for is based on the parent node name; the processor whose "joinThis" is the 
parent node which has been identified as versioned is the processor for which 
versioned semantics will be enabled. (In the case of a self-join, both 
processors will satisfy this check, and both processors will have versioned 
semantics enabled.)
   
   For all other `VersionedSemanticsGraphNode` implementations which only have 
a single parent, we could also perform an analogous parent node name check if 
we wanted to, but the parent node name should always match so it'd be 
redundant. 
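
   A stripped-down sketch of that dispatch, with hypothetical stand-in classes rather than the actual Streams internals, just to illustrate which side gets enabled:

```java
public class VersionedDispatchSketch {

    // Hypothetical stand-in for one side's join processor.
    static class JoinSide {
        final String joinThisParentNodeName;
        boolean versionedSemantics;

        JoinSide(String joinThisParentNodeName) {
            this.joinThisParentNodeName = joinThisParentNodeName;
        }
    }

    public static void main(String[] args) {
        // A two-parent join node: "left" feeds the this-side processor, "right" the other side.
        JoinSide thisSide = new JoinSide("left");
        JoinSide otherSide = new JoinSide("right");

        // Enabling versioned semantics for parent "left" flips only the processor whose
        // joinThis parent matches; in a self-join both sides would match.
        String versionedParentNodeName = "left";
        for (JoinSide side : new JoinSide[]{thisSide, otherSide}) {
            if (versionedParentNodeName.equals(side.joinThisParentNodeName)) {
                side.versionedSemantics = true;
            }
        }

        System.out.println(thisSide.versionedSemantics + " " + otherSide.versionedSemantics); // true false
    }
}
```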



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14586) Move StreamsResetter to tools

2023-04-18 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713911#comment-17713911
 ] 

Sagar Rao commented on KAFKA-14586:
---

[~mjsax] , I can add the redirection for this. I have read through KIP-906, so I 
can take [~fvaleri]'s help. Looks like the code freeze is next week, so I will 
try to accelerate on this.

> Move StreamsResetter to tools
> -
>
> Key: KAFKA-14586
> URL: https://issues.apache.org/jira/browse/KAFKA-14586
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Sagar Rao
>Priority: Major
> Fix For: 3.5.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-04-18 Thread via GitHub


satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1170269949


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1083,48 +1095,100 @@ class ReplicaManager(val config: KafkaConfig,
   fetchPartitionStatus += (topicIdPartition -> 
FetchPartitionStatus(logOffsetMetadata, partitionData))
 })
   }
-  val delayedFetch = new DelayedFetch(
-params = params,
-fetchPartitionStatus = fetchPartitionStatus,
-replicaManager = this,
-quota = quota,
-responseCallback = responseCallback
-  )
-
-  // create a list of (topic, partition) pairs to use as keys for this 
delayed fetch operation
-  val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => 
TopicPartitionOperationKey(tp) }
-
-  // try to complete the request immediately, otherwise put it into the 
purgatory;
-  // this is because while the delayed fetch operation is being created, 
new requests
-  // may arrive and hence make this operation completable.
-  delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, 
delayedFetchKeys)
+
+  if (remoteFetchInfo.isPresent) {

Review Comment:
   I am not sure line num 1082 is the same as you meant it to be, as the file could 
have been updated. Please clarify. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dengziming commented on a diff in pull request #13432: KAFKA-14821 Implement the listOffsets API with AdminApiDriver

2023-04-18 Thread via GitHub


dengziming commented on code in PR #13432:
URL: https://github.com/apache/kafka/pull/13432#discussion_r1170758199


##
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java:
##
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.admin.ListOffsetsOptions;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import 
org.apache.kafka.clients.admin.internals.AdminApiFuture.SimpleAdminApiFuture;
+import org.apache.kafka.clients.admin.internals.AdminApiHandler.Batched;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidMetadataException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import 
org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
+import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic;
+import 
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
+import 
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.CollectionUtils;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+public final class ListOffsetsHandler extends Batched {
+
+private final Map offsetTimestampsByPartition;
+private final ListOffsetsOptions options;
+private final Logger log;
+private final AdminApiLookupStrategy lookupStrategy;
+
+public ListOffsetsHandler(
+Map offsetTimestampsByPartition,
+ListOffsetsOptions options,
+LogContext logContext
+) {
+this.offsetTimestampsByPartition = offsetTimestampsByPartition;
+this.options = options;
+this.log = logContext.logger(ListOffsetsHandler.class);
+this.lookupStrategy = new PartitionLeaderStrategy(logContext);
+}
+
+@Override
+public String apiName() {
+return "listOffsets";
+}
+
+@Override
+public AdminApiLookupStrategy lookupStrategy() {
+return this.lookupStrategy;
+}
+
+@Override
+ListOffsetsRequest.Builder buildBatchedRequest(int brokerId, 
Set keys) {
+Map topicsByName = 
CollectionUtils.groupPartitionsByTopic(
+keys,
+topicName -> new ListOffsetsTopic().setName(topicName),
+(listOffsetsTopic, partitionId) -> {
+TopicPartition topicPartition = new 
TopicPartition(listOffsetsTopic.name(), partitionId);
+long offsetTimestamp = 
offsetTimestampsByPartition.get(topicPartition);
+listOffsetsTopic.partitions().add(
+new ListOffsetsPartition()
+.setPartitionIndex(partitionId)
+.setTimestamp(offsetTimestamp));
+});
+boolean supportsMaxTimestamp = keys
+.stream()
+.anyMatch(key -> offsetTimestampsByPartition.get(key) == 
ListOffsetsRequest.MAX_TIMESTAMP);
+
+return ListOffsetsRequest.Builder
+.forConsumer(true, options.isolationLevel(), supportsMaxTimestamp)
+.setTargetTimes(new ArrayList<>(topicsByName.values()));
+}
+
+@Override
+public ApiResult handleResponse(
+Node broker,
+Set keys,
+AbstractResponse abstractResponse
+) {
+ListOffsetsResponse response = (ListOffsetsResponse) abstractResponse;
+Map completed = new HashMap<>();
+Map failed = new HashMap<>();
+List unmapp

[jira] [Resolved] (KAFKA-14908) Sporadic "Address already in use" when starting kafka cluster embedded within tests

2023-04-18 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-14908.
---
Fix Version/s: 3.6.0
   Resolution: Fixed

> Sporadic "Address already in use" when starting kafka cluster embedded within 
> tests
> ---
>
> Key: KAFKA-14908
> URL: https://issues.apache.org/jira/browse/KAFKA-14908
> Project: Kafka
>  Issue Type: Bug
>Reporter: Keith Wall
>Priority: Minor
> Fix For: 3.6.0
>
>
> We have an integration test suite that starts/stops a kafka cluster 
> before/after each test.   Kafka is being started programmatically within the 
> same JVM that is running the tests.
> Sometimes we get sporadic failures from within Kafka as it tries to bind the 
> server socket.
> {code:java}
> org.apache.kafka.common.KafkaException: Socket server failed to bind to 
> 0.0.0.0:9092: Address already in use.
>     at kafka.network.Acceptor.openServerSocket(SocketServer.scala:684)
>     at kafka.network.Acceptor.<init>(SocketServer.scala:576)
>     at kafka.network.DataPlaneAcceptor.<init>(SocketServer.scala:433)
>     at 
> kafka.network.SocketServer.createDataPlaneAcceptor(SocketServer.scala:247)
>     at 
> kafka.network.SocketServer.createDataPlaneAcceptorAndProcessors(SocketServer.scala:226)
>     at kafka.network.SocketServer.$anonfun$new$31(SocketServer.scala:173)
>     at 
> kafka.network.SocketServer.$anonfun$new$31$adapted(SocketServer.scala:173)
>     at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:575)
>     at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:573)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:933)
>     at kafka.network.SocketServer.<init>(SocketServer.scala:173)
>     at kafka.server.KafkaServer.startup(KafkaServer.scala:331) {code}
> Investigation has shown that the socket is in the TIME_WAIT state from a 
> previous test.
> I know Kafka supports ephemeral ports, but this isn't convenient for our 
> use case.  
> I'd like to suggest that Kafka is changed to set SO_REUSEADDR on the 
> server socket.  I believe this is standard practice for server applications 
> that run on well-known ports.
> I don't believe this change would introduce any backward compatibility 
> concerns. 
>  
> I will open a PR so that this can be considered. Thank you.
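
For reference, a minimal standalone sketch of the suggested socket option (plain java.nio, independent of Kafka's actual SocketServer code):

{code:java}
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;

public class ReuseAddrSketch {
    public static void main(String[] args) throws Exception {
        // Opt in to address reuse before binding, so a socket left in TIME_WAIT
        // by a previous test run does not block the bind.
        ServerSocketChannel server = ServerSocketChannel.open();
        server.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        server.bind(new InetSocketAddress("0.0.0.0", 9092));
        System.out.println("bound: " + server.getLocalAddress());
        server.close();
    }
}
{code}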



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon merged pull request #13572: KAFKA-14908: Set setReuseAddress on the kafka server socket

2023-04-18 Thread via GitHub


showuon merged PR #13572:
URL: https://github.com/apache/kafka/pull/13572


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14586) Move StreamsResetter to tools

2023-04-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713836#comment-17713836
 ] 

Matthias J. Sax commented on KAFKA-14586:
-

Thanks for providing context, and no worries about not knowing about the other 
KIP (there are too many things going on, and I also just realized the overlap).

Yes, `StreamsResetter` might be used programmatically, so we should add a 
redirection. Who will do this? Guess we should get it in before code freeze to 
not delay the release.

I am not worried about moving the test because it's not user facing.

Overall, it seems we can close out the other KIP and ticket as "subsumed" by 
this ticket/KIP. I can do the cleanup for it.

Just let me know if there is anything I can help with, or if the matter is 
resolved once we get the missing redirection merged.
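
A minimal sketch of what such a redirection could look like (class names here are hypothetical, not taken from the KIP or PR): a thin deprecated shim at the old location that delegates to the relocated tool, so programmatic callers keep working.

{code:java}
// Hypothetical names for illustration only.
@Deprecated
class OldLocationStreamsResetter {
    public static void main(String[] args) {
        // Delegate to the relocated implementation so existing callers keep working.
        NewLocationStreamsResetter.main(args);
    }
}

class NewLocationStreamsResetter {
    public static void main(String[] args) {
        System.out.println("real StreamsResetter logic would run here");
    }
}
{code}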

 

> Move StreamsResetter to tools
> -
>
> Key: KAFKA-14586
> URL: https://issues.apache.org/jira/browse/KAFKA-14586
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Sagar Rao
>Priority: Major
> Fix For: 3.5.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hudeqi commented on pull request #13473: KAFKA-14866:Remove controller module metrics when broker is shutting down

2023-04-18 Thread via GitHub


hudeqi commented on PR #13473:
URL: https://github.com/apache/kafka/pull/13473#issuecomment-1514011903

   Hello, can you help to review this PR? @mimaison 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13609: KAFKA-14834: [11/N] Update table joins to identify out-of-order records with `isLatest`

2023-04-18 Thread via GitHub


mjsax commented on code in PR #13609:
URL: https://github.com/apache/kafka/pull/13609#discussion_r1170677125


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java:
##
@@ -97,6 +95,27 @@ public KTableKTableJoinMerger joinMerger() {
 return (KTableKTableJoinMerger) kChangeProcessorSupplier;
 }
 
+@Override
+public void enableVersionedSemantics(final boolean useVersionedSemantics, final String parentNodeName) {
+enableVersionedSemantics(thisProcessorParameters(), useVersionedSemantics, parentNodeName);
+enableVersionedSemantics(otherProcessorParameters(), useVersionedSemantics, parentNodeName);
+}

+@SuppressWarnings("unchecked")
+private void enableVersionedSemantics(final ProcessorParameters processorParameters,
+  final boolean useVersionedSemantics,
+  final String parentNodeName) {
+final ProcessorSupplier processorSupplier = processorParameters.processorSupplier();
+if (!(processorSupplier instanceof KTableKTableAbstractJoin)) {
+throw new IllegalStateException("Unexpected processor type for table-table join: " + processorSupplier.getClass().getName());
+}
+final KTableKTableAbstractJoin tableJoin = (KTableKTableAbstractJoin) processorSupplier;
+
+if (parentNodeName.equals(tableJoin.joinThisParentNodeName())) {

Review Comment:
   Not sure if I understand this condition? Can you elaborate? It seems to be 
the only place where we use the newly added `parentNodeName` -- why do we not 
use it elsewhere?



##
streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java:
##
@@ -215,14 +226,19 @@ public void testInnerWithVersionedStores() {
 null,
 null,
 null,
-Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", 
null,  15L)),
+null,
+null,
+null,
+null,
+null,

Review Comment:
   I think there is one `null` line too many?



##
streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java:
##
@@ -446,14 +482,18 @@ public void testInnerWithRightVersionedOnly() throws 
Exception {
 null,
 null,
 null,
-Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"D-d", null,  15L)),
-Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"E-d", null,  14L)),
 null,
 null,
-Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"F-d", null,  14L))
+null,
+null,
+Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"E-e", null,  15L)),
+Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
null, null,  14L)),
+null,
+null,
+Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"F-e", null,  14L))

Review Comment:
   Should the last two result rows flip: we first get `F-e` when we process the 
left-hand `F`, and get nothing when we process the right-hand `f`?



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##
@@ -804,6 +806,7 @@ private  KTable doJoin(final KTable 
other,
 kTableKTableJoinNode.setOutputVersioned(isOutputVersioned);
 
 builder.addGraphNode(this.graphNode, kTableKTableJoinNode);
+builder.addGraphNode(((KTableImpl) other).graphNode, 
kTableKTableJoinNode);

Review Comment:
   Yeah. Seems to have been incorrect, but apparently it did not surface as a bug. Nice 
fix!



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##
@@ -1098,7 +1101,7 @@ private  KTable 
doJoinOnForeignKey(final KTable forei
 //not be done needlessly.
 ((KTableImpl) foreignKeyTable).enableSendingOldValues(true);
 
-//Old values must be sent such that the 
ForeignJoinSubscriptionSendProcessorSupplier can propagate deletions to the 
correct node.
+//Old values must be sent such that the 
SubscriptionSendProcessorSupplier can propagate deletions to the correct node.

Review Comment:
   Thank you! -- Could we do a small follow-up on the 3.4 branch to get it 
backported?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #13579: KAFKA-14904: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()

2023-04-18 Thread via GitHub


hachikuji commented on code in PR #13579:
URL: https://github.com/apache/kafka/pull/13579#discussion_r1170696945


##
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##
@@ -332,24 +331,42 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
   debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for 
$transactionalId's AddPartitions request for verification")
   
responseCallback(AddPartitionsToTxnResponse.resultForTransaction(transactionalId,
 partitions.map(_ -> Errors.INVALID_REQUEST).toMap.asJava))
 } else {
-  val result: ApiResult[(Int, TransactionMetadata)] = 
getTransactionMetadata(transactionalId, producerId, producerEpoch, partitions)
-  
+  val result: ApiResult[Map[TopicPartition, Errors]] =
+txnManager.getTransactionState(transactionalId).flatMap {
+  case None => Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+
+  case Some(epochAndMetadata) =>
+val txnMetadata = epochAndMetadata.transactionMetadata
+
+// generate the new transaction metadata with added partitions
+txnMetadata.inLock {
+  if (txnMetadata.producerId != producerId) {
+Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+  } else if (txnMetadata.producerEpoch != producerEpoch) {
+Left(Errors.PRODUCER_FENCED)
+  } else if (txnMetadata.state == PrepareCommit || 
txnMetadata.state == PrepareAbort) {
+Left(Errors.CONCURRENT_TRANSACTIONS)
+  } else {
+Right(partitions.map(part => 

Review Comment:
   nit: usually we would write `partitions.map { part =>`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #13579: KAFKA-14904: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()

2023-04-18 Thread via GitHub


hachikuji commented on code in PR #13579:
URL: https://github.com/apache/kafka/pull/13579#discussion_r1170696945


##
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##
@@ -332,24 +331,42 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
   debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for 
$transactionalId's AddPartitions request for verification")
   
responseCallback(AddPartitionsToTxnResponse.resultForTransaction(transactionalId,
 partitions.map(_ -> Errors.INVALID_REQUEST).toMap.asJava))
 } else {
-  val result: ApiResult[(Int, TransactionMetadata)] = 
getTransactionMetadata(transactionalId, producerId, producerEpoch, partitions)
-  
+  val result: ApiResult[Map[TopicPartition, Errors]] =
+txnManager.getTransactionState(transactionalId).flatMap {
+  case None => Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+
+  case Some(epochAndMetadata) =>
+val txnMetadata = epochAndMetadata.transactionMetadata
+
+// generate the new transaction metadata with added partitions
+txnMetadata.inLock {
+  if (txnMetadata.producerId != producerId) {
+Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+  } else if (txnMetadata.producerEpoch != producerEpoch) {
+Left(Errors.PRODUCER_FENCED)
+  } else if (txnMetadata.state == PrepareCommit || 
txnMetadata.state == PrepareAbort) {
+Left(Errors.CONCURRENT_TRANSACTIONS)
+  } else {
+Right(partitions.map(part => 

Review Comment:
   nit: usually we would write `partitions.map { part =>`



##
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##
@@ -332,24 +331,42 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
   debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for 
$transactionalId's AddPartitions request for verification")
   
responseCallback(AddPartitionsToTxnResponse.resultForTransaction(transactionalId,
 partitions.map(_ -> Errors.INVALID_REQUEST).toMap.asJava))
 } else {
-  val result: ApiResult[(Int, TransactionMetadata)] = 
getTransactionMetadata(transactionalId, producerId, producerEpoch, partitions)
-  
+  val result: ApiResult[Map[TopicPartition, Errors]] =
+txnManager.getTransactionState(transactionalId).flatMap {
+  case None => Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+
+  case Some(epochAndMetadata) =>
+val txnMetadata = epochAndMetadata.transactionMetadata
+
+// generate the new transaction metadata with added partitions

Review Comment:
   nit: this comment seems misplaced
   
   Could we add a short comment here noting that we intentionally do not check 
the pending state? We can mention that partitions are removed from the transaction 
metadata as soon as the markers are confirmed to be written.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lanshiqin commented on a diff in pull request #13584: MINOR: Add log segment unit tests, If the maximum offset beyond index, appen…

2023-04-18 Thread via GitHub


lanshiqin commented on code in PR #13584:
URL: https://github.com/apache/kafka/pull/13584#discussion_r1170685549


##
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##
@@ -65,6 +68,28 @@ class LogSegmentTest {
 Utils.delete(logDir)
   }
 
+  /**
+   * If the maximum offset beyond index, appended to the log section, it 
throws LogSegmentOffsetOverflowException
+   */
+  @ParameterizedTest
+  @CsvSource(Array(

Review Comment:
   Thanks, I've added this use case data



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13579: KAFKA-14904: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()

2023-04-18 Thread via GitHub


jolshan commented on code in PR #13579:
URL: https://github.com/apache/kafka/pull/13579#discussion_r1170681401


##
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##
@@ -332,24 +331,45 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
   debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for 
$transactionalId's AddPartitions request for verification")
   
responseCallback(AddPartitionsToTxnResponse.resultForTransaction(transactionalId,
 partitions.map(_ -> Errors.INVALID_REQUEST).toMap.asJava))
 } else {
-  val result: ApiResult[(Int, TransactionMetadata)] = 
getTransactionMetadata(transactionalId, producerId, producerEpoch, partitions)
-  
+  val result: ApiResult[Map[TopicPartition, Errors]] =
+txnManager.getTransactionState(transactionalId).flatMap {
+  case None => Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+
+  case Some(epochAndMetadata) =>
+val txnMetadata = epochAndMetadata.transactionMetadata
+
+// generate the new transaction metadata with added partitions
+txnMetadata.inLock {
+  if (txnMetadata.producerId != producerId) {
+Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+  } else if (txnMetadata.producerEpoch != producerEpoch) {
+Left(Errors.PRODUCER_FENCED)
+  } else if (txnMetadata.pendingTransitionInProgress && 
!(txnMetadata.pendingState == Some(Ongoing) && txnMetadata.state == Ongoing)) {
+// return a retriable exception to let the client backoff and 
retry
+Left(Errors.CONCURRENT_TRANSACTIONS)

Review Comment:
   If we allow any pending states, we will have to catch the error in 
KAFKA-14884, which I suppose is also OK.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13579: KAFKA-14904: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()

2023-04-18 Thread via GitHub


jolshan commented on code in PR #13579:
URL: https://github.com/apache/kafka/pull/13579#discussion_r1170681193


##
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##
@@ -332,24 +331,45 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
   debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for 
$transactionalId's AddPartitions request for verification")
   
responseCallback(AddPartitionsToTxnResponse.resultForTransaction(transactionalId,
 partitions.map(_ -> Errors.INVALID_REQUEST).toMap.asJava))
 } else {
-  val result: ApiResult[(Int, TransactionMetadata)] = 
getTransactionMetadata(transactionalId, producerId, producerEpoch, partitions)
-  
+  val result: ApiResult[Map[TopicPartition, Errors]] =
+txnManager.getTransactionState(transactionalId).flatMap {
+  case None => Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+
+  case Some(epochAndMetadata) =>
+val txnMetadata = epochAndMetadata.transactionMetadata
+
+// generate the new transaction metadata with added partitions
+txnMetadata.inLock {
+  if (txnMetadata.producerId != producerId) {
+Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+  } else if (txnMetadata.producerEpoch != producerEpoch) {
+Left(Errors.PRODUCER_FENCED)
+  } else if (txnMetadata.pendingTransitionInProgress && 
!(txnMetadata.pendingState == Some(Ongoing) && txnMetadata.state == Ongoing)) {
+// return a retriable exception to let the client backoff and 
retry
+Left(Errors.CONCURRENT_TRANSACTIONS)

Review Comment:
   If we are pending commit or abort, I don't know if it makes sense to verify 
and allow the write to continue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #13579: KAFKA-14904: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()

2023-04-18 Thread via GitHub


hachikuji commented on code in PR #13579:
URL: https://github.com/apache/kafka/pull/13579#discussion_r1170680513


##
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##
@@ -332,24 +331,45 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
   debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for 
$transactionalId's AddPartitions request for verification")
   
responseCallback(AddPartitionsToTxnResponse.resultForTransaction(transactionalId,
 partitions.map(_ -> Errors.INVALID_REQUEST).toMap.asJava))
 } else {
-  val result: ApiResult[(Int, TransactionMetadata)] = 
getTransactionMetadata(transactionalId, producerId, producerEpoch, partitions)
-  
+  val result: ApiResult[Map[TopicPartition, Errors]] =
+txnManager.getTransactionState(transactionalId).flatMap {
+  case None => Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+
+  case Some(epochAndMetadata) =>
+val txnMetadata = epochAndMetadata.transactionMetadata
+
+// generate the new transaction metadata with added partitions
+txnMetadata.inLock {
+  if (txnMetadata.producerId != producerId) {
+Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+  } else if (txnMetadata.producerEpoch != producerEpoch) {
+Left(Errors.PRODUCER_FENCED)
+  } else if (txnMetadata.pendingTransitionInProgress && 
!(txnMetadata.pendingState == Some(Ongoing) && txnMetadata.state == Ongoing)) {
+// return a retriable exception to let the client backoff and 
retry
+Left(Errors.CONCURRENT_TRANSACTIONS)

Review Comment:
   Yeah, I think that's right. It works because we remove the partition after 
we have confirmed that the end transaction marker has been written. So if the 
partition is included, then it means markers are still to come. This assumes we 
fix https://issues.apache.org/jira/browse/KAFKA-14884 of course.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] artemlivshits commented on a diff in pull request #13579: KAFKA-14904: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()

2023-04-18 Thread via GitHub


artemlivshits commented on code in PR #13579:
URL: https://github.com/apache/kafka/pull/13579#discussion_r1170670464


##
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##
@@ -332,24 +331,45 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
   debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for 
$transactionalId's AddPartitions request for verification")
   
responseCallback(AddPartitionsToTxnResponse.resultForTransaction(transactionalId,
 partitions.map(_ -> Errors.INVALID_REQUEST).toMap.asJava))
 } else {
-  val result: ApiResult[(Int, TransactionMetadata)] = 
getTransactionMetadata(transactionalId, producerId, producerEpoch, partitions)
-  
+  val result: ApiResult[Map[TopicPartition, Errors]] =
+txnManager.getTransactionState(transactionalId).flatMap {
+  case None => Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+
+  case Some(epochAndMetadata) =>
+val txnMetadata = epochAndMetadata.transactionMetadata
+
+// generate the new transaction metadata with added partitions
+txnMetadata.inLock {
+  if (txnMetadata.producerId != producerId) {
+Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+  } else if (txnMetadata.producerEpoch != producerEpoch) {
+Left(Errors.PRODUCER_FENCED)
+  } else if (txnMetadata.pendingTransitionInProgress && 
!(txnMetadata.pendingState == Some(Ongoing) && txnMetadata.state == Ongoing)) {
+// return a retriable exception to let the client backoff and 
retry
+Left(Errors.CONCURRENT_TRANSACTIONS)

Review Comment:
   Do we need this condition for the verification case? No matter what the pending 
state is, if the state contains the partition, we're good; otherwise, we'd 
fail during verification.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14920) Address timeouts and out of order sequences

2023-04-18 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-14920:
--

 Summary: Address timeouts and out of order sequences
 Key: KAFKA-14920
 URL: https://issues.apache.org/jira/browse/KAFKA-14920
 Project: Kafka
  Issue Type: Sub-task
Reporter: Justine Olshan
Assignee: Justine Olshan


KAFKA-14844 showed the destructive nature of a timeout on the first produce 
request for a topic partition (i.e. one that has no state in psm).

Since we currently don't validate the first sequence (we will in part 2 of 
KIP-890), any transient error on the first produce can lead to out-of-order 
sequences that never recover.

Originally, KAFKA-14561 relied on the producer's retry mechanism for these 
transient issues, but until that is fixed, we may need to retry from within the 
AddPartitionsManager instead. We addressed the concurrent transactions case, but 
there are other errors, like coordinator loading, that we could run into and see 
increased out-of-order issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-18 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1170662644


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RangeAssignorTest {
+private final RangeAssignor assignor = new RangeAssignor();
+private final Uuid topic1Uuid = Uuid.randomUuid();
+private final Uuid topic2Uuid = Uuid.randomUuid();
+private final Uuid topic3Uuid = Uuid.randomUuid();
+private final String consumerA = "A";
+private final String consumerB = "B";
+private final String consumerC = "C";
+
+@Test
+public void testOneConsumerNoTopic() {
+Map topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+Map members = new HashMap<>();
+List subscribedTopics = new ArrayList<>();
+members.computeIfAbsent(consumerA, k -> new 
AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new 
HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testOneConsumerNonExistentTopic() {
+Map topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+Map members = new HashMap<>();
+List subscribedTopics = new ArrayList<>();
+subscribedTopics.add(topic2Uuid);
+members.computeIfAbsent(consumerA, k -> new 
AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new 
HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() {
+// A -> T1, T3 // B -> T1, T3 // T1 -> 3 Partitions // T3 -> 2 
Partitions
+// Topics
+Map topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+topics.put(topic3Uuid, new AssignmentTopicMetadata(2));
+// Members
+Map members = new HashMap<>();
+// Consumer A
+List subscribedTopicsA = new 
ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid));
+members.put(consumerA, new AssignmentMemberSpec(Optional.empty(), 
Optional.empty(), subscribedTopicsA, new HashMap<>()));
+// Consumer B
+List subscribedTopicsB = new 
ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid));
+members.put(consumerB, new AssignmentMemberSpec(Optional.empty(), 
Optional.empty(), subscribedTopicsB, new HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
+
+Map>> expectedAssignment = new HashMap<>();
+// Topic 1 Partitions Assignment
+expectedAssignment.computeIfAbsent(topic1Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Arrays.asList(0, 1)));
+expectedAssignment.computeIfAbsent(topic1Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Collections.singletonList(2)));
+// Topic 3 Partitions Assignment
+expectedAssignment.computeIfAbsent(topic3Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Collections.singletonList(0)));
+expectedAssignment.computeIfAbsent(topic3Uuid, k -> new 
HashSet<>()

[jira] [Resolved] (KAFKA-14917) Producer write while transaction is pending.

2023-04-18 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan resolved KAFKA-14917.

Resolution: Won't Fix

> Producer write while transaction is pending.
> 
>
> Key: KAFKA-14917
> URL: https://issues.apache.org/jira/browse/KAFKA-14917
> Project: Kafka
>  Issue Type: Bug
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> As discovered in KAFKA-14904, we seem to get into a state where we try to 
> write to a partition while the ongoing state is still pending.
> This is likely a bigger issue than the test and worth looking in to.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14917) Producer write while transaction is pending.

2023-04-18 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713794#comment-17713794
 ] 

Justine Olshan commented on KAFKA-14917:


It turns out we set the state to pending when we are adding ANY new partition. So for 
now we will allow checks if the current state is ongoing and the pending state is 
ongoing, since any partitions in txnMetadata have already been added and persisted. 

We may need to consider retries for other cases.

> Producer write while transaction is pending.
> 
>
> Key: KAFKA-14917
> URL: https://issues.apache.org/jira/browse/KAFKA-14917
> Project: Kafka
>  Issue Type: Bug
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> As discovered in KAFKA-14904, we seem to get into a state where we try to 
> write to a partition while the ongoing state is still pending.
> This is likely a bigger issue than the test and worth looking in to.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-18 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1170659867


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the 
range assignor and the sticky assignor.
+ * Properties are as follows:
+ * 
+ *  Each member must get at least one partition for every topic that it is 
subscribed to. The only exception is when
+ *  the number of subscribed members is greater than the number of 
partitions for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range) 
+ *This can only be done if every member is subscribed to the same topics 
and the topics are co-partitioned.
+ *Two streams are co-partitioned if the following conditions are met:
+ *
+ *  The keys must have the same schemas.
+ *  The topics involved must have the same number of partitions.
+ *
+ *  Members should retain as much as their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ * 
+ *
+ * The algorithm includes the following steps:
+ * 
+ *  Generate a map of membersPerTopic using the given member 
subscriptions.
+ *  Generate a list of members (potentiallyUnfilledMembers) 
that have not met the minimum required quota for assignment AND
+ * get a list of sticky partitions that we want to retain in the new 
assignment.
+ *  Add members from the potentiallyUnfilled list to the 
Unfilled list if they haven't met the total required quota i.e. 
minimum number of partitions per member + 1 (if member is designated to receive 
one of the excess partitions) 
+ *  Generate a list of unassigned partitions by calculating the difference 
between total partitions and already assigned (sticky) partitions 
+ *  Iterate through unfilled members and assign partitions from the 
unassigned partitions 
+ * 
+ * 
+ *
+ */
+public class RangeAssignor implements PartitionAssignor {
+
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+static class RemainingAssignmentsForMember {
+private final String memberId;
+private final Integer remaining;
+
+public RemainingAssignmentsForMember(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+
+public String memberId() {
+return memberId;
+}
+
+public Integer remaining() {
+return remaining;
+}
+
+}
+
+private Map> membersPerTopic(final AssignmentSpec 
assignmentSpec) {
+Map> membersPerTopic = new HashMap<>();
+Map membersData = 
assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics 
list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic.computeIfAbsent(topicId, k -> new 
ArrayList<>()).add(memberId);
+} else {
+log.info(memberId + " subscribed to topic " + topicId + " 
which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+private Map>

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-18 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1170658595


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the 
range assignor and the sticky assignor.
+ * Properties are as follows:
+ * 
+ *  Each member must get at least one partition for every topic that it is 
subscribed to. The only exception is when
+ *  the number of subscribed members is greater than the number of 
partitions for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range) 
+ *This can only be done if every member is subscribed to the same topics 
and the topics are co-partitioned.
+ *Two streams are co-partitioned if the following conditions are met:
+ *
+ *  The keys must have the same schemas.
+ *  The topics involved must have the same number of partitions.
+ *
+ *  Members should retain as much as their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ * 
+ *
+ * The algorithm includes the following steps:

Review Comment:
   cool
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-18 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1170658513


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the 
range assignor and the sticky assignor.

Review Comment:
   remove the whole thing or just the html tags



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13497: KAFKA-14834: [2/N] Test coverage for out-of-order data in joins

2023-04-18 Thread via GitHub


vcrfxia commented on code in PR #13497:
URL: https://github.com/apache/kafka/pull/13497#discussion_r1170638852


##
streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java:
##
@@ -88,31 +88,35 @@ public static Collection data() {
 StreamsBuilder builder;
 
 private final List> input = Arrays.asList(
-new Input<>(INPUT_TOPIC_LEFT, null),
-new Input<>(INPUT_TOPIC_RIGHT, null),
-new Input<>(INPUT_TOPIC_LEFT, "A"),
-new Input<>(INPUT_TOPIC_RIGHT, "a"),
-new Input<>(INPUT_TOPIC_LEFT, "B"),
-new Input<>(INPUT_TOPIC_RIGHT, "b"),
-new Input<>(INPUT_TOPIC_LEFT, null),
-new Input<>(INPUT_TOPIC_RIGHT, null),
-new Input<>(INPUT_TOPIC_LEFT, "C"),
-new Input<>(INPUT_TOPIC_RIGHT, "c"),
-new Input<>(INPUT_TOPIC_RIGHT, null),
-new Input<>(INPUT_TOPIC_LEFT, null),
-new Input<>(INPUT_TOPIC_RIGHT, null),
-new Input<>(INPUT_TOPIC_RIGHT, "d"),
-new Input<>(INPUT_TOPIC_LEFT, "D")
+new Input<>(INPUT_TOPIC_LEFT, null, 1),
+new Input<>(INPUT_TOPIC_RIGHT, null, 2),
+new Input<>(INPUT_TOPIC_LEFT, "A", 3),
+new Input<>(INPUT_TOPIC_RIGHT, "a", 4),
+new Input<>(INPUT_TOPIC_LEFT, "B", 5),
+new Input<>(INPUT_TOPIC_RIGHT, "b", 6),
+new Input<>(INPUT_TOPIC_LEFT, null, 7),
+new Input<>(INPUT_TOPIC_RIGHT, null, 8),
+new Input<>(INPUT_TOPIC_LEFT, "C", 9),
+new Input<>(INPUT_TOPIC_RIGHT, "c", 10),
+new Input<>(INPUT_TOPIC_RIGHT, null, 11),
+new Input<>(INPUT_TOPIC_LEFT, null, 12),
+new Input<>(INPUT_TOPIC_RIGHT, null, 13),
+new Input<>(INPUT_TOPIC_RIGHT, "d", 14),
+new Input<>(INPUT_TOPIC_LEFT, "D", 15),
+new Input<>(INPUT_TOPIC_LEFT, "E", 4), // out-of-order data

Review Comment:
   Fixed in https://github.com/apache/kafka/pull/13609.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13609: KAFKA-14834: [11/N] Update table joins to identify out-of-order records with `isLatest`

2023-04-18 Thread via GitHub


vcrfxia commented on code in PR #13609:
URL: https://github.com/apache/kafka/pull/13609#discussion_r1170638371


##
streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java:
##
@@ -87,7 +87,37 @@ public static Collection data() {
 
 StreamsBuilder builder;
 
-private final List> input = Arrays.asList(
+protected final List> input = Arrays.asList(
+new Input<>(INPUT_TOPIC_LEFT, null, 1),
+new Input<>(INPUT_TOPIC_RIGHT, null, 2),
+new Input<>(INPUT_TOPIC_LEFT, "A", 3),
+new Input<>(INPUT_TOPIC_RIGHT, "a", 4),
+new Input<>(INPUT_TOPIC_LEFT, "B", 5),
+new Input<>(INPUT_TOPIC_RIGHT, "b", 6),
+new Input<>(INPUT_TOPIC_LEFT, null, 7),
+new Input<>(INPUT_TOPIC_RIGHT, null, 8),
+new Input<>(INPUT_TOPIC_LEFT, "C", 9),
+new Input<>(INPUT_TOPIC_RIGHT, "c", 10),
+new Input<>(INPUT_TOPIC_RIGHT, null, 11),
+new Input<>(INPUT_TOPIC_LEFT, null, 12),
+new Input<>(INPUT_TOPIC_RIGHT, null, 13),
+new Input<>(INPUT_TOPIC_RIGHT, "d", 7), // out-of-order data with null 
as latest
+new Input<>(INPUT_TOPIC_LEFT, "D", 6),
+new Input<>(INPUT_TOPIC_LEFT, null, 2),
+new Input<>(INPUT_TOPIC_RIGHT, null, 3),
+new Input<>(INPUT_TOPIC_RIGHT, "e", 14),
+new Input<>(INPUT_TOPIC_LEFT, "E", 15),
+new Input<>(INPUT_TOPIC_LEFT, null, 10), // out-of-order data with 
non-null as latest
+new Input<>(INPUT_TOPIC_RIGHT, null, 9),
+new Input<>(INPUT_TOPIC_LEFT, "F", 4),
+new Input<>(INPUT_TOPIC_RIGHT, "f", 3)
+);
+
+// used for stream-stream join tests where out-of-order data does not 
meaningfully affect
+// the result, and the main `input` list results in too many result 
records/test noise.
+// also used for table-table multi-join tests, since out-of-order data 
with table-table
+// joins is already tested in non-multi-join settings.
+protected final List> inputWithoutOutOfOrderData = 
Arrays.asList(

Review Comment:
   As suggested in 
https://github.com/apache/kafka/pull/13497#discussion_r1163397015, I have split 
the input test data into two copies: a smaller one without out-of-order data 
(for use in stream-stream join and table-table multi-join tests) and a larger 
one with out-of-order data (for use elsewhere, including to validate versioned 
joins).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13609: KAFKA-14834: [11/N] Update table joins to identify out-of-order records with `isLatest`

2023-04-18 Thread via GitHub


vcrfxia commented on code in PR #13609:
URL: https://github.com/apache/kafka/pull/13609#discussion_r1170633123


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##
@@ -804,6 +806,7 @@ private  KTable doJoin(final KTable 
other,
 kTableKTableJoinNode.setOutputVersioned(isOutputVersioned);
 
 builder.addGraphNode(this.graphNode, kTableKTableJoinNode);
+builder.addGraphNode(((KTableImpl) other).graphNode, 
kTableKTableJoinNode);

Review Comment:
   I don't know why it's currently the case that primary-key table-table join 
nodes only have one parent, instead of two. Seems more correct to have two, and 
the GraphNode mechanism for determining whether the joining table is versioned 
or not will not work without this parent connection.
   
   I have verified that there is no change to the built topology, so AFAICT 
this addition is internal-only.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##
@@ -1098,7 +1101,7 @@ private  KTable 
doJoinOnForeignKey(final KTable forei
 //not be done needlessly.
 ((KTableImpl) foreignKeyTable).enableSendingOldValues(true);
 
-//Old values must be sent such that the 
ForeignJoinSubscriptionSendProcessorSupplier can propagate deletions to the 
correct node.
+//Old values must be sent such that the 
SubscriptionSendProcessorSupplier can propagate deletions to the correct node.

Review Comment:
   This, and a few other similar renames in comments, are unrelated to this PR 
but included as cleanup from https://github.com/apache/kafka/pull/13589.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia opened a new pull request, #13609: KAFKA-14834: [11/N] Update table joins to identify out-of-order records with `isLatest`

2023-04-18 Thread via GitHub


vcrfxia opened a new pull request, #13609:
URL: https://github.com/apache/kafka/pull/13609

   There's a bug in the table-table join handling of out-of-order records in 
versioned tables (see https://github.com/apache/kafka/pull/13510 and 
https://github.com/apache/kafka/pull/13522 for context) where if the latest 
value for a particular key is a tombstone, then out-of-order records are not 
properly identified because versioned stores do not return timestamps for 
tombstones (so there is no timestamp to compare against, when deciding whether 
a record is out-of-order or not). This results in out-of-order records 
improperly being identified as not out-of-order, when the latest value for the 
key is a tombstone.
   
   This PR fixes the bug by using the `isLatest` value from the `Change` object 
(see https://github.com/apache/kafka/pull/13564) instead of calling `get(key)` 
on the state store to fetch timestamps to compare against. As part of this fix, 
this PR also updates table-table joins to determine whether upstream tables 
are versioned by using the GraphNode mechanism, instead of checking the table's 
value getter. This also enables us to remove the additional state store access 
granted to join processors in https://github.com/apache/kafka/pull/13510 and 
https://github.com/apache/kafka/pull/13522, resulting in a cleaner topology.
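   
   As a rough illustration of the idea (not the actual processor code; `Change` 
here is a minimal stand-in for the internal Kafka Streams class, and the method 
name is made up):
   
   ```java
   // Hedged sketch: identify out-of-order records via the isLatest flag carried in
   // the Change object, instead of looking the key up in the versioned store.
   class Change<V> {
       final V newValue;
       final V oldValue;
       final boolean isLatest; // set upstream when the update was applied to the versioned table
       Change(V newValue, V oldValue, boolean isLatest) {
           this.newValue = newValue;
           this.oldValue = oldValue;
           this.isLatest = isLatest;
       }
   }

   class OutOfOrderCheckSketch {
       <V> boolean isOutOfOrder(Change<V> change) {
           // A store lookup cannot distinguish "no timestamp because the latest value
           // is a tombstone" from "record is genuinely newer", but the upstream flag can.
           return !change.isLatest;
       }
   }
   ```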
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14881) Update UserScramCredentialRecord for SCRAM ZK to KRaft migration

2023-04-18 Thread Proven Provenzano (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Proven Provenzano resolved KAFKA-14881.
---
Resolution: Fixed

Merged to Trunk and 3.5.

> Update UserScramCredentialRecord for SCRAM ZK to KRaft migration
> 
>
> Key: KAFKA-14881
> URL: https://issues.apache.org/jira/browse/KAFKA-14881
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Affects Versions: 3.5.0
>Reporter: Proven Provenzano
>Assignee: Proven Provenzano
>Priority: Major
> Fix For: 3.5.0
>
>
> I want to support ZK to KRaft migration.
> ZK stores a storedKey and a serverKey for each SCRAM credential, not the 
> saltedPassword.
> The storedKey and serverKey are each a crypto hash derived from the 
> saltedPassword, and it is not possible to extract the saltedPassword from them.
> The serverKey and storedKey are enough for SCRAM authentication and 
> saltedPassword is not needed.
> I will update the UserScramCredentialRecord to store serverKey and storedKey 
> instead of saltedPassword and I will update that SCRAM is only supported with 
> a bumped version of IBP_3_5 so that there are no compatibility issues.
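
For reference, a minimal sketch (not Kafka code) of how the RFC 5802 keys relate 
to the salted password, which is why storing serverKey/storedKey is sufficient 
and one-way:

```java
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

// Sketch of the RFC 5802 derivation for SCRAM-SHA-256; not Kafka code.
public class ScramKeySketch {
    static byte[] hmacSha256(byte[] key, String data) throws Exception {
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(key, "HmacSHA256"));
        return mac.doFinal(data.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws Exception {
        // In practice the salted password comes from PBKDF2(password, salt, iterations).
        byte[] saltedPassword = new byte[32];
        byte[] clientKey = hmacSha256(saltedPassword, "Client Key");
        byte[] serverKey = hmacSha256(saltedPassword, "Server Key");
        byte[] storedKey = MessageDigest.getInstance("SHA-256").digest(clientKey);
        // serverKey and storedKey are enough to verify client proofs and sign server
        // responses, but the saltedPassword cannot be recovered from either of them.
        System.out.println(serverKey.length + " " + storedKey.length);
    }
}
```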



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14919) MM2 ForwardingAdmin tests should not conflate admin operations

2023-04-18 Thread Greg Harris (Jira)
Greg Harris created KAFKA-14919:
---

 Summary: MM2 ForwardingAdmin tests should not conflate admin 
operations
 Key: KAFKA-14919
 URL: https://issues.apache.org/jira/browse/KAFKA-14919
 Project: Kafka
  Issue Type: Test
  Components: mirrormaker
Reporter: Greg Harris


The MirrorConnectorsWithCustomForwardingAdminIntegrationTest uses a special 
implementation of ForwardingAdmin which records admin operations in a static 
ConcurrentMap, which is then used to perform assertions.

This has the problem that one variable (allTopics) is used to perform 
assertions for multiple different methods (adding topics, adding partitions, 
and syncing configs), despite these operations each being tested separately. 
This leads to the confusing behavior where each test appears to assert that a 
particular operation has taken place, and instead asserts that at least one of 
the operations has taken place. This allows a regression or timeout in one 
operation to be hidden by the others, making the behavior of the tests much 
less predictable.

These tests and/or the metadata store should be changed so that the tests are 
isolated from one another, and actually perform the assertions that correspond 
to their titles.
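
A hypothetical sketch of the isolation described above (class and method names are 
illustrative, not the actual test code): record each intercepted admin operation in 
its own structure so that a test asserts only on the operation it exercises.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: one collection per intercepted admin operation, instead of a
// single shared "allTopics" map used for all assertions.
final class PerOperationMetadataStoreSketch {
    final Set<String> createdTopics = ConcurrentHashMap.newKeySet();
    final Map<String, Integer> partitionCounts = new ConcurrentHashMap<>();
    final Map<String, Map<String, String>> topicConfigs = new ConcurrentHashMap<>();

    void recordCreateTopic(String topic) { createdTopics.add(topic); }
    void recordCreatePartitions(String topic, int count) { partitionCounts.put(topic, count); }
    void recordAlterConfigs(String topic, Map<String, String> configs) { topicConfigs.put(topic, configs); }
    // A topic-creation test now asserts only on createdTopics, so a slow or failed
    // config sync can no longer mask (or satisfy) that assertion.
}
```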



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] gharris1727 commented on pull request #13575: KAFKA-14905: Reduce flakiness in MM2 ForwardingAdmin test due to admin timeouts

2023-04-18 Thread via GitHub


gharris1727 commented on PR #13575:
URL: https://github.com/apache/kafka/pull/13575#issuecomment-1513850889

   Follow-up ticket: https://issues.apache.org/jira/browse/KAFKA-14919


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14905) Failing tests in MM2 ForwardingAdmin test since KIP-894

2023-04-18 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris reassigned KAFKA-14905:
---

Assignee: Greg Harris

> Failing tests in MM2 ForwardingAdmin test since KIP-894
> ---
>
> Key: KAFKA-14905
> URL: https://issues.apache.org/jira/browse/KAFKA-14905
> Project: Kafka
>  Issue Type: Test
>  Components: mirrormaker
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
> Fix For: 3.5.0
>
>
> There are three tests which are consistently failing in 
> MirrorConnectorsWithCustomForwardingAdminIntegrationTest since the merge of 
> KIP-894 in KAFKA-14420:
>  * testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin()
>  * testCreatePartitionsUseProvidedForwardingAdmin()
>  * testSyncTopicConfigUseProvidedForwardingAdmin()
> {noformat}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 6. 
> Topic: primary.test-topic-1 didn't get created in the FakeLocalMetadataStore 
> ==> expected:  but was: 
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>   at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>   at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>   at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211)
>   at 
> app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337)
>   at 
> app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>   at 
> app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334)
>   at 
> app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318)
>   at 
> app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:308)
>   at 
> app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicToPersistInFakeLocalMetadataStore(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:324)
>   at 
> app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:214)
> (similar)
>   at 
> app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicToPersistInFakeLocalMetadataStore(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:324)
>   at 
> app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testCreatePartitionsUseProvidedForwardingAdmin(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:243)
> (similar)
>   at 
> app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicToPersistInFakeLocalMetadataStore(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:324)
>   at 
> app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testSyncTopicConfigUseProvidedForwardingAdmin(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:280){noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14905) Failing tests in MM2 ForwardingAdmin test since KIP-894

2023-04-18 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-14905:

Labels: flaky-test  (was: )

> Failing tests in MM2 ForwardingAdmin test since KIP-894
> ---
>
> Key: KAFKA-14905
> URL: https://issues.apache.org/jira/browse/KAFKA-14905
> Project: Kafka
>  Issue Type: Test
>  Components: mirrormaker
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
>  Labels: flaky-test
> Fix For: 3.5.0
>
>
> There are three tests which are consistently failing in 
> MirrorConnectorsWithCustomForwardingAdminIntegrationTest since the merge of 
> KIP-894 in KAFKA-14420:
>  * testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin()
>  * testCreatePartitionsUseProvidedForwardingAdmin()
>  * testSyncTopicConfigUseProvidedForwardingAdmin()
> {noformat}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 6. 
> Topic: primary.test-topic-1 didn't get created in the FakeLocalMetadataStore 
> ==> expected:  but was: 
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>   at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>   at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>   at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211)
>   at 
> app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337)
>   at 
> app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>   at 
> app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334)
>   at 
> app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318)
>   at 
> app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:308)
>   at 
> app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicToPersistInFakeLocalMetadataStore(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:324)
>   at 
> app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:214)
> (similar)
>   at 
> app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicToPersistInFakeLocalMetadataStore(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:324)
>   at 
> app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testCreatePartitionsUseProvidedForwardingAdmin(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:243)
> (similar)
>   at 
> app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicToPersistInFakeLocalMetadataStore(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:324)
>   at 
> app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testSyncTopicConfigUseProvidedForwardingAdmin(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:280){noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

2023-04-18 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1170599794


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -835,6 +835,7 @@ public void handle(SyncGroupResponse syncResponse,
 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
 log.info("SyncGroup failed: The group began another 
rebalance. Need to re-join the group. " +
  "Sent generation was {}", sentGeneration);
+resetStateAndGeneration("member missed the rebalance", 
true);

Review Comment:
   Offline discussion with @hachikuji  - it seems like what we want is to 
revoke the old partitions but resend these partitions on the subsequent join. 
Right now, the join only sends out the assigned partitions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #13575: KAFKA-14905: Intercept incrementalAlterConfigs in MM2 ForwardingAdminTest

2023-04-18 Thread via GitHub


gharris1727 commented on PR #13575:
URL: https://github.com/apache/kafka/pull/13575#issuecomment-1513825281

   Okay, upon further investigation it appears that this is technically a 
flaky test failure; it just nearly always fails in CI due to CPU load. The 
core reason for the failure is that the FakeForwardingAdminWithLocalMetadata 
has a fixed 1-second timeout for each operation. This has the effect that each 
of the operations that the FakeForwardingAdminWithLocalMetadata intercepts can 
succeed in Kafka, and the connector will know it has succeeded, but the 
FakeLocalMetadataStore will not be updated, which can cause assertions to fail 
despite the connector satisfying the condition described by the test name.
   
   The reason for multiple distinct tests all regressing at once, and all 
passing locally appears to be due to the _tests not distinguishing between the 
different admin client operations, and conflating one operation for another_. 
This meant that before the regression, the tests which created the topics and 
created partitions actually were relying on the config sync to pass. After the 
regression, the tests which created partitions and synced configs passed 
locally because the topic creation had a good success rate with low CPU load.
   
   With this knowledge, I think that this implementation of the 
FakeLocalMetadataStore is fundamentally flawed, and these tests are ultimately 
not performing the assertions that they should be. They cannot reliably detect 
regressions and aren't really providing any value as-is. I think in this PR 
I'll apply a tactical fix to get the tests to pass (either raise or remove the 
timeout, or mark the tests as ignored) and follow up after the upcoming release 
to rewrite these tests to actually test what they say they do.
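   
   To illustrate the failure mode (names and shapes are made up; this is not the 
actual interceptor code):
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;
   import java.util.concurrent.TimeUnit;

   // Sketch of the race described above: the admin operation itself succeeds, but the
   // local metadata store is only updated if the result arrives within a fixed 1 second.
   class FixedTimeoutInterceptorSketch {
       static final ConcurrentMap<String, Boolean> LOCAL_STORE = new ConcurrentHashMap<>();

       static CompletableFuture<Void> interceptCreateTopic(String topic,
                                                           CompletableFuture<Void> kafkaResult) {
           kafkaResult.copy()                                  // derived future; original untouched
                      .orTimeout(1, TimeUnit.SECONDS)          // fixed timeout, too short under CPU load
                      .thenRun(() -> LOCAL_STORE.put(topic, true))
                      .exceptionally(t -> null);               // timeout swallowed; Kafka op still succeeds
           return kafkaResult;                                 // caller (the connector) sees success
       }
   }
   ```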


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan opened a new pull request, #13608: KAFKA-14844: Include check transaction is still ongoing right before append

2023-04-18 Thread via GitHub


jolshan opened a new pull request, #13608:
URL: https://github.com/apache/kafka/pull/13608

   We will need to pick up the changes in KAFKA-14916 (right now we assume 
producer ID is shared in all batches), but wanted to get a WIP draft out for 
general ideas.
   
   Will need to add the commented section in analyzeAndValidateProducerState 
and tests
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #13589: MINOR: rename internal FK-join processor classes

2023-04-18 Thread via GitHub


mjsax commented on PR #13589:
URL: https://github.com/apache/kafka/pull/13589#issuecomment-1513693392

   Merged to `trunk` and cherry-picked to `3.5` and `3.4` branches.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

2023-04-18 Thread via GitHub


jolshan commented on PR #13607:
URL: https://github.com/apache/kafka/pull/13607#issuecomment-1513675548

   I already see the checkstyle issues created by my IDE, so I will fix them in a bit.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan opened a new pull request, #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

2023-04-18 Thread via GitHub


jolshan opened a new pull request, #13607:
URL: https://github.com/apache/kafka/pull/13607

   Adds validation to ensure all producer IDs in a transactional/idempotent 
produce request are the same.
   
   Also modifies verification to only add a partition to verify if it is 
transactional. 
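   
   A rough sketch of the kind of check described (not the actual patch; `BatchView` 
is an illustrative stand-in, not a Kafka class):
   
   ```java
   // Hypothetical shape of the validation: every batch in one produce request must
   // carry the same producer ID.
   interface BatchView {
       long producerId();
       boolean isTransactional();
   }

   final class ProducerIdConsistencySketch {
       static void validateSameProducerId(Iterable<BatchView> batches) {
           Long seen = null;
           for (BatchView batch : batches) {
               if (seen == null) {
                   seen = batch.producerId();
               } else if (batch.producerId() != seen) {
                   throw new IllegalArgumentException("All batches in a produce request must "
                           + "use the same producer ID: " + seen + " vs " + batch.producerId());
               }
           }
       }
   }
   ```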
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] chia7712 merged pull request #13588: [MINOR]: Fixing gradle build during compileScala and compileTestScala

2023-04-18 Thread via GitHub


chia7712 merged PR #13588:
URL: https://github.com/apache/kafka/pull/13588


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13606: KAFKA-14918 Only send controller RPCs to migrating ZK brokers

2023-04-18 Thread via GitHub


cmccabe commented on code in PR #13606:
URL: https://github.com/apache/kafka/pull/13606#discussion_r1170454667


##
core/src/main/scala/kafka/migration/MigrationPropagator.scala:
##
@@ -70,14 +70,24 @@ class MigrationPropagator(
 
   override def publishMetadata(image: MetadataImage): Unit = {
 val oldImage = _image
-val addedBrokers = new 
util.HashSet[Integer](image.cluster().brokers().keySet())
-addedBrokers.removeAll(oldImage.cluster().brokers().keySet())
-val removedBrokers = new 
util.HashSet[Integer](oldImage.cluster().brokers().keySet())
-removedBrokers.removeAll(image.cluster().brokers().keySet())
-
-removedBrokers.asScala.foreach(id => channelManager.removeBroker(id))
-addedBrokers.asScala.foreach(id =>
-  
channelManager.addBroker(Broker.fromBrokerRegistration(image.cluster().broker(id
+val prevBrokers = oldImage.cluster().brokers().values().asScala
+  .filter(_.isMigratingZkBroker)
+  .filterNot(_.fenced)
+  .map(Broker.fromBrokerRegistration)
+  .toSet
+
+val aliveBrokers = image.cluster().brokers().values().asScala
+  .filter(_.isMigratingZkBroker)
+  .filterNot(_.fenced)
+  .map(Broker.fromBrokerRegistration)
+  .toSet
+
+val addedBrokers = aliveBrokers -- prevBrokers
+val removedBrokers = prevBrokers -- aliveBrokers
+
+stateChangeLogger.logger.debug(s"Adding brokers $addedBrokers, removing 
brokers $removedBrokers.")

Review Comment:
   Can we make this INFO, and only do it if addedBrokers or removedBrokers is 
non-empty?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14918) KRaft controller sending ZK controller RPCs to KRaft brokers

2023-04-18 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-14918:
-
Fix Version/s: 3.5.0

> KRaft controller sending ZK controller RPCs to KRaft brokers
> 
>
> Key: KAFKA-14918
> URL: https://issues.apache.org/jira/browse/KAFKA-14918
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Critical
> Fix For: 3.5.0
>
>
> During the migration, when upgrading a ZK broker to KRaft, the controller is 
> incorrectly sending UpdateMetadata requests to the KRaft broker. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14918) KRaft controller sending ZK controller RPCs to KRaft brokers

2023-04-18 Thread David Arthur (Jira)
David Arthur created KAFKA-14918:


 Summary: KRaft controller sending ZK controller RPCs to KRaft 
brokers
 Key: KAFKA-14918
 URL: https://issues.apache.org/jira/browse/KAFKA-14918
 Project: Kafka
  Issue Type: Sub-task
Reporter: David Arthur
Assignee: David Arthur


During the migration, when upgrading a ZK broker to KRaft, the controller is 
incorrectly sending UpdateMetadata requests to the KRaft broker. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dimitarndimitrov commented on a diff in pull request #13432: KAFKA-14821 Implement the listOffsets API with AdminApiDriver

2023-04-18 Thread via GitHub


dimitarndimitrov commented on code in PR #13432:
URL: https://github.com/apache/kafka/pull/13432#discussion_r1170437027


##
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java:
##
@@ -260,12 +261,29 @@ public void onFailure(
 .filter(future.lookupKeys()::contains)
 .collect(Collectors.toSet());
 retryLookup(keysToUnmap);
+} else if (t instanceof UnsupportedVersionException) {

Review Comment:
   Done.
   Also added a check that `UnsupportedVersionException` during the lookup 
stage causes a failure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax merged pull request #13589: MINOR: rename internal FK-join processor classes

2023-04-18 Thread via GitHub


mjsax merged PR #13589:
URL: https://github.com/apache/kafka/pull/13589


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] emissionnebula commented on pull request #13437: KAFKA-14828: Remove R/W locks using persistent data structures

2023-04-18 Thread via GitHub


emissionnebula commented on PR #13437:
URL: https://github.com/apache/kafka/pull/13437#issuecomment-1513618207

   A lot of 
[tests](https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-pr/branches/PR-13437/runs/8/log/?start=0)
 related to Connect seem to be failing. But those are unrelated. I could see a 
similar set of tests failing in [other 
PRs](https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-pr/branches/PR-13595/runs/3/log/?start=0)
 as well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-18 Thread via GitHub


dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1170357303


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RangeAssignorTest {
+private final RangeAssignor assignor = new RangeAssignor();
+private final Uuid topic1Uuid = Uuid.randomUuid();
+private final Uuid topic2Uuid = Uuid.randomUuid();
+private final Uuid topic3Uuid = Uuid.randomUuid();
+private final String consumerA = "A";
+private final String consumerB = "B";
+private final String consumerC = "C";
+
+@Test
+public void testOneConsumerNoTopic() {
+Map topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+Map members = new HashMap<>();
+List subscribedTopics = new ArrayList<>();
+members.computeIfAbsent(consumerA, k -> new 
AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new 
HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testOneConsumerNonExistentTopic() {
+Map topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+Map members = new HashMap<>();
+List subscribedTopics = new ArrayList<>();
+subscribedTopics.add(topic2Uuid);
+members.computeIfAbsent(consumerA, k -> new 
AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new 
HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() {
+// A -> T1, T3 // B -> T1, T3 // T1 -> 3 Partitions // T3 -> 2 
Partitions
+// Topics
+Map topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+topics.put(topic3Uuid, new AssignmentTopicMetadata(2));
+// Members
+Map members = new HashMap<>();
+// Consumer A
+List subscribedTopicsA = new 
ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid));
+members.put(consumerA, new AssignmentMemberSpec(Optional.empty(), 
Optional.empty(), subscribedTopicsA, new HashMap<>()));
+// Consumer B
+List subscribedTopicsB = new 
ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid));
+members.put(consumerB, new AssignmentMemberSpec(Optional.empty(), 
Optional.empty(), subscribedTopicsB, new HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
+
+Map>> expectedAssignment = new HashMap<>();
+// Topic 1 Partitions Assignment
+expectedAssignment.computeIfAbsent(topic1Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Arrays.asList(0, 1)));
+expectedAssignment.computeIfAbsent(topic1Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Collections.singletonList(2)));
+// Topic 3 Partitions Assignment
+expectedAssignment.computeIfAbsent(topic3Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Collections.singletonList(0)));
+expectedAssignment.computeIfAbsent(topic3Uuid, k -> new 
HashSet<>()).ad

[GitHub] [kafka] jsancio commented on a diff in pull request #13540: MINOR: improve QuorumController logging

2023-04-18 Thread via GitHub


jsancio commented on code in PR #13540:
URL: https://github.com/apache/kafka/pull/13540#discussion_r1170345743


##
metadata/src/main/java/org/apache/kafka/controller/ControllerPurgatory.java:
##
@@ -65,6 +81,7 @@ void failAll(Exception exception) {
 while (iter.hasNext()) {
 Entry> entry = iter.next();
 for (DeferredEvent event : entry.getValue()) {
+log.info("failAll({}): failing {}.", 
exception.getClass().getSimpleName(), event);

Review Comment:
   I see. We don't log the stacktrace because `failAll` is only called with 
`NotControllerException`. Should we change the signature of this method to 
`void failAll(ApiException)`?



##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -977,16 +969,8 @@ public void 
handleSnapshot(SnapshotReader reader) {
 long offset = batch.lastOffset();
 List messages = batch.records();
 
-if (log.isDebugEnabled()) {
-if (log.isTraceEnabled()) {
-log.trace("Replaying snapshot ({}) batch with 
last offset of {}: {}",
-reader.snapshotId(), offset, 
messages.stream().map(ApiMessageAndVersion::toString).
-collect(Collectors.joining(", ")));
-} else {
-log.debug("Replaying snapshot ({}) batch with 
last offset of {}",
-reader.snapshotId(), offset);
-}
-}
+log.debug("Replaying snapshot {} batch with last 
offset of {}",
+reader.snapshotId(), offset);

Review Comment:
   Do you want to use `snapshotName` here to make it consistent with the rest 
of the log messages?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee opened a new pull request, #13605: KAFKA-____: implement assign()

2023-04-18 Thread via GitHub


philipnee opened a new pull request, #13605:
URL: https://github.com/apache/kafka/pull/13605

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13533: KAFKA-12446: update change encoding to use varint

2023-04-18 Thread via GitHub


wcarlson5 commented on code in PR #13533:
URL: https://github.com/apache/kafka/pull/13533#discussion_r1170320445


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java:
##
@@ -104,33 +104,40 @@ public byte[] serialize(final String topic, final Headers 
headers, final Change<
 final int oldDataLength = oldValueIsNotNull ? oldData.length : 0;
 
 // The serialization format is:
-// {BYTE_ARRAY oldValue}{BYTE newOldFlag=0}
-// {BYTE_ARRAY newValue}{BYTE newOldFlag=1}
-// {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY 
oldValue}{BYTE newOldFlag=2}
-final ByteBuffer buf;
+// {BYTE_ARRAY oldValue}{BYTE encodingFlag=0}
+// {BYTE_ARRAY newValue}{BYTE encodingFlag=1}
+// {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY 
oldValue}{BYTE encodingFlag=2}

Review Comment:
   UINT32 or VARINT?



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java:
##
@@ -141,15 +141,15 @@ private static byte[] serializeVersions3Through5(final 
String topic, final Chang
 final int oldDataLength = oldValueIsNotNull ? oldData.length : 0;
 
 // The serialization format is:
-// {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=3}
-// {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE newOldFlag=4}
-// {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY 
oldValue}{BYTE isLatest}{BYTE newOldFlag=5}
+// {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE encodingFlag=3}
+// {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE encodingFlag=4}
+// {VARINT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY 
oldValue}{BYTE isLatest}{BYTE encodingFlag=5}
 final ByteBuffer buf;
 final byte isLatest = data.isLatest ? (byte) 1 : (byte) 0;
 if (newValueIsNotNull && oldValueIsNotNull) {
-final int capacity = UINT32_SIZE + newDataLength + oldDataLength + 
IS_LATEST_FLAG_SIZE + NEW_OLD_FLAG_SIZE;
+final int capacity = MAX_VARINT_LENGTH + newDataLength + 
oldDataLength + IS_LATEST_FLAG_SIZE + NEW_OLD_FLAG_SIZE;

Review Comment:
   You changed the `NEW_OLD_FLAG_SIZE` elsewhere to `ENCODING_FLAG_SIZE` can we 
change that here too?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-18 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1170322124


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RangeAssignorTest {
+private final RangeAssignor assignor = new RangeAssignor();
+private final Uuid topic1Uuid = Uuid.randomUuid();
+private final Uuid topic2Uuid = Uuid.randomUuid();
+private final Uuid topic3Uuid = Uuid.randomUuid();
+private final String consumerA = "A";
+private final String consumerB = "B";
+private final String consumerC = "C";
+
+@Test
+public void testOneConsumerNoTopic() {
+Map topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+Map members = new HashMap<>();
+List subscribedTopics = new ArrayList<>();
+members.computeIfAbsent(consumerA, k -> new 
AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new 
HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testOneConsumerNonExistentTopic() {
+Map topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+Map members = new HashMap<>();
+List subscribedTopics = new ArrayList<>();
+subscribedTopics.add(topic2Uuid);
+members.computeIfAbsent(consumerA, k -> new 
AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new 
HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() {
+// A -> T1, T3 // B -> T1, T3 // T1 -> 3 Partitions // T3 -> 2 
Partitions
+// Topics
+Map topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+topics.put(topic3Uuid, new AssignmentTopicMetadata(2));
+// Members
+Map members = new HashMap<>();
+// Consumer A
+List subscribedTopicsA = new 
ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid));
+members.put(consumerA, new AssignmentMemberSpec(Optional.empty(), 
Optional.empty(), subscribedTopicsA, new HashMap<>()));
+// Consumer B
+List subscribedTopicsB = new 
ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid));
+members.put(consumerB, new AssignmentMemberSpec(Optional.empty(), 
Optional.empty(), subscribedTopicsB, new HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
+
+Map>> expectedAssignment = new HashMap<>();
+// Topic 1 Partitions Assignment
+expectedAssignment.computeIfAbsent(topic1Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Arrays.asList(0, 1)));
+expectedAssignment.computeIfAbsent(topic1Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Collections.singletonList(2)));
+// Topic 3 Partitions Assignment
+expectedAssignment.computeIfAbsent(topic3Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Collections.singletonList(0)));
+expectedAssignment.computeIfAbsent(topic3Uuid, k -> new 
HashSet<>()

[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-04-18 Thread via GitHub


hachikuji commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1170301813


##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -207,35 +234,38 @@ class RPCProducerIdManager(brokerId: Int,
 })
   }
 
+  // Visible for testing
   private[transaction] def handleAllocateProducerIdsResponse(response: 
AllocateProducerIdsResponse): Unit = {
-requestInFlight.set(false)
 val data = response.data
+var successfulResponse = false
 Errors.forCode(data.errorCode()) match {
   case Errors.NONE =>
 debug(s"Got next producer ID block from controller $data")
 // Do some sanity checks on the response
-if (data.producerIdStart() < currentProducerIdBlock.lastProducerId) {
-  nextProducerIdBlock.put(Failure(new KafkaException(
-s"Producer ID block is not monotonic with current block: 
current=$currentProducerIdBlock response=$data")))
+if (data.producerIdStart() < 
currentProducerIdBlock.get.lastProducerId) {
+  error(s"Producer ID block is not monotonic with current block: 
current=$currentProducerIdBlock response=$data")
 } else if (data.producerIdStart() < 0 || data.producerIdLen() < 0 || 
data.producerIdStart() > Long.MaxValue - data.producerIdLen()) {
-  nextProducerIdBlock.put(Failure(new KafkaException(s"Producer ID 
block includes invalid ID range: $data")))
+  error(s"Producer ID block includes invalid ID range: $data")
 } else {
-  nextProducerIdBlock.put(
-Success(new ProducerIdsBlock(brokerId, data.producerIdStart(), 
data.producerIdLen(
+  nextProducerIdBlock.set(new ProducerIdsBlock(brokerId, 
data.producerIdStart(), data.producerIdLen()))
+  successfulResponse = true
 }
   case Errors.STALE_BROKER_EPOCH =>
-warn("Our broker epoch was stale, trying again.")
-maybeRequestNextBlock()
+warn("Our broker currentBlockCount was stale, trying again.")
   case Errors.BROKER_ID_NOT_REGISTERED =>
 warn("Our broker ID is not yet known by the controller, trying again.")
-maybeRequestNextBlock()
   case e: Errors =>
-warn("Had an unknown error from the controller, giving up.")
-nextProducerIdBlock.put(Failure(e.exception()))
+error(s"Had an unknown error from the controller: ${e.exception}")

Review Comment:
   nit: maybe we can rephrase this message a little for clarity
   ```scala
   error(s"Received an unexpected error code from the controller: $e")
   ```



##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -123,73 +129,94 @@ class ZkProducerIdManager(brokerId: Int,
 }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
 this synchronized {
   // grab a new block of producerIds if this block has been exhausted
   if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-allocateNewProducerIdBlock()
+try {
+  allocateNewProducerIdBlock()
+} catch {
+  case t: Throwable =>
+return Failure(t)
+}
 nextProducerId = currentProducerIdBlock.firstProducerId
   }
   nextProducerId += 1
-  nextProducerId - 1
+  Success(nextProducerId - 1)
+}
+  }
+
+  override def hasValidBlock: Boolean = {
+this synchronized {
+  !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
 }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will 
immediately fail requests
+ * for producers to retry if it does not have an available producer id and is 
waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+   time: Time,
brokerEpochSupplier: () => Long,
-   controllerChannel: BrokerToControllerChannelManager,
-   maxWaitMs: Int) extends ProducerIdManager with 
Logging {
+   controllerChannel: 
BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new 
ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new 
AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new 
AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val shouldBackoff = new AtomicBoolean(false)
 
-  override def generateProducerId(): Long = {
-this synchronized {
-  if (nextProducerId == -1L) {
-// Send an initial request to get the first bloc

[GitHub] [kafka] cmccabe closed pull request #13590: Scram kraft update user scram credential record

2023-04-18 Thread via GitHub


cmccabe closed pull request #13590: Scram kraft update user scram credential 
record
URL: https://github.com/apache/kafka/pull/13590


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe merged pull request #13513: KAFKA-14881: Rework UserScramCredentialRecord

2023-04-18 Thread via GitHub


cmccabe merged PR #13513:
URL: https://github.com/apache/kafka/pull/13513


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-04-18 Thread via GitHub


satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1170273242


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -600,6 +622,176 @@ public String toString() {
 }
 }
 
+public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) 
throws RemoteStorageException, IOException {
+int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+FetchRequest.PartitionData fetchInfo = 
remoteStorageFetchInfo.fetchInfo;
+
+boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == 
FetchIsolation.TXN_COMMITTED;
+
+long offset = fetchInfo.fetchOffset;
+int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+Optional logOptional = fetchLog.apply(tp);
+OptionalInt epoch = OptionalInt.empty();
+
+if (logOptional.isPresent()) {
+Option leaderEpochCache = 
logOptional.get().leaderEpochCache();
+if (leaderEpochCache.isDefined()) {
+epoch = leaderEpochCache.get().epochForOffset(offset);
+}
+}
+
+Optional rlsMetadata = epoch.isPresent()
+? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+: Optional.empty();
+
+if (!rlsMetadata.isPresent()) {
+String epochStr = (epoch.isPresent()) ? 
Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+throw new OffsetOutOfRangeException("Received request for offset " 
+ offset + " for leader epoch "
++ epochStr + " and partition " + tp + " which does not 
exist in remote tier.");
+}
+
+int startPos = lookupPositionForOffset(rlsMetadata.get(), offset);
+InputStream remoteSegInputStream = null;
+try {
+// Search forward for the position of the last offset that is 
greater than or equal to the target offset
+remoteSegInputStream = 
remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos);
+RemoteLogInputStream remoteLogInputStream = new 
RemoteLogInputStream(remoteSegInputStream);
+
+RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, 
offset);
+
+if (firstBatch == null)
+return new FetchDataInfo(new LogOffsetMetadata(offset), 
MemoryRecords.EMPTY, false,
+includeAbortedTxns ? 
Optional.of(Collections.emptyList()) : Optional.empty());
+
+int updatedFetchSize =
+remoteStorageFetchInfo.minOneMessage && 
firstBatch.sizeInBytes() > maxBytes
+? firstBatch.sizeInBytes() : maxBytes;
+
+ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+int remainingBytes = updatedFetchSize;
+
+firstBatch.writeTo(buffer);
+remainingBytes -= firstBatch.sizeInBytes();
+
+if (remainingBytes > 0) {
+// input stream is read till (startPos - 1) while getting the 
batch of records earlier.
+// read the input stream until min of (EOF stream or buffer's 
remaining capacity).
+Utils.readFully(remoteSegInputStream, buffer);
+}
+buffer.flip();
+
+FetchDataInfo fetchDataInfo = new FetchDataInfo(new 
LogOffsetMetadata(offset), MemoryRecords.readableRecords(buffer));

Review Comment:
   Good catch, addressed it in the latest commit. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-04-18 Thread via GitHub


satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1168261768


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1243,6 +1327,33 @@ class ReplicaManager(val config: KafkaConfig,
 result
   }
 
+  def createLogReadResult(highWatermark: Long,

Review Comment:
   `createLogReadResult(e: Throwable)` cannot be private as it is used in 
`DelayedRemoteFetch`. But this method can be used. It is going to be used in 
test classes that we are going to add in this PR or a follow-up PR. 



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -600,6 +622,176 @@ public String toString() {
 }
 }
 
+public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) 
throws RemoteStorageException, IOException {
+int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+FetchRequest.PartitionData fetchInfo = 
remoteStorageFetchInfo.fetchInfo;
+
+boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == 
FetchIsolation.TXN_COMMITTED;
+
+long offset = fetchInfo.fetchOffset;
+int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+Optional logOptional = fetchLog.apply(tp);
+OptionalInt epoch = OptionalInt.empty();

Review Comment:
   afaik, `lastFetchedEpoch` is the epoch of the last fetched record. That can 
be different from the fetch offset’s epoch. We should find the respective epoch 
for the target offset and use that to find the remote log segment metadata.
   



##
core/src/main/java/kafka/log/remote/RemoteLogReader.java:
##
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;

Review Comment:
   `RemoteLogReader` cannot be moved to the storage module as it currently depends 
on `RemoteLogManager`. I will move it along with `RemoteLogManager` later. 
   `RemoteLogReadResult` and `RemoteStorageThreadPool` are moved to the storage 
module. 



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1083,48 +1095,100 @@ class ReplicaManager(val config: KafkaConfig,
   fetchPartitionStatus += (topicIdPartition -> 
FetchPartitionStatus(logOffsetMetadata, partitionData))
 })
   }
-  val delayedFetch = new DelayedFetch(
-params = params,
-fetchPartitionStatus = fetchPartitionStatus,
-replicaManager = this,
-quota = quota,
-responseCallback = responseCallback
-  )
-
-  // create a list of (topic, partition) pairs to use as keys for this 
delayed fetch operation
-  val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => 
TopicPartitionOperationKey(tp) }
-
-  // try to complete the request immediately, otherwise put it into the 
purgatory;
-  // this is because while the delayed fetch operation is being created, 
new requests
-  // may arrive and hence make this operation completable.
-  delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, 
delayedFetchKeys)
+
+  if (remoteFetchInfo.isPresent) {

Review Comment:
   I did not understand the comment here.  



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -600,6 +622,176 @@ public String toString() {
 }
 }
 
+public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) 
throws RemoteStorageException, IOException {
+int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+FetchRequest.PartitionData fetchInfo = 
remoteStorageFetchInfo.fetchInfo;
+
+boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == 
FetchIsolation.TXN_COMMITTED;
+
+long offset = fetchInfo.fetchOffset;
+int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+Optional logOptional = fetchLog.apply(tp);
+OptionalInt epoch = OptionalInt.empty();
+
+if (logOptional.isPresent()) {
+Option leaderEpochCache = 
logOptional.get().leaderEpochCache();
+if (leaderEpochCache.isDef

[GitHub] [kafka] k-wall commented on pull request #13572: KAFKA-14908: Set setReuseAddress on the kafka server socket

2023-04-18 Thread via GitHub


k-wall commented on PR #13572:
URL: https://github.com/apache/kafka/pull/13572#issuecomment-1513428217

   @divijvaidya thanks for the review feedback, much appreciated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] k-wall commented on a diff in pull request #13572: KAFKA-14908: Set setReuseAddress on the kafka server socket

2023-04-18 Thread via GitHub


k-wall commented on code in PR #13572:
URL: https://github.com/apache/kafka/pull/13572#discussion_r1170252886


##
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##
@@ -1893,6 +1893,33 @@ class SocketServerTest {
 }, false)
   }
 
+  @Test
+  def testDataPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+val acceptor = server.dataPlaneAcceptor(listener)
+val channel = acceptor.get.serverChannel
+verifySocketUsesReuseAddress(channel)
+  }
+
+  @Test
+  def testControlPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+shutdownServerAndMetrics(server)
+val testProps = new Properties
+testProps ++= props
+testProps.put("listeners", 
"PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0")
+testProps.put("listener.security.protocol.map", 
"PLAINTEXT:PLAINTEXT,CONTROL_PLANE:PLAINTEXT")
+testProps.put("control.plane.listener.name", "CONTROL_PLANE")
+val config = KafkaConfig.fromProps(testProps)
+val testServer = new SocketServer(config, metrics, Time.SYSTEM, 
credentialProvider, apiVersionManager)
+val channel = testServer.controlPlaneAcceptorOpt.get.serverChannel

Review Comment:
   Test renamed and test now checks both.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] k-wall commented on a diff in pull request #13572: KAFKA-14908: Set setReuseAddress on the kafka server socket

2023-04-18 Thread via GitHub


k-wall commented on code in PR #13572:
URL: https://github.com/apache/kafka/pull/13572#discussion_r1170252886


##
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##
@@ -1893,6 +1893,33 @@ class SocketServerTest {
 }, false)
   }
 
+  @Test
+  def testDataPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+val acceptor = server.dataPlaneAcceptor(listener)
+val channel = acceptor.get.serverChannel
+verifySocketUsesReuseAddress(channel)
+  }
+
+  @Test
+  def testControlPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+shutdownServerAndMetrics(server)
+val testProps = new Properties
+testProps ++= props
+testProps.put("listeners", 
"PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0")
+testProps.put("listener.security.protocol.map", 
"PLAINTEXT:PLAINTEXT,CONTROL_PLANE:PLAINTEXT")
+testProps.put("control.plane.listener.name", "CONTROL_PLANE")
+val config = KafkaConfig.fromProps(testProps)
+val testServer = new SocketServer(config, metrics, Time.SYSTEM, 
credentialProvider, apiVersionManager)
+val channel = testServer.controlPlaneAcceptorOpt.get.serverChannel

Review Comment:
   Comment added documenting the use of ephemeral ports. 



##
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##
@@ -1893,6 +1893,33 @@ class SocketServerTest {
 }, false)
   }
 
+  @Test
+  def testDataPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+val acceptor = server.dataPlaneAcceptor(listener)
+val channel = acceptor.get.serverChannel
+verifySocketUsesReuseAddress(channel)
+  }
+
+  @Test
+  def testControlPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+shutdownServerAndMetrics(server)
+val testProps = new Properties
+testProps ++= props
+testProps.put("listeners", 
"PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0")

Review Comment:
   Comment added documenting the use of ephemeral ports.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] k-wall commented on a diff in pull request #13572: KAFKA-14908: Set setReuseAddress on the kafka server socket

2023-04-18 Thread via GitHub


k-wall commented on code in PR #13572:
URL: https://github.com/apache/kafka/pull/13572#discussion_r1170251661


##
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##
@@ -1893,6 +1893,33 @@ class SocketServerTest {
 }, false)
   }
 
+  @Test
+  def testDataPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+val acceptor = server.dataPlaneAcceptor(listener)
+val channel = acceptor.get.serverChannel
+verifySocketUsesReuseAddress(channel)
+  }
+
+  @Test
+  def testControlPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+shutdownServerAndMetrics(server)
+val testProps = new Properties
+testProps ++= props
+testProps.put("listeners", 
"PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0")
+testProps.put("listener.security.protocol.map", 
"PLAINTEXT:PLAINTEXT,CONTROL_PLANE:PLAINTEXT")
+testProps.put("control.plane.listener.name", "CONTROL_PLANE")
+val config = KafkaConfig.fromProps(testProps)
+val testServer = new SocketServer(config, metrics, Time.SYSTEM, 
credentialProvider, apiVersionManager)
+val channel = testServer.controlPlaneAcceptorOpt.get.serverChannel
+verifySocketUsesReuseAddress(channel)
+shutdownServerAndMetrics(testServer)

Review Comment:
   Now using try/finally.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

2023-04-18 Thread via GitHub


jolshan commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1170250900


##
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##
@@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int,
* The most important guarantee that this API provides is that it should 
never return a stale offset. i.e., it either
* returns the current offset or it begins to sync the cache from the log 
(and returns an error code).
*/
-  def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: 
Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = {
-trace("Getting offsets of %s for group 
%s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
+  def getOffsets(groupId: String, requireStable: Boolean, 
topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, 
PartitionData] = {
+trace("Getting offsets of %s for group 
%s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId))
 val group = groupMetadataCache.get(groupId)
 if (group == null) {
-  topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { 
topicPartition =>
+  topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { 
topicIdPartition =>
 val partitionData = new 
PartitionData(OffsetFetchResponse.INVALID_OFFSET,
   Optional.empty(), "", Errors.NONE)
-topicPartition -> partitionData
+topicIdPartition -> partitionData
   }.toMap
 } else {
   group.inLock {
 if (group.is(Dead)) {
-  topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { 
topicPartition =>
+  topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { 
topicIdPartition =>
 val partitionData = new 
PartitionData(OffsetFetchResponse.INVALID_OFFSET,
   Optional.empty(), "", Errors.NONE)
-topicPartition -> partitionData
+topicIdPartition -> partitionData
   }.toMap
 } else {
-  val topicPartitions = 
topicPartitionsOpt.getOrElse(group.allOffsets.keySet)
-
-  topicPartitions.map { topicPartition =>
-if (requireStable && 
group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) {
-  topicPartition -> new 
PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+  def resolvePartitionData(topicIdPartition: TopicIdPartition): 
PartitionData = {
+if (requireStable && 
group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) {
+  new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
 Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT)
 } else {
-  val partitionData = group.offset(topicPartition) match {
+  group.offset(topicIdPartition) match {
 case None =>
   new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
 Optional.empty(), "", Errors.NONE)
 case Some(offsetAndMetadata) =>
   new PartitionData(offsetAndMetadata.offset,
 offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, 
Errors.NONE)
   }
-  topicPartition -> partitionData
 }
-  }.toMap
+  }
+
+  topicIdPartitionsOpt match {
+case Some(topicIdPartitions) =>
+  topicIdPartitions.map { topicIdPartition =>
+topicIdPartition -> resolvePartitionData(topicIdPartition)
+  }.toMap
+
+case None =>
+  val topicIds = replicaManager.metadataCache.topicNamesToIds()
+  group.allOffsets.keySet.map { topicPartition =>
+Option(topicIds.get(topicPartition.topic())) match {
+  case Some(topicId) =>
+val topicIdPartition = new TopicIdPartition(topicId, 
topicPartition)
+topicIdPartition -> resolvePartitionData(topicIdPartition)
+  case None =>
+val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, 
topicPartition)
+zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION
+}
+  }.toMap

Review Comment:
   Were we expecting to use this code path when the IBP is less than 2.8? I 
guess I assumed that the IBP would be higher.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13582: MINOR: Fix lossy conversions flagged by Java 20

2023-04-18 Thread via GitHub


divijvaidya commented on code in PR #13582:
URL: https://github.com/apache/kafka/pull/13582#discussion_r1170245494


##
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java:
##
@@ -434,7 +434,7 @@ private static byte computeAttributes(CompressionType type, 
TimestampType timest
 if (isControl)
 attributes |= CONTROL_FLAG_MASK;
 if (type.id > 0)
-attributes |= COMPRESSION_CODEC_MASK & type.id;
+attributes |= (byte) (COMPRESSION_CODEC_MASK & type.id);

Review Comment:
   I expected type promotion to a common ancestor for cases where overflow is 
expected, such as multiplication, but didn't expect it for the `&` bit operation. 
Nevertheless TIL! 
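   For illustration, a minimal standalone sketch of the promotion under discussion 
(plain javac behavior per JLS 5.6.2; variable names here are illustrative, not the Kafka code):
   ```java
   public class PromotionExample {
       public static void main(String[] args) {
           byte mask = 0x07;
           byte id = 0x02;

           int promoted = mask & id;      // '&' on two bytes is promoted to int
           // byte bad = mask & id;       // does not compile: possible lossy conversion from int to byte
           byte ok = (byte) (mask & id);  // explicit narrowing cast, as in the patch above

           byte attributes = 0;
           attributes |= mask & id;       // compiles: '|=' hides an implicit narrowing cast,
                                          // which newer javac versions flag as possibly lossy
           System.out.println(promoted + " " + ok + " " + attributes);
       }
   }
   ```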
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13584: MINOR: Add log segment unit tests, If the maximum offset beyond index, appen…

2023-04-18 Thread via GitHub


divijvaidya commented on code in PR #13584:
URL: https://github.com/apache/kafka/pull/13584#discussion_r1170238988


##
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##
@@ -65,6 +68,28 @@ class LogSegmentTest {
 Utils.delete(logDir)
   }
 
+  /**
+   * If the maximum offset beyond index, appended to the log section, it 
throws LogSegmentOffsetOverflowException
+   */
+  @ParameterizedTest
+  @CsvSource(Array(

Review Comment:
   Note that baseOffset can be `Long` but `largestOffset - baseOffset` should 
be <= `Integer.MaxValue`. This case is missing from our test here.
   
   Could we add the following test cases as well:
   
   baseOffset is a number > Integer.MaxValue, largestOffset is a number > 
Integer.MaxValue, such that `largestOffset - baseOffset` > Integer.MaxValue. 
This will throw an error.
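   For illustration, a tiny standalone sketch (assumed names, not the actual LogSegment 
code) of the relative-offset rule these cases exercise:
   ```java
   public class RelativeOffsetCheck {
       // offsets inside a segment are stored relative to baseOffset, so the delta must fit in an Int
       static boolean fitsRelativeOffset(long baseOffset, long largestOffset) {
           long delta = largestOffset - baseOffset;
           return delta >= 0 && delta <= Integer.MAX_VALUE;
       }

       public static void main(String[] args) {
           long base = 3_000_000_000L; // both offsets beyond Integer.MAX_VALUE is fine by itself...
           System.out.println(fitsRelativeOffset(base, base + 100L));           // true: small delta
           System.out.println(fitsRelativeOffset(base, base + 3_000_000_000L)); // false: delta overflows an Int
       }
   }
   ```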



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13572: KAFKA-14908: Set setReuseAddress on the kafka server socket

2023-04-18 Thread via GitHub


divijvaidya commented on code in PR #13572:
URL: https://github.com/apache/kafka/pull/13572#discussion_r1170213078


##
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##
@@ -1893,6 +1893,33 @@ class SocketServerTest {
 }, false)
   }
 
+  @Test
+  def testDataPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+val acceptor = server.dataPlaneAcceptor(listener)
+val channel = acceptor.get.serverChannel
+verifySocketUsesReuseAddress(channel)
+  }
+
+  @Test
+  def testControlPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+shutdownServerAndMetrics(server)
+val testProps = new Properties
+testProps ++= props
+testProps.put("listeners", 
"PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0")

Review Comment:
   please add a comment here.
   
   We are using 0 so that the OS can choose to assign any available port here.
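   As a standalone illustration (plain java.net, not the SocketServer code) of binding 
to port 0 and of the SO_REUSEADDR option this PR enables:
   ```java
   import java.net.InetSocketAddress;
   import java.net.ServerSocket;

   public class EphemeralPortExample {
       public static void main(String[] args) throws Exception {
           try (ServerSocket socket = new ServerSocket()) {
               socket.setReuseAddress(true);                       // SO_REUSEADDR, set before bind()
               socket.bind(new InetSocketAddress("localhost", 0)); // port 0: the OS picks a free ephemeral port
               System.out.println("OS-assigned port: " + socket.getLocalPort());
           }
       }
   }
   ```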



##
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##
@@ -1893,6 +1893,33 @@ class SocketServerTest {
 }, false)
   }
 
+  @Test
+  def testDataPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+val acceptor = server.dataPlaneAcceptor(listener)
+val channel = acceptor.get.serverChannel
+verifySocketUsesReuseAddress(channel)
+  }
+
+  @Test
+  def testControlPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+shutdownServerAndMetrics(server)
+val testProps = new Properties
+testProps ++= props
+testProps.put("listeners", 
"PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0")
+testProps.put("listener.security.protocol.map", 
"PLAINTEXT:PLAINTEXT,CONTROL_PLANE:PLAINTEXT")
+testProps.put("control.plane.listener.name", "CONTROL_PLANE")
+val config = KafkaConfig.fromProps(testProps)
+val testServer = new SocketServer(config, metrics, Time.SYSTEM, 
credentialProvider, apiVersionManager)
+val channel = testServer.controlPlaneAcceptorOpt.get.serverChannel
+verifySocketUsesReuseAddress(channel)
+shutdownServerAndMetrics(testServer)

Review Comment:
   perhaps do this in try/finally so that the resources are cleaned even if the 
test fails with an exception.



##
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##
@@ -1893,6 +1893,33 @@ class SocketServerTest {
 }, false)
   }
 
+  @Test
+  def testDataPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+val acceptor = server.dataPlaneAcceptor(listener)
+val channel = acceptor.get.serverChannel
+verifySocketUsesReuseAddress(channel)
+  }
+
+  @Test
+  def testControlPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+shutdownServerAndMetrics(server)
+val testProps = new Properties
+testProps ++= props
+testProps.put("listeners", 
"PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0")
+testProps.put("listener.security.protocol.map", 
"PLAINTEXT:PLAINTEXT,CONTROL_PLANE:PLAINTEXT")
+testProps.put("control.plane.listener.name", "CONTROL_PLANE")
+val config = KafkaConfig.fromProps(testProps)
+val testServer = new SocketServer(config, metrics, Time.SYSTEM, 
credentialProvider, apiVersionManager)
+val channel = testServer.controlPlaneAcceptorOpt.get.serverChannel

Review Comment:
   since we are starting up two listeners, could we verify reuse address for both, 
please?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] urbandan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-04-18 Thread via GitHub


urbandan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1170149228


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -266,7 +266,7 @@ public synchronized TransactionalRequestResult beginAbort() 
{
 return handleCachedTransactionRequestResult(() -> {
 if (currentState != State.ABORTABLE_ERROR)
 maybeFailWithError();
-transitionTo(State.ABORTING_TRANSACTION);
+transitionTo(State.ABORTING_TRANSACTION, null, true);

Review Comment:
   there is a call chain where the Sender calls beginAbort on producer close - 
do we want to throw there? I'm not sure if that one qualifies as a 
"user-direct" action



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] urbandan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-04-18 Thread via GitHub


urbandan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1170141463


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -968,13 +968,31 @@ private void transitionTo(State target) {
 }
 
 private void transitionTo(State target, RuntimeException error) {
+transitionTo(target, error, false);

Review Comment:
   I think changing the default behavior to not throw can cause issues in some 
calls:
   1. TransactionManager.InitProducerIdHandler#handleResponse on line 1303 - 
lastError is explicitly set to null (which shouldn't be done at all, as 
transitionTo already does that if the state transition is valid), which will 
clear the latest error. I think to make this work, that lastError = null should 
be removed from line 1303
   2. This is a call chain where we transition on direct user action, shouldn't 
this be throwing? KafkaProducer.send -> KafkaProducer.doSend -> 
maybeTransitionToErrorState -> transitionToAbortableError -> transitionTo
   3. In TransactionManager.TxnOffsetCommitHandler#handleResponse, there are 
multiple
   ```
   abortableError(...);
   break;
   ```
   blocks. If abortableError does not throw on invalid state transition 
anymore, the txn commit will be retried, even when in a failed state, which 
doesn't seem correct.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #13514: KAFKA-14752: Kafka examples improvements - consumer changes

2023-04-18 Thread via GitHub


clolov commented on code in PR #13514:
URL: https://github.com/apache/kafka/pull/13514#discussion_r1170144013


##
examples/src/main/java/kafka/examples/Utils.java:
##
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.examples;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+
+public class Utils {
+private Utils() {
+}
+
+public static void printHelp(String message, Object... args) {
+System.out.println(format(message, args));
+}
+
+public static void printOut(String message, Object... args) {
+System.out.printf("%s - %s%n", Thread.currentThread().getName(), 
format(message, args));
+}
+
+public static void printErr(String message, Object... args) {
+System.err.printf("%s - %s%n", Thread.currentThread().getName(), 
format(message, args));
+}
+
+public static void maybePrintRecord(long numRecords, 
ConsumerRecord record) {
+maybePrintRecord(numRecords, record.key(), record.value(), 
record.topic(), record.partition(), record.offset());
+}
+
+public static void maybePrintRecord(long numRecords, int key, String 
value, RecordMetadata metadata) {
+maybePrintRecord(numRecords, key, value, metadata.topic(), 
metadata.partition(), metadata.offset());
+}
+
+private static void maybePrintRecord(long numRecords, int key, String 
value, String topic, int partition, long offset) {
+// we only print 10 records when there are 20 or more to send

Review Comment:
   Ah, yes, my bad, I had to put pen to paper to figure out that this does indeed 
only start printing 10 records once we go above 20.
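   For illustration, one plausible shape of such a sampling rule (a hedged sketch, not 
necessarily the actual kafka.examples code): every record is printed while numRecords 
stays below 20, and only roughly one in numRecords/10 after that:
   ```java
   public class SamplingExample {
       static boolean shouldPrint(long numRecords, long index) {
           long step = Math.max(1, numRecords / 10); // stays 1 below 20 records, so everything prints
           return index % step == 0;
       }

       public static void main(String[] args) {
           for (long total : new long[]{15, 25, 100}) {
               long printed = 0;
               for (long i = 0; i < total; i++) {
                   if (shouldPrint(total, i)) printed++;
               }
               System.out.printf("numRecords=%d -> printed=%d%n", total, printed);
           }
       }
   }
   ```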



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

2023-04-18 Thread via GitHub


clolov commented on PR #12607:
URL: https://github.com/apache/kafka/pull/12607#issuecomment-1513271209

   Okay, this makes sense to me. I will aim to start opening PRs in the same 
manner as yours in the upcoming days and let's see where we go!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim opened a new pull request, #13604: KAFKA-14869: Bump coordinator value records to flexible versions (KIP…

2023-04-18 Thread via GitHub


jeffkbkim opened a new pull request, #13604:
URL: https://github.com/apache/kafka/pull/13604

   …-915, Part-2) (#13526)
   
   This patch implemented the second part of KIP-915. It bumps the versions of 
the value records used by the group coordinator and the transaction coordinator 
to make them flexible versions. The new versions are not used when writing to 
the partitions but only when reading from the partitions. This allows 
downgrades from future versions that will include tagged fields.
   
   Reviewers: David Jacot 
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim opened a new pull request, #13603: KAFKA-14869: Bump coordinator value records to flexible versions (KIP…

2023-04-18 Thread via GitHub


jeffkbkim opened a new pull request, #13603:
URL: https://github.com/apache/kafka/pull/13603

   …-915, Part-2) (#13526)
   
   This patch implemented the second part of KIP-915. It bumps the versions of 
the value records used by the group coordinator and the transaction coordinator 
to make them flexible versions. The new versions are not used when writing to 
the partitions but only when reading from the partitions. This allows 
downgrades from future versions that will include tagged fields.
   
   Reviewers: David Jacot 
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #13529: KAFKA-14133: Migrate topology builder mock in TaskManagerTest to mockito

2023-04-18 Thread via GitHub


clolov commented on code in PR #13529:
URL: https://github.com/apache/kafka/pull/13529#discussion_r1170120571


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -2895,10 +2871,8 @@ public void 
shouldNotCommitOnHandleAssignmentIfNoTaskClosed() {
 expect(activeTaskCreator.createTasks(anyObject(), 
eq(Collections.emptyMap(.andReturn(Collections.emptySet());
 
expect(standbyTaskCreator.createTasks(eq(assignmentStandby))).andReturn(singletonList(task10));
 
expect(standbyTaskCreator.createTasks(eq(Collections.emptyMap(.andReturn(Collections.emptySet());
-topologyBuilder.addSubscribedTopicsFromAssignment(eq(asList(t1p0)), 
anyString());

Review Comment:
   Got it, okay, this makes a lot of sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim opened a new pull request, #13602: KAFKA-14869: Bump coordinator value records to flexible versions (KIP…

2023-04-18 Thread via GitHub


jeffkbkim opened a new pull request, #13602:
URL: https://github.com/apache/kafka/pull/13602

   …-915, Part-2) (#13526)
   
   This patch implemented the second part of KIP-915. It bumps the versions of 
the value records used by the group coordinator and the transaction coordinator 
to make them flexible versions. The new versions are not used when writing to 
the partitions but only when reading from the partitions. This allows 
downgrades from future versions that will include tagged fields.
   
   Reviewers: David Jacot 
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim opened a new pull request, #13601: KAFKA-14869: Bump coordinator value records to flexible versions (KIP…

2023-04-18 Thread via GitHub


jeffkbkim opened a new pull request, #13601:
URL: https://github.com/apache/kafka/pull/13601

   …-915, Part-2) (#13526)
   
   This patch implemented the second part of KIP-915. It bumps the versions of 
the value records used by the group coordinator and the transaction coordinator 
to make them flexible versions. The new versions are not used when writing to 
the partitions but only when reading from the partitions. This allows 
downgrades from future versions that will include tagged fields.
   
   Reviewers: David Jacot 
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

2023-04-18 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1170098942


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -835,6 +835,7 @@ public void handle(SyncGroupResponse syncResponse,
 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
 log.info("SyncGroup failed: The group began another 
rebalance. Need to re-join the group. " +
  "Sent generation was {}", sentGeneration);
+resetStateAndGeneration("member missed the rebalance", 
true);

Review Comment:
   Adding to my previous comment: I think you will need to set the 
`needsOnJoinPrepare` flag to true to go through the revocation, as pointed out here: 
https://github.com/apache/kafka/blob/61530d68ce83467de6190a52da37b3c0af84f0ef/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L821



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

2023-04-18 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1170094810


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -835,6 +835,7 @@ public void handle(SyncGroupResponse syncResponse,
 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
 log.info("SyncGroup failed: The group began another 
rebalance. Need to re-join the group. " +
  "Sent generation was {}", sentGeneration);
+resetStateAndGeneration("member missed the rebalance", 
true);

Review Comment:
   Hmm, I think the partitions are only lost during the onJoinPrepare, what I'm 
thinking is this: 
https://github.com/apache/kafka/blob/61530d68ce83467de6190a52da37b3c0af84f0ef/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L507-L511
   
   The gist is: on one of the 4 exceptions thrown in join/sync group, it should 
immediately re-send the join request. Are you thinking about how the client 
handles the illegal generation error? I think it is only thrown during 
sync group and heartbeat, so just resetting the generation shouldn't 
immediately cause revocation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

2023-04-18 Thread via GitHub


philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1170094810


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -835,6 +835,7 @@ public void handle(SyncGroupResponse syncResponse,
 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
 log.info("SyncGroup failed: The group began another 
rebalance. Need to re-join the group. " +
  "Sent generation was {}", sentGeneration);
+resetStateAndGeneration("member missed the rebalance", 
true);

Review Comment:
   Hmm, I think the partitions are only lost during the onJoinPrepare, what I'm 
thinking is this: 
https://github.com/apache/kafka/blob/61530d68ce83467de6190a52da37b3c0af84f0ef/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L507
   
   The gist is: on one of the 4 exceptions thrown in join/sync group, it should 
immediately re-send the join request. Are you thinking about how the client 
handles the illegal generation error? I think it is only thrown during 
sync group and heartbeat, so just resetting the generation shouldn't 
immediately cause revocation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim opened a new pull request, #13600: KAFKA-14869: Bump coordinator value records to flexible versions (KIP…

2023-04-18 Thread via GitHub


jeffkbkim opened a new pull request, #13600:
URL: https://github.com/apache/kafka/pull/13600

   …-915, Part-2) (#13526)
   
   This patch implemented the second part of KIP-915. It bumps the versions of 
the value records used by the group coordinator and the transaction coordinator 
to make them flexible versions. The new versions are not used when writing to 
the partitions but only when reading from the partitions. This allows 
downgrades from future versions that will include tagged fields.
   
   Reviewers: David Jacot 
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2023-04-18 Thread via GitHub


mimaison commented on PR #11478:
URL: https://github.com/apache/kafka/pull/11478#issuecomment-1513201845

   Hi @jlprat, the point of tests is also to ensure future changes don't break 
this feature. You're more familiar with this feature than I am; if you think the 
unit tests are enough, you can merge the PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14869) txn and group coordinator downgrade foundation

2023-04-18 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-14869:
---
Fix Version/s: 3.5.0

> txn and group coordinator downgrade foundation
> --
>
> Key: KAFKA-14869
> URL: https://issues.apache.org/jira/browse/KAFKA-14869
> Project: Kafka
>  Issue Type: Task
>Reporter: Jeff Kim
>Assignee: Jeff Kim
>Priority: Major
> Fix For: 3.5.0
>
>
> Implement proposed changes in 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-915%3A+Txn+and+Group+Coordinator+Downgrade+Foundation]
>  
>  - ignore unknown record types
>  - bump Value type records in __consumer_offsets and __transaction_state 
> topics to a flexible version
>  - serialize with highest non-flexible version (gated by IBP)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-04-18 Thread via GitHub


jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1170068205


##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
 }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
 this synchronized {
   // grab a new block of producerIds if this block has been exhausted
   if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-allocateNewProducerIdBlock()
+try {
+  allocateNewProducerIdBlock()
+} catch {
+  case t: Throwable =>
+return Failure(t)
+}
 nextProducerId = currentProducerIdBlock.firstProducerId
   }
   nextProducerId += 1
-  nextProducerId - 1
+  Success(nextProducerId - 1)
+}
+  }
+
+  override def hasValidBlock: Boolean = {
+this synchronized {
+  !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
 }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will 
immediately fail requests
+ * for producers to retry if it does not have an available producer id and is 
waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+   time: Time,
brokerEpochSupplier: () => Long,
-   controllerChannel: BrokerToControllerChannelManager,
-   maxWaitMs: Int) extends ProducerIdManager with 
Logging {
+   controllerChannel: 
BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new 
ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new 
AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new 
AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val blockCount = new AtomicLong(0)

Review Comment:
   updated to use requestInFlight to fence
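   As a standalone illustration of the fencing idiom (illustrative names, not the Kafka 
code): a compare-and-set on the AtomicBoolean lets only one caller send a block request 
until the response clears the flag:
   ```java
   import java.util.concurrent.atomic.AtomicBoolean;

   public class RequestFence {
       private final AtomicBoolean requestInFlight = new AtomicBoolean(false);

       void maybeSendRequest() {
           if (requestInFlight.compareAndSet(false, true)) {
               System.out.println("sending request"); // only the CAS winner sends the RPC
           } else {
               System.out.println("skipped: request already in flight");
           }
       }

       void onResponse() {
           requestInFlight.set(false); // clear the fence so the next request can go out
       }

       public static void main(String[] args) {
           RequestFence fence = new RequestFence();
           fence.maybeSendRequest(); // sends
           fence.maybeSendRequest(); // skipped
           fence.onResponse();
           fence.maybeSendRequest(); // sends again
       }
   }
   ```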



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #13526: KAFKA-14869: Bump coordinator Value records to flexible versions (KIP-915, Part-2)

2023-04-18 Thread via GitHub


dajac commented on PR #13526:
URL: https://github.com/apache/kafka/pull/13526#issuecomment-1513180068

   Merged to trunk and 3.5. We need to open PRs for the other branches.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac merged pull request #13526: KAFKA-14869: Bump coordinator Value records to flexible versions (KIP-915, Part-2)

2023-04-18 Thread via GitHub


dajac merged PR #13526:
URL: https://github.com/apache/kafka/pull/13526


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #13582: MINOR: Fix lossy conversions flagged by Java 20

2023-04-18 Thread via GitHub


ijuma commented on code in PR #13582:
URL: https://github.com/apache/kafka/pull/13582#discussion_r1170050264


##
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java:
##
@@ -434,7 +434,7 @@ private static byte computeAttributes(CompressionType type, 
TimestampType timest
 if (isControl)
 attributes |= CONTROL_FLAG_MASK;
 if (type.id > 0)
-attributes |= COMPRESSION_CODEC_MASK & type.id;
+attributes |= (byte) (COMPRESSION_CODEC_MASK & type.id);

Review Comment:
   Unfortunately, the answer is yes: 
https://docs.oracle.com/javase/specs/jls/se8/html/jls-5.html#jls-5.6.2



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14735) Improve KRaft metadata image change performance at high topic counts

2023-04-18 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino resolved KAFKA-14735.
---
Resolution: Fixed

> Improve KRaft metadata image change performance at high topic counts
> 
>
> Key: KAFKA-14735
> URL: https://issues.apache.org/jira/browse/KAFKA-14735
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Major
> Fix For: 3.6.0
>
>
> Performance of KRaft metadata image changes is currently O(<# of topics in 
> cluster>).  This means the amount of time it takes to create just a *single* 
> topic scales linearly with the number of topics in the entire cluster.  This 
> impacts both controllers and brokers because both use the metadata image to 
> represent the KRaft metadata log.  The performance of these changes should 
> scale with the number of topics being changed -- so creating a single topic 
> should perform similarly regardless of the number of topics in the cluster.
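
One way to make the per-change cost proportional to the delta rather than the total
topic count is structural sharing. A rough, illustrative sketch using a persistent
map (an assumption for illustration, not the actual fix merged for this ticket):

```
import org.pcollections.HashPMap;
import org.pcollections.HashTreePMap;

// Illustrative only: a metadata "image" keyed by topic name. Copying a plain HashMap
// on every change costs O(total topics); a persistent map shares structure, so
// producing the next image costs roughly O(size of the change).
public final class TopicsImageSketch {
    private final HashPMap<String, Integer> partitionCounts;

    private TopicsImageSketch(HashPMap<String, Integer> partitionCounts) {
        this.partitionCounts = partitionCounts;
    }

    public static TopicsImageSketch empty() {
        return new TopicsImageSketch(HashTreePMap.empty());
    }

    // Returns a new image; the previous one remains valid and unchanged.
    public TopicsImageSketch withTopic(String name, int partitions) {
        return new TopicsImageSketch(partitionCounts.plus(name, partitions));
    }
}
```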



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dimitarndimitrov commented on a diff in pull request #13432: KAFKA-14821 Implement the listOffsets API with AdminApiDriver

2023-04-18 Thread via GitHub


dimitarndimitrov commented on code in PR #13432:
URL: https://github.com/apache/kafka/pull/13432#discussion_r1170024821


##
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java:
##
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.admin.ListOffsetsOptions;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.OffsetSpec.TimestampSpec;
+import 
org.apache.kafka.clients.admin.internals.AdminApiFuture.SimpleAdminApiFuture;
+import org.apache.kafka.clients.admin.internals.AdminApiHandler.Batched;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidMetadataException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import 
org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
+import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic;
+import 
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
+import 
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.CollectionUtils;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+public final class ListOffsetsHandler extends Batched {
+
+private final Map offsetSpecsByPartition;
+private final ListOffsetsOptions options;
+private final Logger log;
+private final AdminApiLookupStrategy lookupStrategy;
+
+public ListOffsetsHandler(
+Map offsetSpecsByPartition,
+ListOffsetsOptions options,
+LogContext logContext
+) {
+this.offsetSpecsByPartition = offsetSpecsByPartition;
+this.options = options;
+this.log = logContext.logger(ListOffsetsHandler.class);
+this.lookupStrategy = new PartitionLeaderStrategy(logContext);
+}
+
+@Override
+public String apiName() {
+return "listOffsets";
+}
+
+@Override
+public AdminApiLookupStrategy lookupStrategy() {
+return this.lookupStrategy;
+}
+
+@Override
+ListOffsetsRequest.Builder buildBatchedRequest(int brokerId, 
Set keys) {
+Map topicsByName = 
CollectionUtils.groupPartitionsByTopic(
+keys,
+topicName -> new ListOffsetsTopic().setName(topicName),
+(listOffsetsTopic, partitionId) -> {
+TopicPartition topicPartition = new 
TopicPartition(listOffsetsTopic.name(), partitionId);
+OffsetSpec offsetSpec = 
offsetSpecsByPartition.get(topicPartition);
+long offsetQuery = getOffsetFromSpec(offsetSpec);
+listOffsetsTopic.partitions().add(
+new ListOffsetsPartition()
+.setPartitionIndex(partitionId)
+.setTimestamp(offsetQuery));
+});
+boolean supportsMaxTimestamp = keys
+.stream()
+.anyMatch(key -> 
getOffsetFromSpec(offsetSpecsByPartition.get(key)) == 
ListOffsetsRequest.MAX_TIMESTAMP);
+
+return ListOffsetsRequest.Builder
+.forConsumer(true, options.isolationLevel(), supportsMaxTimestamp)
+.setTargetTimes(new ArrayList<>(topicsByName.values()));
+}
+
+@Override
+public ApiResult handleResponse(
+Node broker,
+Set keys,
+AbstractResponse abstractResponse
+) {
+

[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-18 Thread via GitHub


dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1170021475


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RangeAssignorTest {
+private final RangeAssignor assignor = new RangeAssignor();
+private final Uuid topic1Uuid = Uuid.randomUuid();
+private final Uuid topic2Uuid = Uuid.randomUuid();
+private final Uuid topic3Uuid = Uuid.randomUuid();
+private final String consumerA = "A";
+private final String consumerB = "B";
+private final String consumerC = "C";
+
+@Test
+public void testOneConsumerNoTopic() {
+Map topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+Map members = new HashMap<>();
+List subscribedTopics = new ArrayList<>();
+members.computeIfAbsent(consumerA, k -> new 
AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new 
HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testOneConsumerNonExistentTopic() {
+Map topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+Map members = new HashMap<>();
+List subscribedTopics = new ArrayList<>();
+subscribedTopics.add(topic2Uuid);
+members.computeIfAbsent(consumerA, k -> new 
AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new 
HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() {
+// A -> T1, T3 // B -> T1, T3 // T1 -> 3 Partitions // T3 -> 2 
Partitions
+// Topics
+Map topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+topics.put(topic3Uuid, new AssignmentTopicMetadata(2));
+// Members
+Map members = new HashMap<>();
+// Consumer A
+List subscribedTopicsA = new 
ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid));
+members.put(consumerA, new AssignmentMemberSpec(Optional.empty(), 
Optional.empty(), subscribedTopicsA, new HashMap<>()));
+// Consumer B
+List subscribedTopicsB = new 
ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid));
+members.put(consumerB, new AssignmentMemberSpec(Optional.empty(), 
Optional.empty(), subscribedTopicsB, new HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
+
+Map>> expectedAssignment = new HashMap<>();
+// Topic 1 Partitions Assignment
+expectedAssignment.computeIfAbsent(topic1Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Arrays.asList(0, 1)));
+expectedAssignment.computeIfAbsent(topic1Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Collections.singletonList(2)));
+// Topic 3 Partitions Assignment
+expectedAssignment.computeIfAbsent(topic3Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Collections.singletonList(0)));
+expectedAssignment.computeIfAbsent(topic3Uuid, k -> new 
HashSet<>()).ad

[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-18 Thread via GitHub


dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1170017991


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RangeAssignorTest {
+private final RangeAssignor assignor = new RangeAssignor();
+private final Uuid topic1Uuid = Uuid.randomUuid();
+private final Uuid topic2Uuid = Uuid.randomUuid();
+private final Uuid topic3Uuid = Uuid.randomUuid();
+private final String consumerA = "A";
+private final String consumerB = "B";
+private final String consumerC = "C";
+
+@Test
+public void testOneConsumerNoTopic() {
+Map topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+Map members = new HashMap<>();
+List subscribedTopics = new ArrayList<>();
+members.computeIfAbsent(consumerA, k -> new 
AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new 
HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testOneConsumerNonExistentTopic() {
+Map topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+Map members = new HashMap<>();
+List subscribedTopics = new ArrayList<>();
+subscribedTopics.add(topic2Uuid);
+members.computeIfAbsent(consumerA, k -> new 
AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new 
HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() {
+// A -> T1, T3 // B -> T1, T3 // T1 -> 3 Partitions // T3 -> 2 
Partitions
+// Topics
+Map topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+topics.put(topic3Uuid, new AssignmentTopicMetadata(2));
+// Members
+Map members = new HashMap<>();
+// Consumer A
+List subscribedTopicsA = new 
ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid));
+members.put(consumerA, new AssignmentMemberSpec(Optional.empty(), 
Optional.empty(), subscribedTopicsA, new HashMap<>()));
+// Consumer B
+List subscribedTopicsB = new 
ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid));
+members.put(consumerB, new AssignmentMemberSpec(Optional.empty(), 
Optional.empty(), subscribedTopicsB, new HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
+
+Map>> expectedAssignment = new HashMap<>();
+// Topic 1 Partitions Assignment
+expectedAssignment.computeIfAbsent(topic1Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Arrays.asList(0, 1)));
+expectedAssignment.computeIfAbsent(topic1Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Collections.singletonList(2)));
+// Topic 3 Partitions Assignment
+expectedAssignment.computeIfAbsent(topic3Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Collections.singletonList(0)));
+expectedAssignment.computeIfAbsent(topic3Uuid, k -> new 
HashSet<>()).ad

[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-18 Thread via GitHub


dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1170014726


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RangeAssignorTest {
+private final RangeAssignor assignor = new RangeAssignor();
+private final Uuid topic1Uuid = Uuid.randomUuid();
+private final Uuid topic2Uuid = Uuid.randomUuid();
+private final Uuid topic3Uuid = Uuid.randomUuid();
+private final String consumerA = "A";
+private final String consumerB = "B";
+private final String consumerC = "C";
+
+@Test
+public void testOneConsumerNoTopic() {
+Map topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+Map members = new HashMap<>();
+List subscribedTopics = new ArrayList<>();
+members.computeIfAbsent(consumerA, k -> new 
AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new 
HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testOneConsumerNonExistentTopic() {
+Map topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+Map members = new HashMap<>();
+List subscribedTopics = new ArrayList<>();
+subscribedTopics.add(topic2Uuid);
+members.computeIfAbsent(consumerA, k -> new 
AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new 
HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() {
+// A -> T1, T3 // B -> T1, T3 // T1 -> 3 Partitions // T3 -> 2 
Partitions
+// Topics
+Map topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+topics.put(topic3Uuid, new AssignmentTopicMetadata(2));
+// Members
+Map members = new HashMap<>();
+// Consumer A
+List subscribedTopicsA = new 
ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid));

Review Comment:
   nit: Is `new ArrayList<>` necessary here? There are many other cases.
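   
   For illustration, the suggestion applied to the line above (assuming the list is
   not mutated later in the test):
   
   ```
   // Arrays.asList already yields a fixed-size List view, so the extra ArrayList
   // copy is only needed if the test later adds or removes elements.
   List<Uuid> subscribedTopicsA = Arrays.asList(topic1Uuid, topic3Uuid);
   ```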



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-18 Thread via GitHub


dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1170013892


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RangeAssignorTest {
+private final RangeAssignor assignor = new RangeAssignor();
+private final Uuid topic1Uuid = Uuid.randomUuid();
+private final Uuid topic2Uuid = Uuid.randomUuid();
+private final Uuid topic3Uuid = Uuid.randomUuid();
+private final String consumerA = "A";
+private final String consumerB = "B";
+private final String consumerC = "C";
+
+@Test
+public void testOneConsumerNoTopic() {
+Map topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+Map members = new HashMap<>();
+List subscribedTopics = new ArrayList<>();
+members.computeIfAbsent(consumerA, k -> new 
AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new 
HashMap<>()));

Review Comment:
   nit: Let's format such lines as follows:
   
   ```
   new AssignmentMemberSpec(
       Optional.empty(),
       Optional.empty(),
       Collections.emptyList(),
       Collections.emptyMap()
   );
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dimitarndimitrov commented on a diff in pull request #13432: KAFKA-14821 Implement the listOffsets API with AdminApiDriver

2023-04-18 Thread via GitHub


dimitarndimitrov commented on code in PR #13432:
URL: https://github.com/apache/kafka/pull/13432#discussion_r1170012160


##
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java:
##
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.admin.ListOffsetsOptions;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import 
org.apache.kafka.clients.admin.internals.AdminApiFuture.SimpleAdminApiFuture;
+import org.apache.kafka.clients.admin.internals.AdminApiHandler.Batched;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidMetadataException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import 
org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
+import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic;
+import 
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
+import 
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.CollectionUtils;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+public final class ListOffsetsHandler extends Batched {
+
+private final Map offsetTimestampsByPartition;
+private final ListOffsetsOptions options;
+private final Logger log;
+private final AdminApiLookupStrategy lookupStrategy;
+
+public ListOffsetsHandler(
+Map offsetTimestampsByPartition,
+ListOffsetsOptions options,
+LogContext logContext
+) {
+this.offsetTimestampsByPartition = offsetTimestampsByPartition;
+this.options = options;
+this.log = logContext.logger(ListOffsetsHandler.class);
+this.lookupStrategy = new PartitionLeaderStrategy(logContext);
+}
+
+@Override
+public String apiName() {
+return "listOffsets";
+}
+
+@Override
+public AdminApiLookupStrategy lookupStrategy() {
+return this.lookupStrategy;
+}
+
+@Override
+ListOffsetsRequest.Builder buildBatchedRequest(int brokerId, 
Set keys) {
+Map topicsByName = 
CollectionUtils.groupPartitionsByTopic(
+keys,
+topicName -> new ListOffsetsTopic().setName(topicName),
+(listOffsetsTopic, partitionId) -> {
+TopicPartition topicPartition = new 
TopicPartition(listOffsetsTopic.name(), partitionId);
+long offsetTimestamp = 
offsetTimestampsByPartition.get(topicPartition);
+listOffsetsTopic.partitions().add(
+new ListOffsetsPartition()
+.setPartitionIndex(partitionId)
+.setTimestamp(offsetTimestamp));
+});
+boolean supportsMaxTimestamp = keys
+.stream()
+.anyMatch(key -> offsetTimestampsByPartition.get(key) == 
ListOffsetsRequest.MAX_TIMESTAMP);
+
+return ListOffsetsRequest.Builder
+.forConsumer(true, options.isolationLevel(), supportsMaxTimestamp)
+.setTargetTimes(new ArrayList<>(topicsByName.values()));
+}
+
+@Override
+public ApiResult handleResponse(
+Node broker,
+Set keys,
+AbstractResponse abstractResponse
+) {
+ListOffsetsResponse response = (ListOffsetsResponse) abstractResponse;
+Map completed = new HashMap<>();
+Map failed = new HashMap<>();
+List 

[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-18 Thread via GitHub


dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1170010343


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RangeAssignorTest {
+private final RangeAssignor assignor = new RangeAssignor();
+private final Uuid topic1Uuid = Uuid.randomUuid();
+private final Uuid topic2Uuid = Uuid.randomUuid();
+private final Uuid topic3Uuid = Uuid.randomUuid();
+private final String consumerA = "A";
+private final String consumerB = "B";
+private final String consumerC = "C";
+
+@Test
+public void testOneConsumerNoTopic() {
+Map topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));

Review Comment:
   nit: Whenever possible, let's use `Collections.singletonMap`, 
`Collections.emptyMap`, `Collections.emptyList`, etc.
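   
   For illustration, applied to the snippet above (the resulting map is unmodifiable,
   which is fine for a fixture that is not changed afterwards):
   
   ```
   Map<Uuid, AssignmentTopicMetadata> topics =
       Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
   ```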



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-18 Thread via GitHub


dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1170007516


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the 
range assignor and the sticky assignor.
+ * Properties are as follows:
+ * 
+ *  Each member must get at least one partition for every topic that it is 
subscribed to. The only exception is when
+ *  the number of subscribed members is greater than the number of 
partitions for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range) 
+ *This can only be done if every member is subscribed to the same topics 
and the topics are co-partitioned.
+ *Two streams are co-partitioned if the following conditions are met:
+ *
+ *  The keys must have the same schemas.
+ *  The topics involved must have the same number of partitions.
+ *
+ *  Members should retain as much as their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ * 
+ *
+ * The algorithm includes the following steps:
+ * 
+ *  Generate a map of membersPerTopic using the given member 
subscriptions.
+ *  Generate a list of members (potentiallyUnfilledMembers) 
that have not met the minimum required quota for assignment AND
+ * get a list of sticky partitions that we want to retain in the new 
assignment.
+ *  Add members from the potentiallyUnfilled list to the 
Unfilled list if they haven't met the total required quota i.e. 
minimum number of partitions per member + 1 (if member is designated to receive 
one of the excess partitions) 
+ *  Generate a list of unassigned partitions by calculating the difference 
between total partitions and already assigned (sticky) partitions 
+ *  Iterate through unfilled members and assign partitions from the 
unassigned partitions 
+ * 
+ * 
+ *
+ */
+public class RangeAssignor implements PartitionAssignor {
+
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+static class RemainingAssignmentsForMember {
+private final String memberId;
+private final Integer remaining;
+
+public RemainingAssignmentsForMember(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+
+public String memberId() {
+return memberId;
+}
+
+public Integer remaining() {
+return remaining;
+}
+
+}
+
+private Map> membersPerTopic(final AssignmentSpec 
assignmentSpec) {
+Map> membersPerTopic = new HashMap<>();
+Map membersData = 
assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics 
list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic.computeIfAbsent(topicId, k -> new 
ArrayList<>()).add(memberId);
+} else {
+log.info(memberId + " subscribed to topic " + topicId + " 
which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+private Map> get

[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-18 Thread via GitHub


dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1170005574


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the 
range assignor and the sticky assignor.
+ * Properties are as follows:
+ * 
+ *  Each member must get at least one partition for every topic that it is 
subscribed to. The only exception is when
+ *  the number of subscribed members is greater than the number of 
partitions for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range) 
+ *This can only be done if every member is subscribed to the same topics 
and the topics are co-partitioned.
+ *Two streams are co-partitioned if the following conditions are met:
+ *
+ *  The keys must have the same schemas.
+ *  The topics involved must have the same number of partitions.
+ *
+ *  Members should retain as much as their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ * 
+ *
+ * The algorithm includes the following steps:
+ * 
+ *  Generate a map of membersPerTopic using the given member 
subscriptions.
+ *  Generate a list of members (potentiallyUnfilledMembers) 
that have not met the minimum required quota for assignment AND
+ * get a list of sticky partitions that we want to retain in the new 
assignment.
+ *  Add members from the potentiallyUnfilled list to the 
Unfilled list if they haven't met the total required quota i.e. 
minimum number of partitions per member + 1 (if member is designated to receive 
one of the excess partitions) 
+ *  Generate a list of unassigned partitions by calculating the difference 
between total partitions and already assigned (sticky) partitions 
+ *  Iterate through unfilled members and assign partitions from the 
unassigned partitions 
+ * 
+ * 
+ *
+ */
+public class RangeAssignor implements PartitionAssignor {
+
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+static class RemainingAssignmentsForMember {
+private final String memberId;
+private final Integer remaining;
+
+public RemainingAssignmentsForMember(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+
+public String memberId() {
+return memberId;
+}
+
+public Integer remaining() {
+return remaining;
+}
+
+}
+
+private Map> membersPerTopic(final AssignmentSpec 
assignmentSpec) {
+Map> membersPerTopic = new HashMap<>();
+Map membersData = 
assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics 
list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic.computeIfAbsent(topicId, k -> new 
ArrayList<>()).add(memberId);
+} else {
+log.info(memberId + " subscribed to topic " + topicId + " 
which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+private Map> get

[GitHub] [kafka] jeffkbkim opened a new pull request, #13599: KAFKA-14869: Ignore unknown record types for coordinators (KIP-915, P…

2023-04-18 Thread via GitHub


jeffkbkim opened a new pull request, #13599:
URL: https://github.com/apache/kafka/pull/13599

   …art-1) (#13511)
   
   This patch implements the first part of KIP-915. It updates the group 
coordinator and the transaction coordinator to ignore unknown record types 
while loading their respective state from the partitions. This allows 
downgrades from future versions that will include new record types.
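   
   A minimal, hypothetical sketch of the "ignore unknown record types" behaviour
   described above (record types and handler names are illustrative, not the real
   coordinator schema):
   
   ```
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   public class CoordinatorLoaderSketch {
       private static final Logger LOG = LoggerFactory.getLogger(CoordinatorLoaderSketch.class);
   
       public void replay(short recordType, byte[] payload) {
           switch (recordType) {
               case 0:
                   // replay a known record type, e.g. group metadata
                   break;
               case 1:
                   // replay another known record type, e.g. committed offsets
                   break;
               default:
                   // A future broker version may have written record types this version
                   // does not know about; skipping them keeps the partition loadable
                   // after a downgrade.
                   LOG.debug("Ignoring unknown record type {} while loading coordinator state", recordType);
           }
       }
   }
   ```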
   
   Reviewers: Alexandre Dupriez , David Jacot 

   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim opened a new pull request, #13598: KAFKA-14869: Ignore unknown record types for coordinators (KIP-915, P…

2023-04-18 Thread via GitHub


jeffkbkim opened a new pull request, #13598:
URL: https://github.com/apache/kafka/pull/13598

   …art-1) (#13511)
   
   This patch implements the first part of KIP-915. It updates the group 
coordinator and the transaction coordinator to ignore unknown record types 
while loading their respective state from the partitions. This allows 
downgrades from future versions that will include new record types.
   
   Reviewers: Alexandre Dupriez , David Jacot 

   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


