[GitHub] [kafka] pprovenzano opened a new pull request, #13613: KAFKA-14859: SCRAM ZK to KRaft migration without dual write

2023-04-19 Thread via GitHub


pprovenzano opened a new pull request, #13613:
URL: https://github.com/apache/kafka/pull/13613

   Handle migrating SCRAM records stored in ZK when migrating from ZK to KRaft.
   
   This does not allow the user to change SCRAM records while the migration is 
in progress.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14807) MirrorMaker2 config source.consumer.auto.offset.reset=latest leading to the pause of replication of consumer groups

2023-04-19 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17714097#comment-17714097
 ] 

Daniel Urban commented on KAFKA-14807:
--

[~fisher91] do you use MM2 dedicated mode, or do you run the Connectors directly in a 
Connect cluster?

If the latter, you can fix this by passing 
source.consumer.auto.offset.reset=latest only to the MirrorSourceConnector, not to 
the MirrorCheckpointConnector.
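
For illustration, a minimal sketch of that split when the connectors run on a Connect 
cluster. Only the relevant keys are shown; required settings such as the cluster 
bootstrap servers and topic filters are omitted, and the values are assumptions for 
your setup:
{code:java}
# MirrorSourceConnector: skip the message history for the replication consumer
connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
source.cluster.alias=source
target.cluster.alias=target
source.consumer.auto.offset.reset=latest

# MirrorCheckpointConnector: do NOT set the override here, so its internal
# consumer of mm2-offset-syncs keeps reading that topic from the beginning
connector.class=org.apache.kafka.connect.mirror.MirrorCheckpointConnector
source.cluster.alias=source
target.cluster.alias=target
{code}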

For MM2 dedicated mode, I'm not aware of any workarounds.

> MirrorMaker2 config source.consumer.auto.offset.reset=latest leading to the 
> pause of replication of consumer groups
> ---
>
> Key: KAFKA-14807
> URL: https://issues.apache.org/jira/browse/KAFKA-14807
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.4.0, 3.3.1, 3.3.2
> Environment: centos7
>Reporter: Zhaoli
>Priority: Major
>
> We use MirrorMaker2 to replicate messages and consumer group offsets from the 
> Kafka cluster `source` to cluster `target`.
> To reduce the load on the source cluster, we added this configuration to MM2 to 
> avoid replicating the whole message history:
> {code:java}
> source.consumer.auto.offset.reset=latest {code}
> After that, we found part of the consumer group offsets had stopped 
> replicating.
> The common characteristic of these consumer groups is their EMPTY status, 
> which means they have no active members at that moment. All the active 
> consumer groups' offset replication works as normal.
> After researching the source code, we found this is because the configuration 
> above also affects the consumption of the internal topic `mm2-offset-syncs`, so the 
> map `offsetSyncs` doesn't hold entries for all topic partitions:
> {code:java}
> private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>(); 
> {code}
> The missing topic partitions then pause offset replication for the EMPTY 
> consumer groups, which is not expected.
> {code:java}
> OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long 
> upstreamOffset) {
> Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition);
> if (offsetSync.isPresent()) {
> if (offsetSync.get().upstreamOffset() > upstreamOffset) {
> // Offset is too far in the past to translate accurately
> return OptionalLong.of(-1L);
> }
> long upstreamStep = upstreamOffset - 
> offsetSync.get().upstreamOffset();
> return OptionalLong.of(offsetSync.get().downstreamOffset() + 
> upstreamStep);
> } else {
> return OptionalLong.empty();
> }
> }{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] urbandan commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication

2023-04-19 Thread via GitHub


urbandan commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1171304080


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -139,10 +171,103 @@ public void close() {
 protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
 OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
 TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-offsetSyncs.put(sourceTopicPartition, offsetSync);
+offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> 
createInitialSyncs(offsetSync));
+offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> 
updateExistingSyncs(syncs, offsetSync));
+}
+
+private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync 
offsetSync) {
+// Make a copy of the array before mutating it, so that readers do not 
see inconsistent data
+// TODO: batch updates so that this copy can be performed less often 
for high-volume sync topics.
+OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+updateSyncArray(mutableSyncs, offsetSync);
+if (log.isTraceEnabled()) {
+StringBuilder stateString = new StringBuilder();
+stateString.append("[");
+for (int i = 0; i < Long.SIZE; i++) {
+if (i != 0) {
+stateString.append(",");
+}
+if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != 
mutableSyncs[i - 1]) {
+// Print only if the sync is interesting, a series of 
repeated syncs will appear as ,
+stateString.append(mutableSyncs[i].upstreamOffset());
+stateString.append(":");
+stateString.append(mutableSyncs[i].downstreamOffset());
+}
+}
+stateString.append("]");
+log.trace("New sync {} applied, new state is {}", offsetSync, 
stateString);
+}
+return mutableSyncs;
+}
+
+private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+clearSyncArray(syncs, firstSync);
+return syncs;
+}
+
+private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+for (int i = 0; i < Long.SIZE; i++) {
+syncs[i] = offsetSync;
+}
+}
+
+private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+long upstreamOffset = offsetSync.upstreamOffset();
+// Old offsets are invalid, so overwrite them all.
+if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {

Review Comment:
   One idea for this:
   What if the checkpointing started using the source offset mechanism of 
Connect to keep track of the last offset-sync used for a specific consumer 
group?
   MirrorCheckpointTask could start emitting a source offset record like
   (group, topic, partition) -> (offsetOfLastUsedOffsetSync)
   
   This would allow
   1. The OffsetSyncStore to read compaction-eligible offset syncs at startup, 
since it will be able to detect that (going with the example from @gharris1727) 
it cannot use offset-sync B, as the last saved offset of the offset-sync topic 
was C (which has a higher offset than B).
   2. To rewind consumer group offsets if the source topic was recreated 
(offset sync record offsets will keep increasing regardless of the reset in the 
upstream offset)
   
   I understand that in complexity, this is pretty much the same as reading 
back checkpoints, but in terms of implementation, it should be simpler (?). One 
drawback is that the deletion of the offset-syncs topic messes up the offset 
translation logic - but I guess with KIP-875 (or with the followup of that KIP 
containing the "reset" functionality) that won't be a problem, as the 
checkpoint source offsets can be cleared to restore a clean state.
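
   A rough sketch of the proposal in Java, against the Connect SourceRecord API. The 
source-partition/offset key names and the helper below are hypothetical, purely for 
illustration, not the actual MM2 implementation:
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import org.apache.kafka.connect.source.SourceRecord;
   
   class CheckpointSourceOffsetSketch {
       // Hypothetical helper: attach the offset of the last offset-sync used for
       // this (group, topic, partition) as the Connect source offset of the
       // emitted checkpoint record, so it survives restarts.
       static SourceRecord checkpointRecord(String group, String topic, int partition,
                                            long lastUsedOffsetSyncOffset,
                                            String checkpointsTopic,
                                            byte[] key, byte[] value) {
           Map<String, Object> sourcePartition = new HashMap<>();
           sourcePartition.put("group", group);
           sourcePartition.put("topic", topic);
           sourcePartition.put("partition", partition);
   
           Map<String, Object> sourceOffset = new HashMap<>();
           sourceOffset.put("offset-sync-offset", lastUsedOffsetSyncOffset);
   
           // schemas left null for brevity; key/value are the serialized checkpoint
           return new SourceRecord(sourcePartition, sourceOffset, checkpointsTopic,
                   null, null, key, null, value);
       }
   }
   ```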



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-04-19 Thread via GitHub


satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1171245421


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -600,6 +622,176 @@ public String toString() {
 }
 }
 
+public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) 
throws RemoteStorageException, IOException {
+int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+FetchRequest.PartitionData fetchInfo = 
remoteStorageFetchInfo.fetchInfo;
+
+boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == 
FetchIsolation.TXN_COMMITTED;
+
+long offset = fetchInfo.fetchOffset;
+int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+OptionalInt epoch = OptionalInt.empty();
+
+if (logOptional.isPresent()) {
+Option<LeaderEpochFileCache> leaderEpochCache = 
logOptional.get().leaderEpochCache();
+if (leaderEpochCache.isDefined()) {
+epoch = leaderEpochCache.get().epochForOffset(offset);
+}
+}
+
+Optional<RemoteLogSegmentMetadata> rlsMetadata = epoch.isPresent()
+? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+: Optional.empty();
+
+if (!rlsMetadata.isPresent()) {
+String epochStr = (epoch.isPresent()) ? 
Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+throw new OffsetOutOfRangeException("Received request for offset " 
+ offset + " for leader epoch "
++ epochStr + " and partition " + tp + " which does not 
exist in remote tier.");
+}
+
+int startPos = lookupPositionForOffset(rlsMetadata.get(), offset);
+InputStream remoteSegInputStream = null;
+try {
+// Search forward for the position of the last offset that is 
greater than or equal to the target offset
+remoteSegInputStream = 
remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos);
+RemoteLogInputStream remoteLogInputStream = new 
RemoteLogInputStream(remoteSegInputStream);
+
+RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, 
offset);
+
+if (firstBatch == null)
+return new FetchDataInfo(new LogOffsetMetadata(offset), 
MemoryRecords.EMPTY, false,
+includeAbortedTxns ? 
Optional.of(Collections.emptyList()) : Optional.empty());
+
+int updatedFetchSize =

Review Comment:
   Good point. There is no risk here, but it is good to be consistent with the 
local read pattern and return empty records for that case; I will update with the 
changes. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-04-19 Thread via GitHub


satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1171250580


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1118,9 +1122,13 @@ class ReplicaManager(val config: KafkaConfig,
 responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
   ): Unit = {
 // check if this fetch request can be satisfied right away
-val logReadResults = readFromLocalLog(params, fetchInfos, quota, 
readFromPurgatory = false)
+val logReadResults = readFromLog(params, fetchInfos, quota, 
readFromPurgatory = false)
 var bytesReadable: Long = 0
 var errorReadingData = false
+
+// The 1st topic-partition that has to be read from remote storage
+var remoteFetchInfo: Optional[RemoteStorageFetchInfo] = Optional.empty()

Review Comment:
   As I already called out in this PR's description, this will be followed up in 
a separate PR. We will describe the config and the different options with their 
respective scenarios. The default will be to fetch from multiple partitions, as is 
done with local log segments. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-04-19 Thread via GitHub


satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1171246205


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -600,6 +622,176 @@ public String toString() {
 }
 }
 
+public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) 
throws RemoteStorageException, IOException {
+int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+FetchRequest.PartitionData fetchInfo = 
remoteStorageFetchInfo.fetchInfo;
+
+boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == 
FetchIsolation.TXN_COMMITTED;
+
+long offset = fetchInfo.fetchOffset;
+int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+OptionalInt epoch = OptionalInt.empty();
+
+if (logOptional.isPresent()) {
+Option<LeaderEpochFileCache> leaderEpochCache = 
logOptional.get().leaderEpochCache();
+if (leaderEpochCache.isDefined()) {
+epoch = leaderEpochCache.get().epochForOffset(offset);
+}
+}
+
+Optional<RemoteLogSegmentMetadata> rlsMetadata = epoch.isPresent()
+? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+: Optional.empty();
+
+if (!rlsMetadata.isPresent()) {
+String epochStr = (epoch.isPresent()) ? 
Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+throw new OffsetOutOfRangeException("Received request for offset " 
+ offset + " for leader epoch "
++ epochStr + " and partition " + tp + " which does not 
exist in remote tier.");
+}
+
+int startPos = lookupPositionForOffset(rlsMetadata.get(), offset);
+InputStream remoteSegInputStream = null;
+try {
+// Search forward for the position of the last offset that is 
greater than or equal to the target offset
+remoteSegInputStream = 
remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos);

Review Comment:
   We will look into it in a follow-up PR. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-04-19 Thread via GitHub


satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1171242181


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -600,6 +622,176 @@ public String toString() {
 }
 }
 
+public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) 
throws RemoteStorageException, IOException {
+int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+FetchRequest.PartitionData fetchInfo = 
remoteStorageFetchInfo.fetchInfo;
+
+boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == 
FetchIsolation.TXN_COMMITTED;
+
+long offset = fetchInfo.fetchOffset;
+int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+OptionalInt epoch = OptionalInt.empty();
+
+if (logOptional.isPresent()) {
+Option<LeaderEpochFileCache> leaderEpochCache = 
logOptional.get().leaderEpochCache();
+if (leaderEpochCache.isDefined()) {
+epoch = leaderEpochCache.get().epochForOffset(offset);
+}
+}
+
+Optional<RemoteLogSegmentMetadata> rlsMetadata = epoch.isPresent()
+? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+: Optional.empty();
+
+if (!rlsMetadata.isPresent()) {
+String epochStr = (epoch.isPresent()) ? 
Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+throw new OffsetOutOfRangeException("Received request for offset " 
+ offset + " for leader epoch "
++ epochStr + " and partition " + tp + " which does not 
exist in remote tier.");
+}
+
+int startPos = lookupPositionForOffset(rlsMetadata.get(), offset);
+InputStream remoteSegInputStream = null;
+try {
+// Search forward for the position of the last offset that is 
greater than or equal to the target offset
+remoteSegInputStream = 
remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos);
+RemoteLogInputStream remoteLogInputStream = new 
RemoteLogInputStream(remoteSegInputStream);
+
+RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, 
offset);
+
+if (firstBatch == null)
+return new FetchDataInfo(new LogOffsetMetadata(offset), 
MemoryRecords.EMPTY, false,
+includeAbortedTxns ? 
Optional.of(Collections.emptyList()) : Optional.empty());
+
+int updatedFetchSize =
+remoteStorageFetchInfo.minOneMessage && 
firstBatch.sizeInBytes() > maxBytes
+? firstBatch.sizeInBytes() : maxBytes;
+
+ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+int remainingBytes = updatedFetchSize;
+
+firstBatch.writeTo(buffer);
+remainingBytes -= firstBatch.sizeInBytes();
+
+if (remainingBytes > 0) {
+// input stream is read till (startPos - 1) while getting the 
batch of records earlier.
+// read the input stream until min of (EOF stream or buffer's 
remaining capacity).
+Utils.readFully(remoteSegInputStream, buffer);
+}
+buffer.flip();
+
+FetchDataInfo fetchDataInfo = new FetchDataInfo(new 
LogOffsetMetadata(offset), MemoryRecords.readableRecords(buffer));
+if (includeAbortedTxns) {
+fetchDataInfo = 
addAbortedTransactions(firstBatch.baseOffset(), rlsMetadata.get(), 
fetchDataInfo);
+}
+
+return fetchDataInfo;
+} finally {
+Utils.closeQuietly(remoteSegInputStream, 
"RemoteLogSegmentInputStream");
+}
+}
+
+private int lookupPositionForOffset(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata, long offset) {
+return indexCache.lookupOffset(remoteLogSegmentMetadata, offset);
+}
+
+private FetchDataInfo addAbortedTransactions(long startOffset,
+ RemoteLogSegmentMetadata 
segmentMetadata,
+ FetchDataInfo fetchInfo) throws 
RemoteStorageException {
+int fetchSize = fetchInfo.records.sizeInBytes();
+OffsetPosition startOffsetPosition = new 
OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset,
+fetchInfo.fetchOffsetMetadata.relativePositionInSegment);
+
+OffsetIndex offsetIndex = 
indexCache.getIndexEntry(segmentMetadata).offsetIndex();
+long upperBoundOffset = 
offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize)
+.map(x -> x.offset).orElse(segmentMetadata.endOffset() + 1);
+
+final List<FetchResponseData.AbortedTransaction> abortedTransactions = 
new ArrayList<>();
+
+Consumer<List<AbortedTxn>> accumulator =
+abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
+
.map(AbortedTxn

[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-04-19 Thread via GitHub


satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1171240495


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1273,17 +1328,45 @@ class ReplicaManager(val config: KafkaConfig,
  _: FencedLeaderEpochException |
  _: ReplicaNotAvailableException |
  _: KafkaStorageException |
- _: OffsetOutOfRangeException |
  _: InconsistentTopicIdException) =>
-  LogReadResult(info = new 
FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
-divergingEpoch = None,
-highWatermark = UnifiedLog.UnknownOffset,
-leaderLogStartOffset = UnifiedLog.UnknownOffset,
-leaderLogEndOffset = UnifiedLog.UnknownOffset,
-followerLogStartOffset = UnifiedLog.UnknownOffset,
-fetchTimeMs = -1L,
-lastStableOffset = None,
-exception = Some(e))
+  createLogReadResult(e)
+case e: OffsetOutOfRangeException =>

Review Comment:
   Yes. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dengziming commented on a diff in pull request #13432: KAFKA-14821 Implement the listOffsets API with AdminApiDriver

2023-04-19 Thread via GitHub


dengziming commented on code in PR #13432:
URL: https://github.com/apache/kafka/pull/13432#discussion_r1171232959


##
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java:
##
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.admin.ListOffsetsOptions;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import 
org.apache.kafka.clients.admin.internals.AdminApiFuture.SimpleAdminApiFuture;
+import org.apache.kafka.clients.admin.internals.AdminApiHandler.Batched;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import 
org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
+import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic;
+import 
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
+import 
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.CollectionUtils;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+public final class ListOffsetsHandler extends Batched<TopicPartition, ListOffsetsResultInfo> {
+
+private final Map<TopicPartition, Long> offsetTimestampsByPartition;
+private final ListOffsetsOptions options;
+private final Logger log;
+private final AdminApiLookupStrategy<TopicPartition> lookupStrategy;
+
+public ListOffsetsHandler(
+Map<TopicPartition, Long> offsetTimestampsByPartition,
+ListOffsetsOptions options,
+LogContext logContext
+) {
+this.offsetTimestampsByPartition = offsetTimestampsByPartition;
+this.options = options;
+this.log = logContext.logger(ListOffsetsHandler.class);
+this.lookupStrategy = new PartitionLeaderStrategy(logContext);
+}
+
+@Override
+public String apiName() {
+return "listOffsets";
+}
+
+@Override
+public AdminApiLookupStrategy<TopicPartition> lookupStrategy() {
+return this.lookupStrategy;
+}
+
+@Override
+ListOffsetsRequest.Builder buildBatchedRequest(int brokerId, 
Set<TopicPartition> keys) {
+Map<String, ListOffsetsTopic> topicsByName = 
CollectionUtils.groupPartitionsByTopic(
+keys,
+topicName -> new ListOffsetsTopic().setName(topicName),
+(listOffsetsTopic, partitionId) -> {
+TopicPartition topicPartition = new 
TopicPartition(listOffsetsTopic.name(), partitionId);
+long offsetTimestamp = 
offsetTimestampsByPartition.get(topicPartition);
+listOffsetsTopic.partitions().add(
+new ListOffsetsPartition()
+.setPartitionIndex(partitionId)
+.setTimestamp(offsetTimestamp));
+});
+boolean supportsMaxTimestamp = keys
+.stream()
+.anyMatch(key -> offsetTimestampsByPartition.get(key) == 
ListOffsetsRequest.MAX_TIMESTAMP);
+
+return ListOffsetsRequest.Builder
+.forConsumer(true, options.isolationLevel(), supportsMaxTimestamp)
+.setTargetTimes(new ArrayList<>(topicsByName.values()));
+}
+
+@Override
+public ApiResult<TopicPartition, ListOffsetsResultInfo> handleResponse(
+Node broker,
+Set<TopicPartition> keys,
+AbstractResponse abstractResponse
+) {
+ListOffsetsResponse response = (ListOffsetsResponse) abstractResponse;
+Map completed = new 

[GitHub] [kafka] Hangleton commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

2023-04-19 Thread via GitHub


Hangleton commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1171099569


##
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##
@@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int,
* The most important guarantee that this API provides is that it should 
never return a stale offset. i.e., it either
* returns the current offset or it begins to sync the cache from the log 
(and returns an error code).
*/
-  def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: 
Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = {
-trace("Getting offsets of %s for group 
%s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
+  def getOffsets(groupId: String, requireStable: Boolean, 
topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, 
PartitionData] = {
+trace("Getting offsets of %s for group 
%s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId))
 val group = groupMetadataCache.get(groupId)
 if (group == null) {
-  topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { 
topicPartition =>
+  topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { 
topicIdPartition =>
 val partitionData = new 
PartitionData(OffsetFetchResponse.INVALID_OFFSET,
   Optional.empty(), "", Errors.NONE)
-topicPartition -> partitionData
+topicIdPartition -> partitionData
   }.toMap
 } else {
   group.inLock {
 if (group.is(Dead)) {
-  topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { 
topicPartition =>
+  topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { 
topicIdPartition =>
 val partitionData = new 
PartitionData(OffsetFetchResponse.INVALID_OFFSET,
   Optional.empty(), "", Errors.NONE)
-topicPartition -> partitionData
+topicIdPartition -> partitionData
   }.toMap
 } else {
-  val topicPartitions = 
topicPartitionsOpt.getOrElse(group.allOffsets.keySet)
-
-  topicPartitions.map { topicPartition =>
-if (requireStable && 
group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) {
-  topicPartition -> new 
PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+  def resolvePartitionData(topicIdPartition: TopicIdPartition): 
PartitionData = {
+if (requireStable && 
group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) {
+  new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
 Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT)
 } else {
-  val partitionData = group.offset(topicPartition) match {
+  group.offset(topicIdPartition) match {
 case None =>
   new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
 Optional.empty(), "", Errors.NONE)
 case Some(offsetAndMetadata) =>
   new PartitionData(offsetAndMetadata.offset,
 offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, 
Errors.NONE)
   }
-  topicPartition -> partitionData
 }
-  }.toMap
+  }
+
+  topicIdPartitionsOpt match {
+case Some(topicIdPartitions) =>
+  topicIdPartitions.map { topicIdPartition =>
+topicIdPartition -> resolvePartitionData(topicIdPartition)
+  }.toMap
+
+case None =>
+  val topicIds = replicaManager.metadataCache.topicNamesToIds()
+  group.allOffsets.keySet.map { topicPartition =>
+Option(topicIds.get(topicPartition.topic())) match {
+  case Some(topicId) =>
+val topicIdPartition = new TopicIdPartition(topicId, 
topicPartition)
+topicIdPartition -> resolvePartitionData(topicIdPartition)
+  case None =>
+val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, 
topicPartition)
+zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION
+}
+  }.toMap

Review Comment:
   Hi David, thanks for taking the time to lay out these thoughts. I agree with 
you that using the zero id does not feel right. It can be considered an abuse of 
`TopicIdPartition`, turning it into an invalid reference to a resource in 
the cluster. In the worst case, an invalid `TopicIdPartition` could end up 
being used somewhere else that treats all `TopicIdPartition` instances as if they 
were valid/resolved. So, there is a clear code smell here.
   
   In any case, the caller of the method `getOffsets` should know that some of 
the `TopicIdPartition` instances are invalid (that is, they reference the zero id). 
This defeats the purpose of returning a map of offsets keyed by topic-id-partition. 
So, I agree with you.
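
   For illustration only, a hypothetical caller-side sketch (Java, not part of the 
PR) of the kind of zero-id check this approach forces on every consumer of the 
returned map:
   ```java
   import java.util.Map;
   import org.apache.kafka.common.TopicIdPartition;
   import org.apache.kafka.common.Uuid;
   import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
   
   class ZeroIdGuardSketch {
       // A zero topic id means "unresolved"; every caller has to remember this.
       static boolean isResolved(TopicIdPartition tidp) {
           return !Uuid.ZERO_UUID.equals(tidp.topicId());
       }
   
       static void handle(Map<TopicIdPartition, PartitionData> offsets) {
           offsets.forEach((tidp, data) -> {
               if (!isResolved(tidp)) {
                   // topic id could not be resolved from the metadata cache
                   return;
               }
               // ... normal handling of a fully resolved topic-id-partition ...
           });
       }
   }
   ```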

[GitHub] [kafka] vamossagar12 commented on pull request #13594: KAFKA-14913: Using ThreadUtils.shutdownExecutorServiceQuietly to close executors in Connect Runtime

2023-04-19 Thread via GitHub


vamossagar12 commented on PR #13594:
URL: https://github.com/apache/kafka/pull/13594#issuecomment-1514537520

   @yashmayya, @C0urante can you also review this small PR whenever you get 
the chance? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13594: KAFKA-14913: Using ThreadUtils.shutdownExecutorServiceQuietly to close executors in Connect Runtime

2023-04-19 Thread via GitHub


vamossagar12 commented on code in PR #13594:
URL: https://github.com/apache/kafka/pull/13594#discussion_r1171175052


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreFormatTest.java:
##
@@ -216,7 +219,12 @@ public void putTopicStateRetriableFailure() {
 }).when(kafkaBasedLog).send(eq(key), valueCaptor.capture(), 
any(Callback.class));
 
 store.put(topicStatus);
-verify(kafkaBasedLog, times(2)).send(any(), any(), any());
+try {
+verify(kafkaBasedLog, times(2)).send(any(), any(), any());

Review Comment:
   @urbandan, thanks, that worked!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2023-04-19 Thread via GitHub


mdedetrich commented on PR #11478:
URL: https://github.com/apache/kafka/pull/11478#issuecomment-1514497721

   So one part of the CI seems to be failing, i.e. for `JDK 8 and Scala 2.12` 
(see 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11478/31/pipeline/11),
 but it appears to be completely unrelated: 
   ```
   [2023-04-19T08:36:22.655Z] Unexpected exception thrown.
   
   [2023-04-19T08:36:22.655Z] 
org.gradle.internal.remote.internal.MessageIOException: Could not read message 
from '/127.0.0.1:51216'.
   ```
   
   Maybe someone can retry; it looks like a Gradle bug, and if I run the failing 
command locally, i.e. `./gradlew -PscalaVersion=2.12 
:streams:upgrade-system-tests-23:checkstyleTest`, it passes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14921) Avoid non numeric values for metrics

2023-04-19 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14921:
--

 Summary: Avoid non numeric values for metrics
 Key: KAFKA-14921
 URL: https://issues.apache.org/jira/browse/KAFKA-14921
 Project: Kafka
  Issue Type: Improvement
Reporter: Mickael Maison
Assignee: Mickael Maison


Many monitoring tools such as Prometheus and Graphite only support numeric 
values. This makes it hard to collect and monitor non-numeric metrics.

We should avoid using Gauges with arbitrary types and provide numeric 
alternatives to such existing metrics.
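
As a rough sketch of the direction (this uses the Yammer metrics API the broker 
already depends on; the metric group/name and the state enum below are made up 
purely for illustration), the idea is to expose a numeric code instead of, say, a 
Gauge of a String:
{code:java}
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;

public class NumericGaugeSketch {
    enum BrokerState { STARTING, RUNNING, SHUTTING_DOWN }

    private volatile BrokerState state = BrokerState.STARTING;

    public void register() {
        // Instead of a Gauge<String> returning state.name(), expose a stable
        // numeric code so Prometheus/Graphite can scrape it directly.
        Metrics.defaultRegistry().newGauge(
                new MetricName("kafka.server", "ExampleMetrics", "BrokerStateCode"),
                new Gauge<Integer>() {
                    @Override
                    public Integer value() {
                        return state.ordinal();
                    }
                });
    }
}
{code}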





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] machi1990 commented on pull request #13612: MINOR: fix noticed typo in raft and metadata projects

2023-04-19 Thread via GitHub


machi1990 commented on PR #13612:
URL: https://github.com/apache/kafka/pull/13612#issuecomment-1514430594

   Hi @showuon, can you have a look at this PR as well when you have some time? It 
is an extension of what I started doing yesterday in 
https://github.com/apache/kafka/pull/13593 as I am going through the codebase. 
Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] machi1990 opened a new pull request, #13612: MINOR: fix noticed typo in raft and metadata projects

2023-04-19 Thread via GitHub


machi1990 opened a new pull request, #13612:
URL: https://github.com/apache/kafka/pull/13612

   I noticed a few typos in the `raft` and `metadata` projects and I thought I 
could open a quick minor PR to fix them.
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-19 Thread via GitHub


dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1171070403


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the 
range assignor and the sticky assignor.

Review Comment:
   Just the HTML tags. The explanation is useful.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14709) Move content in connect/mirror/README.md to the docs

2023-04-19 Thread Gantigmaa Selenge (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gantigmaa Selenge reassigned KAFKA-14709:
-

Assignee: Gantigmaa Selenge

> Move content in connect/mirror/README.md to the docs
> 
>
> Key: KAFKA-14709
> URL: https://issues.apache.org/jira/browse/KAFKA-14709
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs, mirrormaker
>Reporter: Mickael Maison
>Assignee: Gantigmaa Selenge
>Priority: Major
>
> We should move all the content in 
> https://github.com/apache/kafka/blob/trunk/connect/mirror/README.md to the 
> relevant doc sections. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] machi1990 commented on pull request #13611: MINOR: remove unused variable from QuorumMetaLogListener#handleCommit method

2023-04-19 Thread via GitHub


machi1990 commented on PR #13611:
URL: https://github.com/apache/kafka/pull/13611#issuecomment-1514361633

   Hi @cmccabe @jsancio, can you have a look at this? Thanks!
   
   /cc @showuon 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14084) Support SCRAM when using KRaft mode

2023-04-19 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713979#comment-17713979
 ] 

Mickael Maison commented on KAFKA-14084:


[~cmccabe]/[~pprovenzano] Do you have an update on 
https://issues.apache.org/jira/browse/KAFKA-14859 ? I don't see a PR for it 
yet. Thanks

> Support SCRAM when using KRaft mode
> ---
>
> Key: KAFKA-14084
> URL: https://issues.apache.org/jira/browse/KAFKA-14084
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Proven Provenzano
>Priority: Major
>  Labels: kip-500
> Fix For: 3.5.0
>
>
> Support SCRAM when using KRaft mode, as specified in KIP-631



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] machi1990 opened a new pull request, #13611: MINOR: remove unused variable from QuorumMetaLogListener#handleCommit method

2023-04-19 Thread via GitHub


machi1990 opened a new pull request, #13611:
URL: https://github.com/apache/kafka/pull/13611

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on a diff in pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

2023-04-19 Thread via GitHub


mdedetrich commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r1170988090


##
docs/upgrade.html:
##
@@ -26,6 +26,8 @@ Notable changes in 3
 trying to create an already existing metric. (See https://cwiki.apache.org/confluence/display/KAFKA/KIP-843%3A+Adding+addMetricIfAbsent+method+to+Metrics";>KIP-843
 for more details).
 
+Apache Kafka now supports having both an IPv4 and an IPv6 listener 
on the same port. This change only applies to
+non advertised listeners (advertised listeners already have this 
feature)

Review Comment:
   Done in the `Move upgrade notes to 3.6.0 section` commit. I had to add a new 
section for `3.6.0` since it didn't exist (this also involved rebasing onto 
latest trunk)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-19 Thread via GitHub


dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1170981380


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RangeAssignorTest {
+private final RangeAssignor assignor = new RangeAssignor();
+private final Uuid topic1Uuid = Uuid.randomUuid();
+private final Uuid topic2Uuid = Uuid.randomUuid();
+private final Uuid topic3Uuid = Uuid.randomUuid();
+private final String consumerA = "A";
+private final String consumerB = "B";
+private final String consumerC = "C";
+
+@Test
+public void testOneConsumerNoTopic() {
+Map topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+Map members = new HashMap<>();
+List subscribedTopics = new ArrayList<>();
+members.computeIfAbsent(consumerA, k -> new 
AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new 
HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testOneConsumerNonExistentTopic() {
+Map topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+Map members = new HashMap<>();
+List subscribedTopics = new ArrayList<>();
+subscribedTopics.add(topic2Uuid);
+members.computeIfAbsent(consumerA, k -> new 
AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new 
HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() {
+// A -> T1, T3 // B -> T1, T3 // T1 -> 3 Partitions // T3 -> 2 
Partitions
+// Topics
+Map topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+topics.put(topic3Uuid, new AssignmentTopicMetadata(2));
+// Members
+Map members = new HashMap<>();
+// Consumer A
+List subscribedTopicsA = new 
ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid));
+members.put(consumerA, new AssignmentMemberSpec(Optional.empty(), 
Optional.empty(), subscribedTopicsA, new HashMap<>()));
+// Consumer B
+List subscribedTopicsB = new 
ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid));
+members.put(consumerB, new AssignmentMemberSpec(Optional.empty(), 
Optional.empty(), subscribedTopicsB, new HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
+
+Map>> expectedAssignment = new HashMap<>();
+// Topic 1 Partitions Assignment
+expectedAssignment.computeIfAbsent(topic1Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Arrays.asList(0, 1)));
+expectedAssignment.computeIfAbsent(topic1Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Collections.singletonList(2)));
+// Topic 3 Partitions Assignment
+expectedAssignment.computeIfAbsent(topic3Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Collections.singletonList(0)));
+expectedAssignment.computeIfAbsent(topic3Uuid, k -> new 
HashSet<>()).ad

[GitHub] [kafka] dajac commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

2023-04-19 Thread via GitHub


dajac commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1170980408


##
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##
@@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int,
* The most important guarantee that this API provides is that it should 
never return a stale offset. i.e., it either
* returns the current offset or it begins to sync the cache from the log 
(and returns an error code).
*/
-  def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: 
Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = {
-trace("Getting offsets of %s for group 
%s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
+  def getOffsets(groupId: String, requireStable: Boolean, 
topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, 
PartitionData] = {
+trace("Getting offsets of %s for group 
%s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId))
 val group = groupMetadataCache.get(groupId)
 if (group == null) {
-  topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { 
topicPartition =>
+  topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { 
topicIdPartition =>
 val partitionData = new 
PartitionData(OffsetFetchResponse.INVALID_OFFSET,
   Optional.empty(), "", Errors.NONE)
-topicPartition -> partitionData
+topicIdPartition -> partitionData
   }.toMap
 } else {
   group.inLock {
 if (group.is(Dead)) {
-  topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { 
topicPartition =>
+  topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { 
topicIdPartition =>
 val partitionData = new 
PartitionData(OffsetFetchResponse.INVALID_OFFSET,
   Optional.empty(), "", Errors.NONE)
-topicPartition -> partitionData
+topicIdPartition -> partitionData
   }.toMap
 } else {
-  val topicPartitions = 
topicPartitionsOpt.getOrElse(group.allOffsets.keySet)
-
-  topicPartitions.map { topicPartition =>
-if (requireStable && 
group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) {
-  topicPartition -> new 
PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+  def resolvePartitionData(topicIdPartition: TopicIdPartition): 
PartitionData = {
+if (requireStable && 
group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) {
+  new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
 Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT)
 } else {
-  val partitionData = group.offset(topicPartition) match {
+  group.offset(topicIdPartition) match {
 case None =>
   new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
 Optional.empty(), "", Errors.NONE)
 case Some(offsetAndMetadata) =>
   new PartitionData(offsetAndMetadata.offset,
 offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, 
Errors.NONE)
   }
-  topicPartition -> partitionData
 }
-  }.toMap
+  }
+
+  topicIdPartitionsOpt match {
+case Some(topicIdPartitions) =>
+  topicIdPartitions.map { topicIdPartition =>
+topicIdPartition -> resolvePartitionData(topicIdPartition)
+  }.toMap
+
+case None =>
+  val topicIds = replicaManager.metadataCache.topicNamesToIds()
+  group.allOffsets.keySet.map { topicPartition =>
+Option(topicIds.get(topicPartition.topic())) match {
+  case Some(topicId) =>
+val topicIdPartition = new TopicIdPartition(topicId, 
topicPartition)
+topicIdPartition -> resolvePartitionData(topicIdPartition)
+  case None =>
+val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, 
topicPartition)
+zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION
+}
+  }.toMap

Review Comment:
   It could if the IBP is kept on an old version.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14586) Move StreamsResetter to tools

2023-04-19 Thread Federico Valeri (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713966#comment-17713966
 ] 

Federico Valeri commented on KAFKA-14586:
-

Thanks [~sagarrao]. When the redirection is merged, please also add it to the 
removal list for 4.0 release in KAFKA-14705.

> Move StreamsResetter to tools
> -
>
> Key: KAFKA-14586
> URL: https://issues.apache.org/jira/browse/KAFKA-14586
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Sagar Rao
>Priority: Major
> Fix For: 3.5.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14705) Remove tools redirections and deprecations

2023-04-19 Thread Federico Valeri (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Federico Valeri updated KAFKA-14705:

Summary: Remove tools redirections and deprecations  (was: Remove all tools 
redirections and deprecations)

> Remove tools redirections and deprecations
> --
>
> Key: KAFKA-14705
> URL: https://issues.apache.org/jira/browse/KAFKA-14705
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Federico Valeri
>Priority: Major
> Fix For: 4.0.0
>
>
> Redirections:
> - core/src/main/scala/kafka/tools/JmxTool
> - core/src/main/scala/kafka/tools/ClusterTool
> - core/src/main/scala/kafka/tools/StateChangeLogMerger
> - core/src/main/scala/kafka/tools/EndToEndLatency
> - core/src/main/scala/kafka/admin/FeatureCommand
> Deprecations:
> - tools/src/main/java/org/apache/kafka/tools/StateChangeLogMerger



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon commented on a diff in pull request #13459: KAFKA-14592: Move FeatureCommand to tools

2023-04-19 Thread via GitHub


showuon commented on code in PR #13459:
URL: https://github.com/apache/kafka/pull/13459#discussion_r1170939326


##
tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java:
##
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import static java.lang.String.format;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
+
+import static 
org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType.SAFE_DOWNGRADE;
+import static 
org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.KRAFT)
+@Tag("integration")
+public class FeatureCommandTest {
+
+private final ClusterInstance cluster;
+public FeatureCommandTest(ClusterInstance cluster) {
+this.cluster = cluster;
+}
+
+@ClusterTest(clusterType = Type.ZK, metadataVersion = 
MetadataVersion.IBP_3_3_IV1)
+public void testDescribeWithZK() {
+String commandOutput = ToolsTestUtils.captureStandardOut(() ->
+assertEquals(0, 
FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), 
"describe"))
+);
+assertEquals("", commandOutput);
+}
+
+@ClusterTest(clusterType = Type.KRAFT, metadataVersion = 
MetadataVersion.IBP_3_3_IV1)
+public void testDescribeWithKRaft() {
+String commandOutput = ToolsTestUtils.captureStandardOut(() ->
+assertEquals(0, 
FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), 
"describe"))
+);
+assertEquals("Feature: metadata.version\tSupportedMinVersion: 
3.0-IV1\t" +
+"SupportedMaxVersion: 3.5-IV1\tFinalizedVersionLevel: 
3.3-IV1\t", outputWithoutEpoch(commandOutput));
+}
+
+@ClusterTest(clusterType = Type.ZK, metadataVersion = 
MetadataVersion.IBP_3_3_IV1)
+public void testUpgradeMetadataVersionWithZk() {
+String commandOutput = ToolsTestUtils.captureStandardOut(() ->
+assertEquals(1, 
FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
+"upgrade", "--metadata", "3.3-IV2"))
+);
+assertEquals("Could not upgrade metadata.version to 6. Could not apply 
finalized feature " +
+"update because the provided feature is not supported.", 
commandOutput);
+}
+
+@ClusterTest(clusterType = Type.KRAFT, metadataVersion = 
MetadataVersion.IBP_3_3_IV1)
+public void testUpgradeMetadataVersionWithKraft() {
+String commandOutput = ToolsTestUtils.captureStandardOut(() ->
+assertEquals(0, 
FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
+"upgrade", "--feature", "metadata.version=5"))
+);
+assertEquals("metadata.version was upgraded to 5.", commandOutput);
+
+commandOutput = ToolsTestUtils.captureStandardOut(() ->
+assertEquals(0, 
FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
+"upgrade", "--metadata", "3.3-IV2"))
+);
+assertEquals("metadata.version was upgraded to 6.", commandOutput);
+}
+
+@ClusterTest(clusterType = Type.ZK, metadataVersion = 
MetadataV

[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13594: KAFKA-14913: Using ThreadUtils.shutdownExecutorServiceQuietly to close executors in Connect Runtime

2023-04-19 Thread via GitHub


vamossagar12 commented on code in PR #13594:
URL: https://github.com/apache/kafka/pull/13594#discussion_r1170934924


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreFormatTest.java:
##
@@ -216,7 +219,12 @@ public void putTopicStateRetriableFailure() {
 }).when(kafkaBasedLog).send(eq(key), valueCaptor.capture(), 
any(Callback.class));
 
 store.put(topicStatus);
-verify(kafkaBasedLog, times(2)).send(any(), any(), any());
+try {
+verify(kafkaBasedLog, times(2)).send(any(), any(), any());

Review Comment:
   @urbandan, that's what I am suspecting. I wanted to verify if this was the 
case with this hackish fix (and I also was not aware of the timeout option). 
Let me set a timeout of 300ms and see if that helps.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dimitarndimitrov commented on a diff in pull request #13432: KAFKA-14821 Implement the listOffsets API with AdminApiDriver

2023-04-19 Thread via GitHub


dimitarndimitrov commented on code in PR #13432:
URL: https://github.com/apache/kafka/pull/13432#discussion_r1170897060


##
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiHandler.java:
##
@@ -70,6 +71,23 @@
  */
 ApiResult handleResponse(Node broker, Set keys, AbstractResponse 
response);
 
+/**
+ * Callback that is invoked when a fulfillment request hits an 
UnsupportedVersionException.
+ * Keys for which the exception cannot be handled and the request 
shouldn't be retried must be mapped
+ * to an error and returned. The request will then be retried for the 
remainder of the keys.
+ *
+ * @return The failure mappings for the keys for which the exception 
cannot be handled and the
+ * request shouldn't be retried. If the exception cannot be handled all 
initial keys will be in
+ * the returned map.
+ */
+default Map handleUnsupportedVersionException(
+int brokerId,

Review Comment:
   That's what I first looked into but I didn't find a nice way to achieve it 
unless I also add a `Node` param in `onFailure` and pass down the current node 
from the `Call` instance in `KafkaAdminClient.newCall`. That however would 
require a number of changes related to the new `onFailure` signature so I am 
not too fond of the idea.
   Let me know if you have anything better in mind.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #13459: KAFKA-14592: Move FeatureCommand to tools

2023-04-19 Thread via GitHub


showuon commented on code in PR #13459:
URL: https://github.com/apache/kafka/pull/13459#discussion_r1170888493


##
tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java:
##
@@ -193,8 +181,7 @@ static String levelToString(String feature, short level) {
 
 static void handleDescribe(Admin adminClient) throws ExecutionException, 
InterruptedException {
 FeatureMetadata featureMetadata = 
adminClient.describeFeatures().featureMetadata().get();
-TreeSet featureList = new 
java.util.TreeSet<>(featureMetadata.supportedFeatures().keySet());
-featureList.forEach(feature -> {
+featureMetadata.supportedFeatures().keySet().forEach(feature -> {

Review Comment:
   If we remove `TreeSet`, we need to sort it ourselves. Maybe 
`featureMetadata.supportedFeatures().keySet().stream().sorted().forEach...`?
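   
   A minimal sketch of that suggestion (the method name below is made up and 
the per-feature output is simplified; `levelToString` is the helper from the 
quoted class):
   
       static void handleDescribeSorted(Admin adminClient) throws ExecutionException, InterruptedException {
           FeatureMetadata featureMetadata = adminClient.describeFeatures().featureMetadata().get();
           // Sorting the key stream restores the deterministic ordering the TreeSet used to provide.
           featureMetadata.supportedFeatures().keySet().stream()
               .sorted()
               .forEach(feature -> System.out.println(
                   levelToString(feature, featureMetadata.supportedFeatures().get(feature).maxVersion())));
       }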



##
tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java:
##
@@ -307,25 +294,24 @@ private static void update(String op, Admin admin, 
Map up
 }
 });
 
-AtomicInteger numFailures = new AtomicInteger();
-errors.keySet().forEach(feature -> {
-short level = updates.get(feature).maxVersionLevel();
-Optional maybeThrowable = errors.get(feature);
+int numFailures = 0;
+for (Map.Entry> entry : errors.entrySet()) 
{

Review Comment:
   nit: It seems we only need keys, so maybe `for (String feature : 
errors.keySet())` ?



##
core/src/main/scala/kafka/admin/FeatureCommand.scala:
##
@@ -17,306 +17,12 @@
 
 package kafka.admin
 
-import kafka.tools.TerseFailure
-import kafka.utils.Exit
-import net.sourceforge.argparse4j.ArgumentParsers
-import net.sourceforge.argparse4j.impl.Arguments.{append, fileType, store, 
storeTrue}
-import net.sourceforge.argparse4j.inf.{ArgumentParserException, Namespace, 
Subparsers}
-import net.sourceforge.argparse4j.internal.HelpScreenException
-import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType
-import org.apache.kafka.clients.admin.{Admin, FeatureUpdate, 
UpdateFeaturesOptions}
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.common.MetadataVersion
-
-import java.io.{File, PrintStream}
-import java.util.Properties
-import scala.concurrent.ExecutionException
-import scala.jdk.CollectionConverters._
-import scala.compat.java8.OptionConverters._
-
+@deprecated(since = "3.5")
 object FeatureCommand {
   def main(args: Array[String]): Unit = {
-val res = mainNoExit(args, System.out)
-Exit.exit(res)
-  }
-
-  // This is used for integration tests in order to avoid killing the test 
with Exit.exit,
-  // and in order to capture the command output.
-  def mainNoExit(
-args: Array[String],
-out: PrintStream
-  ): Int = {
-val parser = ArgumentParsers.newArgumentParser("kafka-features")
-  .defaultHelp(true)
-  .description("This tool manages feature flags in Kafka.")
-parser.addArgument("--bootstrap-server")
-  .help("A comma-separated list of host:port pairs to use for establishing 
the connection to the Kafka cluster.")
-  .required(true)
-
-parser.addArgument("--command-config")
-  .`type`(fileType())
-  .help("Property file containing configs to be passed to Admin Client.")
-val subparsers = parser.addSubparsers().dest("command")
-addDescribeParser(subparsers)
-addUpgradeParser(subparsers)
-addDowngradeParser(subparsers)
-addDisableParser(subparsers)
-
-try {
-  val namespace = parser.parseArgs(args)
-  val command = namespace.getString("command")
-
-  val commandConfig = namespace.get[File]("command_config")
-  val props = if (commandConfig != null) {
-if (!commandConfig.exists()) {
-  throw new TerseFailure(s"Properties file ${commandConfig.getPath} 
does not exists!")
-}
-Utils.loadProps(commandConfig.getPath)
-  } else {
-new Properties()
-  }
-  props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
namespace.getString("bootstrap_server"))
-  val admin = Admin.create(props)
-  try {
-command match {
-  case "describe" => handleDescribe(out, admin)
-  case "upgrade" => handleUpgrade(out, namespace, admin)
-  case "downgrade" => handleDowngrade(out, namespace, admin)
-  case "disable" => handleDisable(out, namespace, admin)
-}
-  } finally {
-admin.close()
-  }
-  0
-} catch {
-  case _: HelpScreenException =>
-0

Review Comment:
   Why don't we need `HelpScreenException` now?



##
tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java:
##
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes

2023-04-19 Thread via GitHub


vcrfxia commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1170929319


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java:
##
@@ -705,12 +711,304 @@ public void shouldNotOptimizeJoinWhenNotInConfig() {
 assertEquals(count.get(), 2);
 }
 
+@Test
+public void shouldSetUseVersionedSemanticsOnTableFilter() {
+// Given:
+final MaterializedInternal> materializedInternal =
+new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("store",
 Duration.ofMinutes(5))), builder, storePrefix);
+final KTable table1 = builder.table("t1", consumed, 
materializedInternal);
+table1.filter((k, v) -> v != null);
+
+// When:
+builder.buildAndOptimizeTopology();
+
+// Then:
+final GraphNode filter = getNodeByType(builder.root, 
TableFilterNode.class, new HashSet<>());
+assertNotNull(filter);
+verifyVersionedSemantics((TableFilterNode) filter, true);
+}
+
+@Test
+public void shouldSetUseVersionedSemanticsWithIntermediateNode() {
+// Given:
+final MaterializedInternal> versionedMaterialize =
+new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned",
 Duration.ofMinutes(5))), builder, storePrefix);
+final KTable table1 = builder.table("t1", consumed, 
versionedMaterialize);
+final KTable table2 = table1.mapValues(v -> v != null 
? v + v : null);
+table2.filter((k, v) -> v != null);
+
+// When:
+builder.buildAndOptimizeTopology();
+
+// Then:
+final GraphNode filter = getNodeByType(builder.root, 
TableFilterNode.class, new HashSet<>());
+assertNotNull(filter);
+verifyVersionedSemantics((TableFilterNode) filter, true);
+}
+
+@Test
+public void 
shouldNotSetUseVersionedSemanticsWithMaterializedIntermediateNode() {
+// Given:
+final MaterializedInternal> versionedMaterialize =
+new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned",
 Duration.ofMinutes(5))), builder, storePrefix);
+final MaterializedInternal> unversionedMaterialize =
+new MaterializedInternal<>(Materialized.as("unversioned"), 
builder, storePrefix);
+final KTable table1 = builder.table("t1", consumed, 
versionedMaterialize);
+final KTable table2 = table1.mapValues(v -> v != null 
? v + v : null, unversionedMaterialize);
+table2.filter((k, v) -> v != null);
+
+// When:
+builder.buildAndOptimizeTopology();
+
+// Then:
+final GraphNode filter = getNodeByType(builder.root, 
TableFilterNode.class, new HashSet<>());
+assertNotNull(filter);
+verifyVersionedSemantics((TableFilterNode) filter, false);
+}
+
+@Test
+public void 
shouldSetUseVersionedSemanticsWithIntermediateNodeMaterializedAsVersioned() {
+// Given:
+final MaterializedInternal> versionedMaterialize =
+new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned",
 Duration.ofMinutes(5))), builder, storePrefix);
+final MaterializedInternal> versionedMaterialize2 =
+new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2",
 Duration.ofMinutes(5))), builder, storePrefix);
+final KTable table1 = builder.table("t1", consumed, 
versionedMaterialize);
+final KTable table2 = table1.mapValues(v -> v != null 
? v + v : null, versionedMaterialize2);
+table2.filter((k, v) -> v != null);
+
+// When:
+builder.buildAndOptimizeTopology();
+
+// Then:
+final GraphNode filter = getNodeByType(builder.root, 
TableFilterNode.class, new HashSet<>());
+assertNotNull(filter);
+verifyVersionedSemantics((TableFilterNode) filter, true);
+}
+
+@Test
+public void shouldNotSetUseVersionedSemanticsWithIntermediateAggregation() 
{
+// Given:
+final MaterializedInternal> versionedMaterialize =
+new 
MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned",
 Duration.ofMinutes(5))), builder, storePrefix);
+final KTable table1 = builder.table("t1", consumed, 
versionedMaterialize);
+final KTable table2 = 
table1.groupBy(KeyValue::new).count();
+table2.filter((k, v) -> v != null);
+
+// When:
+builder.buildAndOptimizeTopology();
+
+// Then:
+final GraphNode filter = getNodeByType(builder.root, 
TableFilterNode.class, new HashSet<>());
+assertNotNull(filter);
+verifyVersionedSemantics((TableFilterNode) filter, false);
+}
+
+@Test
+public void 
shouldSetUseVersionedSemanticsWithIntermediateAggregationMaterializedAsVersioned()
 {
+  

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes

2023-04-19 Thread via GitHub


vcrfxia commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1170924158


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##
@@ -1294,4 +1307,11 @@ private  KTable 
doJoinOnForeignKey(final KTable forei
 builder
 );
 }
+
+private static void maybeSetOutputVersioned(final GraphNode tableNode,

Review Comment:
   Hm, this method is used only for table methods for which it's valid 
for `materializedInternal` to be null, e.g., table filter, mapValues, etc., 
where the result is not necessarily materialized. I don't think there's a way 
to rewrite this code such that `materializedInternal` will never be null in 
these cases, because a non-null `materializedInternal` means that the result is 
materialized, and that's not necessarily the case here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] urbandan commented on a diff in pull request #13594: KAFKA-14913: Using ThreadUtils.shutdownExecutorServiceQuietly to close executors in Connect Runtime

2023-04-19 Thread via GitHub


urbandan commented on code in PR #13594:
URL: https://github.com/apache/kafka/pull/13594#discussion_r1170921037


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreFormatTest.java:
##
@@ -216,7 +219,12 @@ public void putTopicStateRetriableFailure() {
 }).when(kafkaBasedLog).send(eq(key), valueCaptor.capture(), 
any(Callback.class));
 
 store.put(topicStatus);
-verify(kafkaBasedLog, times(2)).send(any(), any(), any());
+try {
+verify(kafkaBasedLog, times(2)).send(any(), any(), any());

Review Comment:
   Did I introduce flakiness with KAFKA-14902?
   If yes, I think this verify should be enhanced with a timeout 
(`timeout(...).times(2)`) to allow the background thread to trigger the retry; 
then you don't need to mock the executor.
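   
   For reference, the suggested verification would look roughly like this (a 
sketch against the quoted test, assuming a static import of 
`org.mockito.Mockito.timeout`):
   
       // Waits up to 300 ms for the background retry thread to issue the second
       // send before failing the verification, so the executor itself does not
       // need to be mocked.
       verify(kafkaBasedLog, timeout(300).times(2)).send(any(), any(), any());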



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dimitarndimitrov commented on a diff in pull request #13432: KAFKA-14821 Implement the listOffsets API with AdminApiDriver

2023-04-19 Thread via GitHub


dimitarndimitrov commented on code in PR #13432:
URL: https://github.com/apache/kafka/pull/13432#discussion_r1170905273


##
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java:
##
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.admin.ListOffsetsOptions;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import 
org.apache.kafka.clients.admin.internals.AdminApiFuture.SimpleAdminApiFuture;
+import org.apache.kafka.clients.admin.internals.AdminApiHandler.Batched;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidMetadataException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import 
org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
+import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic;
+import 
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
+import 
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.CollectionUtils;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+public final class ListOffsetsHandler extends Batched {
+
+private final Map offsetTimestampsByPartition;
+private final ListOffsetsOptions options;
+private final Logger log;
+private final AdminApiLookupStrategy lookupStrategy;
+
+public ListOffsetsHandler(
+Map offsetTimestampsByPartition,
+ListOffsetsOptions options,
+LogContext logContext
+) {
+this.offsetTimestampsByPartition = offsetTimestampsByPartition;
+this.options = options;
+this.log = logContext.logger(ListOffsetsHandler.class);
+this.lookupStrategy = new PartitionLeaderStrategy(logContext);
+}
+
+@Override
+public String apiName() {
+return "listOffsets";
+}
+
+@Override
+public AdminApiLookupStrategy lookupStrategy() {
+return this.lookupStrategy;
+}
+
+@Override
+ListOffsetsRequest.Builder buildBatchedRequest(int brokerId, 
Set keys) {
+Map topicsByName = 
CollectionUtils.groupPartitionsByTopic(
+keys,
+topicName -> new ListOffsetsTopic().setName(topicName),
+(listOffsetsTopic, partitionId) -> {
+TopicPartition topicPartition = new 
TopicPartition(listOffsetsTopic.name(), partitionId);
+long offsetTimestamp = 
offsetTimestampsByPartition.get(topicPartition);
+listOffsetsTopic.partitions().add(
+new ListOffsetsPartition()
+.setPartitionIndex(partitionId)
+.setTimestamp(offsetTimestamp));
+});
+boolean supportsMaxTimestamp = keys
+.stream()
+.anyMatch(key -> offsetTimestampsByPartition.get(key) == 
ListOffsetsRequest.MAX_TIMESTAMP);
+
+return ListOffsetsRequest.Builder
+.forConsumer(true, options.isolationLevel(), supportsMaxTimestamp)
+.setTargetTimes(new ArrayList<>(topicsByName.values()));
+}
+
+@Override
+public ApiResult handleResponse(
+Node broker,
+Set keys,
+AbstractResponse abstractResponse
+) {
+ListOffsetsResponse response = (ListOffsetsResponse) abstractResponse;
+Map completed = new HashMap<>();
+Map failed = new HashMap<>();
+List 

[GitHub] [kafka] LinShunKang commented on a diff in pull request #12545: KIP-863: Reduce CompletedFetch#parseRecord() memory copy

2023-04-19 Thread via GitHub


LinShunKang commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r1170903894


##
clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java:
##
@@ -35,4 +41,22 @@ public Double deserialize(String topic, byte[] data) {
 }
 return Double.longBitsToDouble(value);
 }
+
+@Override
+public Double deserialize(String topic, Headers headers, ByteBuffer data) {
+if (data == null) {
+return null;
+}
+
+if (data.remaining() != 8) {
+throw new SerializationException("Size of data received by 
DoubleDeserializer is not 8");
+}
+
+final ByteOrder srcOrder = data.order();
+data.order(BIG_ENDIAN);
+
+final double value = data.getDouble(data.position());

Review Comment:
   > Also, the `data.position()` could be removed because by default, it'll 
read from current position of the byte buffer.
   
   @showuon 
   I use ByteBuffer.getDouble(int) instead of ByteBuffer.getDouble(), 
considering that ByteBuffer.getDouble(int) does not modify the position of the 
ByteBuffer.
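   
   Putting the two points together, a sketch of the deserialize method being 
discussed (imports `java.nio.ByteBuffer`, `java.nio.ByteOrder`, 
`org.apache.kafka.common.header.Headers` and 
`org.apache.kafka.common.errors.SerializationException`; restoring the 
caller's byte order at the end is an assumption, since the quoted hunk is cut 
off):
   
       public Double deserialize(String topic, Headers headers, ByteBuffer data) {
           if (data == null) {
               return null;
           }
           if (data.remaining() != 8) {
               throw new SerializationException("Size of data received by DoubleDeserializer is not 8");
           }
           final ByteOrder srcOrder = data.order();
           data.order(ByteOrder.BIG_ENDIAN);   // match the byte[] path, which reads big-endian
           try {
               // Absolute read: honours the buffer's byte order but leaves position untouched.
               return data.getDouble(data.position());
           } finally {
               data.order(srcOrder);           // hand the buffer back with its original byte order
           }
       }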



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13496: KAFKA-14834: [1/N] Add timestamped get to KTableValueGetter

2023-04-19 Thread via GitHub


vcrfxia commented on code in PR #13496:
URL: https://github.com/apache/kafka/pull/13496#discussion_r1163422597


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java:
##
@@ -25,5 +25,21 @@
 
 ValueAndTimestamp get(K key);
 
+/**
+ * Returns the latest record version, associated with the provided key, 
with timestamp
+ * not exceeding the provided timestamp bound. This method may only be 
called if
+ * {@link #isVersioned()} is true.
+ */
+default ValueAndTimestamp get(K key, long asOfTimestamp) {
+throw new UnsupportedOperationException("get(key, timestamp) is only 
supported for versioned stores");
+}
+
+/**
+ * @return whether this value getter supports multiple record versions for 
the same key.
+ * If true, then {@link #get(Object, long)} must be implemented. 
If not, then
+ * {@link #get(Object, long)} must not be called.
+ */
+boolean isVersioned();

Review Comment:
   Heh, I made one of them non-default in order to force future authors of new 
value getters to have to consider at least one of them, with the hope that 
considering one implies considering the other as well :) I will leave it as-is 
for now, can always be changed in the future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13609: KAFKA-14834: [11/N] Update table joins to identify out-of-order records with `isLatest`

2023-04-19 Thread via GitHub


vcrfxia commented on code in PR #13609:
URL: https://github.com/apache/kafka/pull/13609#discussion_r1170899662


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##
@@ -1098,7 +1101,7 @@ private  KTable 
doJoinOnForeignKey(final KTable forei
 //not be done needlessly.
 ((KTableImpl) foreignKeyTable).enableSendingOldValues(true);
 
-//Old values must be sent such that the 
ForeignJoinSubscriptionSendProcessorSupplier can propagate deletions to the 
correct node.
+//Old values must be sent such that the 
SubscriptionSendProcessorSupplier can propagate deletions to the correct node.

Review Comment:
   Sure thing. Here ya go: https://github.com/apache/kafka/pull/13610



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##
@@ -1098,7 +1101,7 @@ private  KTable 
doJoinOnForeignKey(final KTable forei
 //not be done needlessly.
 ((KTableImpl) foreignKeyTable).enableSendingOldValues(true);
 
-//Old values must be sent such that the 
ForeignJoinSubscriptionSendProcessorSupplier can propagate deletions to the 
correct node.
+//Old values must be sent such that the 
SubscriptionSendProcessorSupplier can propagate deletions to the correct node.

Review Comment:
   Sure thing: https://github.com/apache/kafka/pull/13610



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia opened a new pull request, #13610: MINOR: update comment for FK join processor renames

2023-04-19 Thread via GitHub


vcrfxia opened a new pull request, #13610:
URL: https://github.com/apache/kafka/pull/13610

   Minor follow-up to https://github.com/apache/kafka/pull/13589. This PR fixes 
a few comments where the old class names are still being used, to use the new 
class names instead.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] LinShunKang commented on a diff in pull request #12545: KIP-863: Reduce CompletedFetch#parseRecord() memory copy

2023-04-19 Thread via GitHub


LinShunKang commented on code in PR #12545:
URL: https://github.com/apache/kafka/pull/12545#discussion_r1170898685


##
clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java:
##
@@ -35,4 +41,22 @@ public Double deserialize(String topic, byte[] data) {
 }
 return Double.longBitsToDouble(value);
 }
+
+@Override
+public Double deserialize(String topic, Headers headers, ByteBuffer data) {
+if (data == null) {
+return null;
+}
+
+if (data.remaining() != 8) {
+throw new SerializationException("Size of data received by 
DoubleDeserializer is not 8");
+}
+
+final ByteOrder srcOrder = data.order();
+data.order(BIG_ENDIAN);
+
+final double value = data.getDouble(data.position());

Review Comment:
   @showuon 
   Because DoubleDeserializer and other Number Deserializers use BIG_ENDIAN 
byte order to read from byte[], and the current byte order of ByteBuffer may 
not be BIG_ENDIAN, we set the byte order of ByteBuffer to BIG_ENDIAN to be 
consistent with the byte order used when reading from byte[].
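   
   A tiny standalone illustration of why forcing the order matters (not code 
from the PR):
   
       // The byte[] serializers always write big-endian, so a buffer whose order
       // was switched by the caller would decode the wrong value unless the read
       // forces BIG_ENDIAN first.
       byte[] bytes = ByteBuffer.allocate(8).putDouble(Math.PI).array();
       ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
       double wrong = buf.getDouble(0);    // read with the wrong order, not Math.PI
       buf.order(ByteOrder.BIG_ENDIAN);
       double right = buf.getDouble(0);    // == Math.PI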



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13609: KAFKA-14834: [11/N] Update table joins to identify out-of-order records with `isLatest`

2023-04-19 Thread via GitHub


vcrfxia commented on code in PR #13609:
URL: https://github.com/apache/kafka/pull/13609#discussion_r1170894940


##
streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java:
##
@@ -446,14 +482,18 @@ public void testInnerWithRightVersionedOnly() throws 
Exception {
 null,
 null,
 null,
-Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"D-d", null,  15L)),
-Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"E-d", null,  14L)),
 null,
 null,
-Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"F-d", null,  14L))
+null,
+null,
+Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"E-e", null,  15L)),
+Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
null, null,  14L)),
+null,
+null,
+Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"F-e", null,  14L))

Review Comment:
   Yes, you are correct. I've fixed this and also strengthened the check that 
the test uses (in the latest commit) so that this type of "error" fails the 
test now. This required updating some of the table-table multi-join expected 
test results in the process, separate from versioned joins.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13609: KAFKA-14834: [11/N] Update table joins to identify out-of-order records with `isLatest`

2023-04-19 Thread via GitHub


vcrfxia commented on code in PR #13609:
URL: https://github.com/apache/kafka/pull/13609#discussion_r1170894062


##
streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java:
##
@@ -215,14 +226,19 @@ public void testInnerWithVersionedStores() {
 null,
 null,
 null,
-Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", 
null,  15L)),
+null,
+null,
+null,
+null,
+null,

Review Comment:
   Yep, good eye. Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



<    1   2