[GitHub] [kafka] showuon commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-05-07 Thread via GitHub


showuon commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1185891570


##
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.BufferedInputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a copy of {@link ByteBufferInputStream} with the 
following differences:
+ * - Unlike {@link java.io.BufferedInputStream#skip(long)} this class could be 
configured to not push skip() to
+ * input stream. We may want to avoid pushing this to input stream because 
it's implementation maybe inefficient,
+ * e.g. the case of ZstdInputStream which allocates a new buffer from buffer 
pool, per skip call.
+ * - Unlike {@link java.io.BufferedInputStream}, which allocates an 
intermediate buffer, this uses a buffer supplier to
+ * create the intermediate buffer.
+ * 
+ * Note that:
+ * - this class is not thread safe and shouldn't be used in scenarios where 
multiple threads access this.
+ * - the implementation of this class is performance sensitive. Minor changes 
such as usage of ByteBuffer instead of byte[]
+ * can significantly impact performance, hence, proceed with caution.
+ */
+public class ChunkedBytesStream extends FilterInputStream {
+/**
+ * Supplies the ByteBuffer which is used as intermediate buffer to store 
the chunk of output data.
+ */
+private final BufferSupplier bufferSupplier;
+/**
+ * Intermediate buffer to store the chunk of output data. The 
ChunkedBytesStream is considered closed if
+ * this buffer is null.
+ */
+private byte[] intermediateBuf;
+/**
+ * The index one greater than the index of the last valid byte in
+ * the buffer.
+ * This value is always in the range 0 through 
intermediateBuf.length;
+ * elements intermediateBuf[0]  through 
intermediateBuf[count-1]
+ * contain buffered input data obtained
+ * from the underlying  input stream.
+ */
+protected int count = 0;
+/**
+ * The current position in the buffer. This is the index of the next
+ * character to be read from the buf array.
+ * 
+ * This value is always in the range 0
+ * through count. If it is less
+ * than count, then  intermediateBuf[pos]
+ * is the next byte to be supplied as input;
+ * if it is equal to count, then
+ * the  next read or skip
+ * operation will require more bytes to be
+ * read from the contained  input stream.
+ */
+protected int pos = 0;
+/**
+ * Reference for the intermediate buffer. This reference is only kept for 
releasing the buffer from the
+ * buffer supplier.
+ */
+private final ByteBuffer intermediateBufRef;
+/**
+ * Determines if the skip be pushed down
+ */
+private final boolean pushSkipToSourceStream;
+
+public ChunkedBytesStream(InputStream in, BufferSupplier bufferSupplier, 
int intermediateBufSize, boolean pushSkipToSourceStream) {
+super(in);
+this.bufferSupplier = bufferSupplier;
+intermediateBufRef = bufferSupplier.get(intermediateBufSize);
+if (!intermediateBufRef.hasArray() || 
(intermediateBufRef.arrayOffset() != 0)) {
+throw new IllegalArgumentException("provided ByteBuffer lacks 
array or has non-zero arrayOffset");
+}
+intermediateBuf = intermediateBufRef.array();
+this.pushSkipToSourceStream = pushSkipToSourceStream;
+}
+
+/**
+ * Check to make sure that buffer has not been nulled out due to
+ * close; if not return it;
+ */
+private byte[] getBufIfOpen() throws IOException {
+byte[] buffer = intermediateBuf;
+if (buffer == null)
+throw new IOException("Stream closed");
+return buffer;
+}
+
+/**
+ * See

Review Comment:
   See ?



##
clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java:
##
@@ -356,164 +346,100 @@ private static DefaultRecord readFrom(ByteBuffer buffer,
 

[GitHub] [kafka] ableegoldman opened a new pull request, #13682: MINOR: improve error messages for join window store settings

2023-05-07 Thread via GitHub


ableegoldman opened a new pull request, #13682:
URL: https://github.com/apache/kafka/pull/13682

   Trying to configure the underlying state stores in a join is pretty awkward, 
and getting all the parameters right can be particularly frustrating. 
   
   At a minimum, we should break up the verification into each individual check 
so that it's clear which step has failed. It would also help to actually 
include in the error message how to fix the store parameters to match what is 
expected, since this is not necessarily obvious (eg that retention period 
should be **exactly** the sum of windowSize + gracePeriod)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14914) binarySearch in AbstactIndex may execute with infinite loop

2023-05-07 Thread li xiangyuan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720372#comment-17720372
 ] 

li xiangyuan commented on KAFKA-14914:
--

After create this jira ticket we had encountered this problem again, we try
to fetch the index file but failed, for now we downgrade our aws ec2
instance to c6 and haven't met this. we are keeping tracking this.

Luke Chen (Jira)  于2023年5月5日周五 09:46写道:



> binarySearch in AbstactIndex may execute with infinite loop
> ---
>
> Key: KAFKA-14914
> URL: https://issues.apache.org/jira/browse/KAFKA-14914
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.4.0
>Reporter: li xiangyuan
>Priority: Major
> Attachments: stack.1.txt, stack.2.txt, stack.3.txt
>
>
> Recently our servers in production environment may suddenly stop handle 
> request frequently(for now 3 times in less than 10 days),   please check the 
> stack file uploaded, it show that 1 
> ioThread(data-plane-kafka-request-handler-11) hold  the ReadLock of 
> Partition's leaderIsrUpdateLock and keep run the binarySearch function, once 
> any thread(kafka-scheduler-2) need WriteMode Of this lock, then all requests 
> read this partition need ReadMode Lock will use out all ioThreads and then 
> this broker couldn't handle any request.
> the 3 stack files are fetched with interval  about 6 minute, with my 
> standpoint i just could think obviously the  binarySearch function cause dead 
> lock and I presuppose maybe the index block values in offsetIndex (at least 
> in mmap) are not sorted.
>  
> detail information:
> this problem appear in 2 brokers
> broker version: 2.4.0
> jvm: openjdk 11
> hardware: aws c7g 4xlarge, this is a arm64 server, we recently upgrade our 
> servers from c6g 4xlarge to this type, when we use c6g haven't meet this 
> problem, we don't know whether arm or aws c7g server have any problem.
> other: once we restart broker, it will recover, so we doubt offset index file 
> may not corrupted and maybe something wrong with mmap.
> plz give any suggestion solve this problem, thx.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon merged pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes

2023-05-07 Thread via GitHub


showuon merged PR #13515:
URL: https://github.com/apache/kafka/pull/13515


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes

2023-05-07 Thread via GitHub


showuon commented on PR #13515:
URL: https://github.com/apache/kafka/pull/13515#issuecomment-1537643776

   Failed tests are unrelated:
   ```
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testOffsetTranslationBehindReplicationFlow()
   Build / JDK 8 and Scala 2.12 / 
integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor()
   Build / JDK 17 and Scala 2.13 / 
kafka.api.TransactionsTest.testFailureToFenceEpoch(String).quorum=kraft
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerTest.testDelayedConfigurationOperations()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #13669: MINOR: Fix producer Callback comment

2023-05-07 Thread via GitHub


showuon commented on PR #13669:
URL: https://github.com/apache/kafka/pull/13669#issuecomment-1537641986

   CI build failed with infra issue. Rebuilding
   https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13669/3/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mehbey opened a new pull request, #13681: KAFKA-14133: Migrate ActiveTaskCreator mock in TaskManagerTest to Mockito

2023-05-07 Thread via GitHub


mehbey opened a new pull request, #13681:
URL: https://github.com/apache/kafka/pull/13681

   This pull requests migrates the ActiveTaskCreator mock in TaskManagerTest 
from EasyMock to Mockito
   The change is restricted to a single mock to minimize the scope and make it 
easier for review.
   Please see two examples that follows the same pattern below:
   https://github.com/apache/kafka/pull/13529
   https://github.com/apache/kafka/pull/13621
   
   
   ### Summary of testing strategy (including rationale)
   
   Verified that all tests are passing
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-07 Thread via GitHub


satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1186813113


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -600,25 +622,208 @@ public String toString() {
 }
 }
 
-long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws 
RemoteStorageException {
-Optional offset = Optional.empty();
-Optional maybeLog = 
fetchLog.apply(topicIdPartition.topicPartition());
-if (maybeLog.isPresent()) {
-UnifiedLog log = maybeLog.get();
-Option maybeLeaderEpochFileCache = 
log.leaderEpochCache();
-if (maybeLeaderEpochFileCache.isDefined()) {
-LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-OptionalInt epoch = cache.latestEpoch();
-while (!offset.isPresent() && epoch.isPresent()) {
-offset = 
remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, 
epoch.getAsInt());
-epoch = cache.previousEpoch(epoch.getAsInt());
+public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) 
throws RemoteStorageException, IOException {
+int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+FetchRequest.PartitionData fetchInfo = 
remoteStorageFetchInfo.fetchInfo;
+
+boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == 
FetchIsolation.TXN_COMMITTED;
+
+long offset = fetchInfo.fetchOffset;
+int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+Optional logOptional = fetchLog.apply(tp);
+OptionalInt epoch = OptionalInt.empty();
+
+if (logOptional.isPresent()) {
+Option leaderEpochCache = 
logOptional.get().leaderEpochCache();
+if (leaderEpochCache.isDefined()) {
+epoch = leaderEpochCache.get().epochForOffset(offset);
+}
+}
+
+Optional rlsMetadataOptional = 
epoch.isPresent()
+? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+: Optional.empty();
+
+if (!rlsMetadataOptional.isPresent()) {
+String epochStr = (epoch.isPresent()) ? 
Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+throw new OffsetOutOfRangeException("Received request for offset " 
+ offset + " for leader epoch "
++ epochStr + " and partition " + tp + " which does not 
exist in remote tier.");
+}
+
+RemoteLogSegmentMetadata remoteLogSegmentMetadata = 
rlsMetadataOptional.get();
+int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, 
offset);
+InputStream remoteSegInputStream = null;
+try {
+// Search forward for the position of the last offset that is 
greater than or equal to the target offset
+remoteSegInputStream = 
remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
+RemoteLogInputStream remoteLogInputStream = new 
RemoteLogInputStream(remoteSegInputStream);
+
+RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, 
offset);
+
+if (firstBatch == null)
+return new FetchDataInfo(new LogOffsetMetadata(offset), 
MemoryRecords.EMPTY, false,
+includeAbortedTxns ? 
Optional.of(Collections.emptyList()) : Optional.empty());
+
+// An empty record is sent instead of an incomplete batch when 
there is no minimum-one-message constraint
+// and for FetchRequest version 3 and above and the first batch 
size is more than maximum bytes that can be sent.
+int firstBatchSize = firstBatch.sizeInBytes();
+if (!remoteStorageFetchInfo.minOneMessage &&
+!remoteStorageFetchInfo.hardMaxBytesLimit &&
+firstBatchSize > maxBytes) {
+return new FetchDataInfo(new LogOffsetMetadata(offset), 
MemoryRecords.EMPTY);
+}
+
+int updatedFetchSize =
+remoteStorageFetchInfo.minOneMessage && firstBatchSize > 
maxBytes ? firstBatchSize : maxBytes;
+
+ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+int remainingBytes = updatedFetchSize;
+
+firstBatch.writeTo(buffer);
+remainingBytes -= firstBatchSize;
+
+if (remainingBytes > 0) {
+// read the input stream until min of (EOF stream or buffer's 
remaining capacity).
+Utils.readFully(remoteSegInputStream, buffer);
+}
+buffer.flip();
+
+FetchDataInfo fetchDataInfo = new FetchDataInfo(
+new LogOffsetMetadata(offset, 
remoteLogSegmentMetadata.startOffset(), startPos),
+MemoryRecords.readableRecords(buffer));
+if (includeAbortedTxns) {
+   

[jira] [Created] (KAFKA-14972) Make KafkaConsumer usable in async runtimes

2023-05-07 Thread Erik van Oosten (Jira)
Erik van Oosten created KAFKA-14972:
---

 Summary: Make KafkaConsumer usable in async runtimes
 Key: KAFKA-14972
 URL: https://issues.apache.org/jira/browse/KAFKA-14972
 Project: Kafka
  Issue Type: Wish
  Components: consumer
Reporter: Erik van Oosten


KafkaConsumer contains a check that rejects nested invocations from different 
threads (method {{{}acquire{}}}). For users that use an async runtime, this is 
an almost impossible requirement. Examples of async runtimes that are affected 
are Kotlin co-routines (see KAFKA-7143) and Zio.

We propose to replace the thread-id check with an access-id that is stored on a 
thread-local variable. Existing programs will not be affected. Developers that 
work in an async runtime can pick up the access-id and set it on the 
thread-local variable in a thread of their choosing.

Every time a callback is invoked a new access-id is generated. When the 
callback completes, the previous access-id is restored.

This proposal does not make it impossible to use the client incorrectly. 
However, we think it strikes a good balance between making correct usage from 
an async runtime possible while making incorrect usage difficult.

Alternatives considered:
 # Configuration that switches off the check completely.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-07 Thread via GitHub


dajac commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1186823970


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine 
of the
+ * consumer group protocol. Given the current state of a member and a desired 
or target
+ * assignment state, the state machine takes the necessary steps to converge 
them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch:
+ *   The current epoch of the member.
+ *
+ * - Next Epoch:
+ *   The desired epoch of the member. It corresponds to the epoch of the 
target/desired assignment.
+ *   The member transitions to this epoch when it has revoked the partitions 
that it does not own
+ *   or if it does not have to revoke any.
+ *
+ * - Previous Epoch:
+ *   The epoch of the member when the state was last updated.
+ *
+ * - Assigned Partitions:
+ *   The set of partitions currently assigned to the member. This represents 
what the member should have.
+ *
+ * - Partitions Pending Revocation:
+ *   The set of partitions that the member should revoke before it can 
transition to the next state.
+ *
+ * - Partitions Pending Assignment:
+ *   The set of partitions that the member will eventually receive. The 
partitions in this set are
+ *   still owned by other members in the group.
+ *
+ * The state machine has four states:
+ * - NEW_TARGET_ASSIGNMENT:

Review Comment:
   We do but it is a transient state so it is not part of the member's states. 
We transition to it 
[here](https://github.com/apache/kafka/pull/13638/files/0395a1780acd73e735bfee21da733075db85504d#diff-6c5e23803064f4b7a122eee29736d036b9cfe244e69f48751ba163d62e2bf35fR176).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-07 Thread via GitHub


satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1186813113


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -600,25 +622,208 @@ public String toString() {
 }
 }
 
-long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws 
RemoteStorageException {
-Optional offset = Optional.empty();
-Optional maybeLog = 
fetchLog.apply(topicIdPartition.topicPartition());
-if (maybeLog.isPresent()) {
-UnifiedLog log = maybeLog.get();
-Option maybeLeaderEpochFileCache = 
log.leaderEpochCache();
-if (maybeLeaderEpochFileCache.isDefined()) {
-LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-OptionalInt epoch = cache.latestEpoch();
-while (!offset.isPresent() && epoch.isPresent()) {
-offset = 
remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, 
epoch.getAsInt());
-epoch = cache.previousEpoch(epoch.getAsInt());
+public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) 
throws RemoteStorageException, IOException {
+int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+FetchRequest.PartitionData fetchInfo = 
remoteStorageFetchInfo.fetchInfo;
+
+boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == 
FetchIsolation.TXN_COMMITTED;
+
+long offset = fetchInfo.fetchOffset;
+int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+Optional logOptional = fetchLog.apply(tp);
+OptionalInt epoch = OptionalInt.empty();
+
+if (logOptional.isPresent()) {
+Option leaderEpochCache = 
logOptional.get().leaderEpochCache();
+if (leaderEpochCache.isDefined()) {
+epoch = leaderEpochCache.get().epochForOffset(offset);
+}
+}
+
+Optional rlsMetadataOptional = 
epoch.isPresent()
+? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+: Optional.empty();
+
+if (!rlsMetadataOptional.isPresent()) {
+String epochStr = (epoch.isPresent()) ? 
Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+throw new OffsetOutOfRangeException("Received request for offset " 
+ offset + " for leader epoch "
++ epochStr + " and partition " + tp + " which does not 
exist in remote tier.");
+}
+
+RemoteLogSegmentMetadata remoteLogSegmentMetadata = 
rlsMetadataOptional.get();
+int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, 
offset);
+InputStream remoteSegInputStream = null;
+try {
+// Search forward for the position of the last offset that is 
greater than or equal to the target offset
+remoteSegInputStream = 
remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
+RemoteLogInputStream remoteLogInputStream = new 
RemoteLogInputStream(remoteSegInputStream);
+
+RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, 
offset);
+
+if (firstBatch == null)
+return new FetchDataInfo(new LogOffsetMetadata(offset), 
MemoryRecords.EMPTY, false,
+includeAbortedTxns ? 
Optional.of(Collections.emptyList()) : Optional.empty());
+
+// An empty record is sent instead of an incomplete batch when 
there is no minimum-one-message constraint
+// and for FetchRequest version 3 and above and the first batch 
size is more than maximum bytes that can be sent.
+int firstBatchSize = firstBatch.sizeInBytes();
+if (!remoteStorageFetchInfo.minOneMessage &&
+!remoteStorageFetchInfo.hardMaxBytesLimit &&
+firstBatchSize > maxBytes) {
+return new FetchDataInfo(new LogOffsetMetadata(offset), 
MemoryRecords.EMPTY);
+}
+
+int updatedFetchSize =
+remoteStorageFetchInfo.minOneMessage && firstBatchSize > 
maxBytes ? firstBatchSize : maxBytes;
+
+ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+int remainingBytes = updatedFetchSize;
+
+firstBatch.writeTo(buffer);
+remainingBytes -= firstBatchSize;
+
+if (remainingBytes > 0) {
+// read the input stream until min of (EOF stream or buffer's 
remaining capacity).
+Utils.readFully(remoteSegInputStream, buffer);
+}
+buffer.flip();
+
+FetchDataInfo fetchDataInfo = new FetchDataInfo(
+new LogOffsetMetadata(offset, 
remoteLogSegmentMetadata.startOffset(), startPos),
+MemoryRecords.readableRecords(buffer));
+if (includeAbortedTxns) {
+   

[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-07 Thread via GitHub


satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1186812350


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -600,6 +622,176 @@ public String toString() {
 }
 }
 
+public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) 
throws RemoteStorageException, IOException {
+int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+FetchRequest.PartitionData fetchInfo = 
remoteStorageFetchInfo.fetchInfo;
+
+boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == 
FetchIsolation.TXN_COMMITTED;
+
+long offset = fetchInfo.fetchOffset;
+int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+Optional logOptional = fetchLog.apply(tp);
+OptionalInt epoch = OptionalInt.empty();
+
+if (logOptional.isPresent()) {
+Option leaderEpochCache = 
logOptional.get().leaderEpochCache();
+if (leaderEpochCache.isDefined()) {
+epoch = leaderEpochCache.get().epochForOffset(offset);
+}
+}
+
+Optional rlsMetadata = epoch.isPresent()
+? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+: Optional.empty();
+
+if (!rlsMetadata.isPresent()) {
+String epochStr = (epoch.isPresent()) ? 
Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+throw new OffsetOutOfRangeException("Received request for offset " 
+ offset + " for leader epoch "
++ epochStr + " and partition " + tp + " which does not 
exist in remote tier.");
+}
+
+int startPos = lookupPositionForOffset(rlsMetadata.get(), offset);
+InputStream remoteSegInputStream = null;
+try {
+// Search forward for the position of the last offset that is 
greater than or equal to the target offset
+remoteSegInputStream = 
remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos);
+RemoteLogInputStream remoteLogInputStream = new 
RemoteLogInputStream(remoteSegInputStream);
+
+RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, 
offset);
+
+if (firstBatch == null)
+return new FetchDataInfo(new LogOffsetMetadata(offset), 
MemoryRecords.EMPTY, false,
+includeAbortedTxns ? 
Optional.of(Collections.emptyList()) : Optional.empty());
+
+int updatedFetchSize =
+remoteStorageFetchInfo.minOneMessage && 
firstBatch.sizeInBytes() > maxBytes
+? firstBatch.sizeInBytes() : maxBytes;
+
+ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+int remainingBytes = updatedFetchSize;
+
+firstBatch.writeTo(buffer);
+remainingBytes -= firstBatch.sizeInBytes();
+
+if (remainingBytes > 0) {
+// input stream is read till (startPos - 1) while getting the 
batch of records earlier.
+// read the input stream until min of (EOF stream or buffer's 
remaining capacity).
+Utils.readFully(remoteSegInputStream, buffer);
+}
+buffer.flip();
+
+FetchDataInfo fetchDataInfo = new FetchDataInfo(new 
LogOffsetMetadata(offset), MemoryRecords.readableRecords(buffer));
+if (includeAbortedTxns) {
+fetchDataInfo = 
addAbortedTransactions(firstBatch.baseOffset(), rlsMetadata.get(), 
fetchDataInfo);
+}
+
+return fetchDataInfo;
+} finally {
+Utils.closeQuietly(remoteSegInputStream, 
"RemoteLogSegmentInputStream");
+}
+}
+
+private int lookupPositionForOffset(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata, long offset) {
+return indexCache.lookupOffset(remoteLogSegmentMetadata, offset);
+}
+
+private FetchDataInfo addAbortedTransactions(long startOffset,
+ RemoteLogSegmentMetadata 
segmentMetadata,
+ FetchDataInfo fetchInfo) throws 
RemoteStorageException {
+int fetchSize = fetchInfo.records.sizeInBytes();
+OffsetPosition startOffsetPosition = new 
OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset,
+fetchInfo.fetchOffsetMetadata.relativePositionInSegment);
+
+OffsetIndex offsetIndex = 
indexCache.getIndexEntry(segmentMetadata).offsetIndex();
+long upperBoundOffset = 
offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize)
+.map(x -> x.offset).orElse(segmentMetadata.endOffset() + 1);
+
+final List abortedTransactions = 
new ArrayList<>();
+
+Consumer> accumulator =
+abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
+

[jira] [Updated] (KAFKA-14930) Public documentation for new Kafka Connect offset management REST APIs

2023-05-07 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-14930:
---
Description: 
Add public documentation for the new Kafka Connect offset management REST APIs 
from 
[KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
 being introduced in 3.6:
 * *PATCH* /connectors/\{connector}/offsets
 * *DELETE* /connectors/\{connector}/offsets

  was:
Add public documentation for the 3 new Kafka Connect offset management REST 
APIs being introduced in 
[KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
 * *PATCH* /connectors/\{connector}/offsets
 * *DELETE* /connectors/\{connector}/offsets


> Public documentation for new Kafka Connect offset management REST APIs
> --
>
> Key: KAFKA-14930
> URL: https://issues.apache.org/jira/browse/KAFKA-14930
> Project: Kafka
>  Issue Type: Sub-task
>  Components: KafkaConnect
>Reporter: Mickael Maison
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.6.0
>
>
> Add public documentation for the new Kafka Connect offset management REST 
> APIs from 
> [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
>  being introduced in 3.6:
>  * *PATCH* /connectors/\{connector}/offsets
>  * *DELETE* /connectors/\{connector}/offsets



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vamossagar12 commented on pull request #13594: KAFKA-14913: Using ThreadUtils.shutdownExecutorServiceQuietly to close executors in Connect Runtime

2023-05-07 Thread via GitHub


vamossagar12 commented on PR #13594:
URL: https://github.com/apache/kafka/pull/13594#issuecomment-1537335506

   The 2 connect related tests, `testSyncTopicConfigs() – 
org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest` 
and `testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted – 
org.apache.kafka.connect.integration.OffsetsApiIntegrationTest` seem to be 
flaky and I have created separate JIRA tickets 
[here](https://issues.apache.org/jira/browse/KAFKA-14971) and 
[here](https://issues.apache.org/jira/browse/KAFKA-14956) to track the failures.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14971) Flaky Test org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs

2023-05-07 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-14971:
--
Description: 
The test testSyncTopicConfigs in 
`org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs`
 seems to be flaky. Found here : 
[https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13594/6/tests]

Ran on local against the [same PR  
|https://github.com/apache/kafka/pull/13594]and  it passed.

 

 
{code:java}
org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, 
because it's explicitly defined on the target topic! ==> expected: <2000> but 
was: <8640>

Stacktrace
org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, 
because it's explicitly defined on the target topic! ==> expected: <2000> but 
was: <8640>
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1153)
at 
app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.lambda$testSyncTopicConfigs$8(MirrorConnectorsIntegrationBaseTest.java:758)
at 
app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:325)
at 
app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:373)
at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:322)
at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:306)
at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:296)
at 
app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testSyncTopicConfigs(MirrorConnectorsIntegrationBaseTest.java:752)
at 
java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
at 
java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at 
java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
at 
app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
at 
app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217)
at 
app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213)
at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138)
at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
at 
app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
at 
app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 

[jira] [Created] (KAFKA-14971) Flaky Test org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs

2023-05-07 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-14971:
-

 Summary: Flaky Test 
org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs
 Key: KAFKA-14971
 URL: https://issues.apache.org/jira/browse/KAFKA-14971
 Project: Kafka
  Issue Type: Bug
Reporter: Sagar Rao


The test testSyncTopicConfigs in ` 
org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs`
 seems to be flaky. Found here : 
[https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13594/6/tests]

 

Ran on local against the [same PR  
|https://github.com/apache/kafka/pull/13594]and  it has passed.

 

```
h4. Error
org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, 
because it's explicitly defined on the target topic! ==> expected: <2000> but 
was: <8640>
h4. Stacktrace
org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, 
because it's explicitly defined on the target topic! ==> expected: <2000> but 
was: <8640>
 at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
 at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
 at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
 at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
 at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1153)
 at 
app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.lambda$testSyncTopicConfigs$8(MirrorConnectorsIntegrationBaseTest.java:758)
 at 
app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:325)
 at 
app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:373)
 at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:322)
 at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:306)
 at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:296)
 at 
app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testSyncTopicConfigs(MirrorConnectorsIntegrationBaseTest.java:752)
 at 
java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
 at 
java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
 at 
java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
 at 
app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
 at 
app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
 at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
 at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
 at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
 at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
 at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
 at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
 at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
 at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
 at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
 at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
 at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
 at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
 at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217)
 at 
app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213)
 at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138)
 at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
 at