Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-28 Thread via GitHub


philipnee commented on PR #15525:
URL: https://github.com/apache/kafka/pull/15525#issuecomment-2026690869

   @lucasbru - Thanks for taking the time to review this PR. It is ready for another pass.





Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-28 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1544067091


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -946,7 +956,7 @@ public void testOffsetsForTimesWithZeroTimeout() {
 @Test
 public void testWakeupCommitted() {
 consumer = newConsumer();
-final HashMap<TopicPartition, OffsetAndMetadata> offsets = mockTopicPartitionOffset();

Review Comment:
   just cleaning up.






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2026641230

   > (1) revert all offsetForMaxTimestamp to shallowOffsetMaxTimestamp
   > (2) change/revert the implementation to set shallowOffsetMaxTimestamp accordingly.
   
   Do we need to revert all of them? The paths we have fixed work well now.
   1) It seems to me that adding comments for both the "recover" and "follower" cases can remind readers that this `offsetOfMaxTimestampMs` is shallow.
   2) Or we could only rename `offsetForMaxTimestamp` back to `shallowOffsetMaxTimestamp` but keep the implementation.
   
   @junrao WDYT?
   
   > (3) add tests for follower appends
   
   will complete it later





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1544046500


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
 // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
 // constant time access while being safe to use with concurrent collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
-  latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+latestTimestampSegment.log.batchesFrom(position.position).asScala
+  .filter(_.maxTimestamp() == maxTimestampSoFar.timestamp)

Review Comment:
   > We could stop after finding the first batch matching maxTimestamp.
   
   oh, sorry for neglecting that.
   
   > Although it should be impossible, should we handle the empty() case?
   
   just let `KafkaApis` return an error response with `UNKNOWN`:
   https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1146
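   A sketch of how the two suggestions combine (written in Java streams for illustration, since the file itself is Scala; the shape below is an assumption, not the PR's final code):
   
   ```java
   import java.util.List;
   import java.util.Optional;
   
   final class FindFirstSketch {
       // Hypothetical stand-in for a record batch, for illustration only.
       record Batch(long maxTimestamp) {}
   
       // The lazy stream plus findFirst() stops at the first matching batch,
       // which is all the MAX_TIMESTAMP lookup needs; scanning with filter
       // alone would keep visiting the remaining batches. An empty Optional
       // is the "should be impossible" case, which the caller (KafkaApis)
       // can turn into an UNKNOWN error response.
       static Optional<Batch> firstMatching(List<Batch> batches, long targetTimestamp) {
           return batches.stream()
                   .filter(b -> b.maxTimestamp() == targetTimestamp)
                   .findFirst();
       }
   }
   ```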






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-28 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1544046037


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1141,21 +1141,27 @@ private Map beginningOrEndOffset(Collection
         Map<TopicPartition, Long> timestampToSearch = partitions
             .stream()
             .collect(Collectors.toMap(Function.identity(), tp -> timestamp));
         Timer timer = time.timer(timeout);
         ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(
             timestampToSearch,
-            false,
             timer);
-        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = applicationEventHandler.addAndGet(
+
+        // shortcut the request if the timeout is zero.
+        if (timeout.isZero()) {

Review Comment:
   Scratch the previous comment - addAndGet actually doesn't return early for a zero timeout. We will need to explicitly return an empty result. See the code change.
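   A minimal sketch of that shortcut, under stated assumptions (the signature below is a hypothetical stand-in; the real method builds and dispatches a ListOffsetsEvent as in the diff above):
   
   ```java
   import java.time.Duration;
   import java.util.Collection;
   import java.util.Collections;
   import java.util.Map;
   
   final class ZeroTimeoutShortcutSketch {
       // Hypothetical signature for illustration only.
       static <T, R> Map<T, R> beginningOrEndOffsets(Collection<T> partitions, Duration timeout) {
           if (timeout.isZero()) {
               // addAndGet would block waiting for a response that cannot
               // arrive within a zero timeout, so return the empty result
               // explicitly instead.
               return Collections.emptyMap();
           }
           throw new UnsupportedOperationException("non-zero-timeout path elided");
       }
   }
   ```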






Re: [PR] KAFKA-16039: RecordHeaders supports the addAll method [kafka]

2024-03-28 Thread via GitHub


github-actions[bot] commented on PR #15034:
URL: https://github.com/apache/kafka/pull/15034#issuecomment-2026574417

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.





Re: [PR] KAFKA-15823: disconnect from controller on AuthenticationException [kafka]

2024-03-28 Thread via GitHub


showuon commented on code in PR #14760:
URL: https://github.com/apache/kafka/pull/14760#discussion_r1544017066


##
core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala:
##
@@ -386,6 +386,7 @@ class NodeToControllerRequestThread(
 if (response.authenticationException != null) {
   error(s"Request ${queueItem.request} failed due to authentication error 
with controller",
 response.authenticationException)

Review Comment:
   Should we log:
   `Disconnecting the connection to the stale controller ${activeControllerAddress().map(_.idString).getOrElse("null")}`
   like we did for the `NOT_CONTROLLER` error?






Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]

2024-03-28 Thread via GitHub


showuon commented on code in PR #15620:
URL: https://github.com/apache/kafka/pull/15620#discussion_r1544007618


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java:
##
@@ -74,25 +72,25 @@ protected final Map createRemoteLogMetadataTr
         return map;
     }
 
-    protected final Map<String, Short> createRemoteLogStorageClassToApiKeyMap() {
-        Map<String, Short> map = new HashMap<>();
-        map.put(RemoteLogSegmentMetadata.class.getName(), REMOTE_LOG_SEGMENT_METADATA_API_KEY);
-        map.put(RemoteLogSegmentMetadataUpdate.class.getName(), REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY);
-        map.put(RemotePartitionDeleteMetadata.class.getName(), REMOTE_PARTITION_DELETE_API_KEY);
-        map.put(RemoteLogSegmentMetadataSnapshot.class.getName(), REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY);
-        return map;
-    }
-
     public byte[] serialize(RemoteLogMetadata remoteLogMetadata) {
-        Short apiKey = remoteLogStorageClassToApiKey.get(remoteLogMetadata.getClass().getName());
-        if (apiKey == null) {
-            throw new IllegalArgumentException("ApiKey for given RemoteStorageMetadata class: " + remoteLogMetadata.getClass()
-                    + " does not exist.");
-        }
 
-        @SuppressWarnings("unchecked")
-        ApiMessageAndVersion apiMessageAndVersion = remoteLogMetadataTransform(apiKey).toApiMessageAndVersion(remoteLogMetadata);
+        String className = remoteLogMetadata.getClass().getName();
+        RemoteLogMetadataTransform metadataTransform;
+
+        if(className.equals(RemoteLogSegmentMetadata.class.getName())) {
+            metadataTransform = new RemoteLogSegmentMetadataTransform();
+        } else if (className.equals(RemoteLogSegmentMetadataUpdate.class.getName())) {
+            metadataTransform = new RemoteLogSegmentMetadataUpdateTransform();
+        } else if (className.equals(RemotePartitionDeleteMetadata.class.getName())) {
+            metadataTransform = new RemotePartitionDeleteMetadataTransform();
+        } else if (className.equals(RemoteLogSegmentMetadataSnapshot.class.getName())) {
+            metadataTransform = new RemoteLogSegmentMetadataSnapshotTransform();

Review Comment:
   +1






[jira] [Resolved] (KAFKA-16349) ShutdownableThread fails build by calling Exit with race condition

2024-03-28 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-16349.
-
Fix Version/s: 3.8.0
 Assignee: Greg Harris
   Resolution: Fixed

> ShutdownableThread fails build by calling Exit with race condition
> --
>
> Key: KAFKA-16349
> URL: https://issues.apache.org/jira/browse/KAFKA-16349
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.8.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Minor
> Fix For: 3.8.0
>
>
> `ShutdownableThread` calls `Exit.exit()` when the thread's operation throws 
> FatalExitError. In normal operation, this calls System.exit, and exits the 
> process. In tests, the exit procedure is masked with Exit.setExitProcedure to 
> prevent tests that encounter a FatalExitError from crashing the test JVM.
> Masking of exit procedures is usually done in BeforeEach/AfterEach 
> annotations, with the exit procedures cleaned up immediately after the test 
> finishes. If the body of the test creates a ShutdownableThread that outlives 
> the test, such as by omitting `ShutdownableThread#awaitShutdown`, by having 
> `ShutdownableThread#awaitShutdown` be interrupted by a test timeout, or by a 
> race condition between `Exit.resetExitProcedure` and `Exit.exit`, then 
> System.exit() can be called erroneously.
>  
> {noformat}
> // First, in the test thread:
> Exit.setExitProcedure(...)
> try {
> new ShutdownableThread(...).start()
> } finally {
> Exit.resetExitProcedure()
> }
> // Second, in the ShutdownableThread:
> try {
> throw new FatalExitError(...)
> } catch (FatalExitError e) {
> Exit.exit(...) // Calls real System.exit()
> }{noformat}
>  
> This can be resolved by one of the following:
>  # Eliminate FatalExitError usages in code when setExitProcedure is in-use
>  # Eliminate the Exit.exit call from ShutdownableThread, and instead 
> propagate this error to another thread to handle without a race-condition
>  # Eliminate resetExitProcedure by refactoring Exit to be non-static
> FatalExitError is in use in a small number of places, but may be difficult to 
> eliminate:
>  * FinalizedFeatureChangeListener
>  * InterBrokerSendThread
>  * TopicBasedRemoteLogMetadataManager
> There are many other places where Exit is called from a background thread, 
> including some implementations of ShutdownableThread which don't use 
> FatalExitError.
> The effect of this bug is that the build is flaky, as race 
> conditions/timeouts in tests can cause the gradle executors to exit with 
> status code 1, which has happened 26 times in the last 28 days. I have not 
> yet been able to confirm this bug is happening in other tests, but I do have 
> a deterministic reproduction case with the exact same symptoms:
> {noformat}
> Gradle Test Run :core:test > Gradle Test Executor 38 > ShutdownableThreadTest 
> > testShutdownWhenTestTimesOut(boolean) > 
> "testShutdownWhenTestTimesOut(boolean).isInterruptible=true" SKIPPED
> FAILURE: Build failed with an exception.
> * What went wrong:
> Execution failed for task ':core:test'.
> > Process 'Gradle Test Executor 38' finished with non-zero exit value 1
>   This problem might be caused by incorrect test process configuration.
>   For more on test execution, please refer to 
> https://docs.gradle.org/8.6/userguide/java_testing.html#sec:test_execution in 
> the Gradle documentation.{noformat}





Re: [PR] KAFKA-16349: Prevent race conditions in Exit class from stopping test JVM [kafka]

2024-03-28 Thread via GitHub


gharris1727 merged PR #15484:
URL: https://github.com/apache/kafka/pull/15484





Re: [PR] KAFKA-16349: Prevent race conditions in Exit class from stopping test JVM [kafka]

2024-03-28 Thread via GitHub


gharris1727 commented on PR #15484:
URL: https://github.com/apache/kafka/pull/15484#issuecomment-2026540118

   Test failures appear unrelated, and I got a local run of `./gradlew test` to pass!





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


showuon commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1543916425


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
 // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
 // constant time access while being safe to use with concurrent collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
-  latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+latestTimestampSegment.log.batchesFrom(position.position).asScala
+  .filter(_.maxTimestamp() == maxTimestampSoFar.timestamp)

Review Comment:
   Although it should be impossible, should we handle the `empty()` case?






Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]

2024-03-28 Thread via GitHub


Owen-CH-Leung commented on PR #15489:
URL: https://github.com/apache/kafka/pull/15489#issuecomment-2026412628

   > adde167
   
   No prob. Added back `ToString` to troubleshoot.





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


showuon commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1543906755


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
 // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
 // constant time access while being safe to use with concurrent collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
-  latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+latestTimestampSegment.log.batchesFrom(position.position).asScala
+  .filter(_.maxTimestamp() == maxTimestampSoFar.timestamp)

Review Comment:
   Good suggestion. We should use `find` here.






Re: [PR] Log slow controller events [kafka]

2024-03-28 Thread via GitHub


soarez commented on PR #15622:
URL: https://github.com/apache/kafka/pull/15622#issuecomment-2026356492

   > events have relatively unique names (we include offset in some) so the cardinality is quite high
   
   I see. What I meant would result in too many additional metrics. Makes sense then.





Re: [PR] Log slow controller events [kafka]

2024-03-28 Thread via GitHub


mumrah commented on code in PR #15622:
URL: https://github.com/apache/kafka/pull/15622#discussion_r1543844228


##
metadata/src/main/java/org/apache/kafka/controller/metrics/SlowEventsLogger.java:
##
@@ -0,0 +1,58 @@
+package org.apache.kafka.controller.metrics;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.function.Supplier;
+
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+public class SlowEventsLogger {
+    /**
+     * Don't report any p99 events below this threshold. This prevents the controller from reporting p99 event
+     * times in the idle case.
+     */
+    private static final int MIN_SLOW_EVENT_TIME_MS = 100;
+
+    /**
+     * Calculating the p99 from the histogram consumes some resources, so only update it every so often.
+     */
+    private static final int P99_REFRESH_INTERVAL_MS = 3;
+
+    private final Supplier<Double> p99Supplier;
+    private final Logger log;
+    private final Time time;
+    private double p99;
+    private Timer percentileUpdateTimer;
+
+    public SlowEventsLogger(
+        Supplier<Double> p99Supplier,
+        Logger log,
+        Time time
+    ) {
+        this.p99Supplier = p99Supplier;
+        this.log = log;
+        this.time = time;
+        this.percentileUpdateTimer = time.timer(P99_REFRESH_INTERVAL_MS);
+    }
+
+    public void maybeLog(String name, long durationNs) {
+        long durationMs = MILLISECONDS.convert(durationNs, NANOSECONDS);
+        if (durationMs > MIN_SLOW_EVENT_TIME_MS && durationMs > p99) {

Review Comment:
   Yeah, I need to initialize it in the constructor.
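   A sketch of that fix, reusing the names from the diff above (whether anything else in the constructor changes is an assumption):
   
   ```java
   // Seed p99 from the supplier at construction time, so maybeLog never
   // compares durations against the field's default 0.0 before the first
   // scheduled refresh runs.
   public SlowEventsLogger(Supplier<Double> p99Supplier, Logger log, Time time) {
       this.p99Supplier = p99Supplier;
       this.log = log;
       this.time = time;
       this.percentileUpdateTimer = time.timer(P99_REFRESH_INTERVAL_MS);
       this.p99 = p99Supplier.get(); // initialize eagerly, per this review thread
   }
   ```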






Re: [PR] Log slow controller events [kafka]

2024-03-28 Thread via GitHub


mumrah commented on PR #15622:
URL: https://github.com/apache/kafka/pull/15622#issuecomment-2026321417

   @soarez wdym by tagging here? 
   
   Note that the events have relatively unique names (we include offset in some), so the cardinality is quite high.





Re: [PR] Log slow controller events [kafka]

2024-03-28 Thread via GitHub


soarez commented on code in PR #15622:
URL: https://github.com/apache/kafka/pull/15622#discussion_r1543829135


##
metadata/src/main/java/org/apache/kafka/controller/metrics/SlowEventsLogger.java:
##
@@ -0,0 +1,58 @@
+package org.apache.kafka.controller.metrics;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.function.Supplier;
+
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+public class SlowEventsLogger {
+    /**
+     * Don't report any p99 events below this threshold. This prevents the controller from reporting p99 event
+     * times in the idle case.
+     */
+    private static final int MIN_SLOW_EVENT_TIME_MS = 100;
+
+    /**
+     * Calculating the p99 from the histogram consumes some resources, so only update it every so often.
+     */
+    private static final int P99_REFRESH_INTERVAL_MS = 3;
+
+    private final Supplier<Double> p99Supplier;
+    private final Logger log;
+    private final Time time;
+    private double p99;
+    private Timer percentileUpdateTimer;
+
+    public SlowEventsLogger(
+        Supplier<Double> p99Supplier,
+        Logger log,
+        Time time
+    ) {
+        this.p99Supplier = p99Supplier;
+        this.log = log;
+        this.time = time;
+        this.percentileUpdateTimer = time.timer(P99_REFRESH_INTERVAL_MS);
+    }
+
+    public void maybeLog(String name, long durationNs) {
+        long durationMs = MILLISECONDS.convert(durationNs, NANOSECONDS);
+        if (durationMs > MIN_SLOW_EVENT_TIME_MS && durationMs > p99) {

Review Comment:
   It seems possible that `p99 = p99Supplier.get();` may not have run yet at this point. Should `p99` have an explicit default value?



##
metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java:
##
@@ -28,6 +28,8 @@
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;

Review Comment:
   Is `Supplier` needed here?






Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]

2024-03-28 Thread via GitHub


soarez commented on code in PR #15620:
URL: https://github.com/apache/kafka/pull/15620#discussion_r1543811354


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java:
##
@@ -74,25 +72,25 @@ protected final Map createRemoteLogMetadataTr
         return map;
     }
 
-    protected final Map<String, Short> createRemoteLogStorageClassToApiKeyMap() {
-        Map<String, Short> map = new HashMap<>();
-        map.put(RemoteLogSegmentMetadata.class.getName(), REMOTE_LOG_SEGMENT_METADATA_API_KEY);
-        map.put(RemoteLogSegmentMetadataUpdate.class.getName(), REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY);
-        map.put(RemotePartitionDeleteMetadata.class.getName(), REMOTE_PARTITION_DELETE_API_KEY);
-        map.put(RemoteLogSegmentMetadataSnapshot.class.getName(), REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY);
-        return map;
-    }
-
     public byte[] serialize(RemoteLogMetadata remoteLogMetadata) {
-        Short apiKey = remoteLogStorageClassToApiKey.get(remoteLogMetadata.getClass().getName());
-        if (apiKey == null) {
-            throw new IllegalArgumentException("ApiKey for given RemoteStorageMetadata class: " + remoteLogMetadata.getClass()
-                    + " does not exist.");
-        }
 
-        @SuppressWarnings("unchecked")
-        ApiMessageAndVersion apiMessageAndVersion = remoteLogMetadataTransform(apiKey).toApiMessageAndVersion(remoteLogMetadata);
+        String className = remoteLogMetadata.getClass().getName();
+        RemoteLogMetadataTransform metadataTransform;
+
+        if(className.equals(RemoteLogSegmentMetadata.class.getName())) {
+            metadataTransform = new RemoteLogSegmentMetadataTransform();
+        } else if (className.equals(RemoteLogSegmentMetadataUpdate.class.getName())) {
+            metadataTransform = new RemoteLogSegmentMetadataUpdateTransform();
+        } else if (className.equals(RemotePartitionDeleteMetadata.class.getName())) {
+            metadataTransform = new RemotePartitionDeleteMetadataTransform();
+        } else if (className.equals(RemoteLogSegmentMetadataSnapshot.class.getName())) {
+            metadataTransform = new RemoteLogSegmentMetadataSnapshotTransform();

Review Comment:
   Instead of comparing `remoteLogMetadata.getClass().getName()` with the various `*.class.getName()`, can we instead just compare the classes directly?
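   For illustration, a sketch of that suggestion (an assumption about shape, not the PR's final code), comparing `Class` objects directly:
   
   ```java
   // Class objects are unique per class loader, so reference equality can
   // replace the string comparison of fully qualified class names.
   RemoteLogMetadataTransform metadataTransform;
   Class<?> clazz = remoteLogMetadata.getClass();
   if (clazz == RemoteLogSegmentMetadata.class) {
       metadataTransform = new RemoteLogSegmentMetadataTransform();
   } else if (clazz == RemoteLogSegmentMetadataUpdate.class) {
       metadataTransform = new RemoteLogSegmentMetadataUpdateTransform();
   } else if (clazz == RemotePartitionDeleteMetadata.class) {
       metadataTransform = new RemotePartitionDeleteMetadataTransform();
   } else if (clazz == RemoteLogSegmentMetadataSnapshot.class) {
       metadataTransform = new RemoteLogSegmentMetadataSnapshotTransform();
   } else {
       throw new IllegalArgumentException("Unknown RemoteLogMetadata type: " + clazz);
   }
   ```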






Re: [PR] KAFKA-15823: disconnect from controller on AuthenticationException [kafka]

2024-03-28 Thread via GitHub


soarez commented on PR #14760:
URL: https://github.com/apache/kafka/pull/14760#issuecomment-2026260420

   This still needs a review from a committer. 
   
   @mimaison @mumrah @rondagostino @showuon





[jira] [Updated] (KAFKA-15955) Migrating ZK brokers send dir assignments

2024-03-28 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez updated KAFKA-15955:

Priority: Minor  (was: Major)

> Migrating ZK brokers send dir assignments
> -
>
> Key: KAFKA-15955
> URL: https://issues.apache.org/jira/browse/KAFKA-15955
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Igor Soarez
>Assignee: Proven Provenzano
>Priority: Minor
>
> Brokers in ZooKeeper mode, while in migration mode, should start sending 
> directory assignments to the KRaft Controller using AssignmentsManager.





Re: [PR] MINOR: Extracted the creation of ReplicaManager in BrokerServer [kafka]

2024-03-28 Thread via GitHub


soarez commented on PR #15617:
URL: https://github.com/apache/kafka/pull/15617#issuecomment-2026228502

   Hello Filipe. Thanks for your contribution.
   
   `BrokerServer` shouldn't aim to follow the structure or interface of `KafkaServer`. What those two are meant to have in common is defined in `KafkaBroker`. So on its own, I don't think this change makes much sense. If you have a test that's going to make use of this, perhaps you could bundle those two changes in the same PR?





[jira] [Resolved] (KAFKA-16286) KRaft doesn't always notify listener of latest leader

2024-03-28 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-16286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio resolved KAFKA-16286.

Resolution: Fixed

> KRaft doesn't always notify listener of latest leader
> -
>
> Key: KAFKA-16286
> URL: https://issues.apache.org/jira/browse/KAFKA-16286
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.8.0
>
>
> If a listener registers with RaftClient after the KRaft replica has 
> transitioned to follower, it will not get notified of the current leader until 
> it has transitioned to another state.
> In a stable cluster, the listeners that are not the leader (inactive 
> controllers and brokers) will only get notified when the leader changes.





[jira] [Updated] (KAFKA-16286) KRaft doesn't always notify listener of latest leader

2024-03-28 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-16286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-16286:
---
Fix Version/s: 3.8.0

> KRaft doesn't always notify listener of latest leader
> -
>
> Key: KAFKA-16286
> URL: https://issues.apache.org/jira/browse/KAFKA-16286
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.8.0
>
>
> If a listener registers with RaftClient after the KRaft replica has 
> transitioned to follower, it will not get notified of the current leader until 
> it has transitioned to another state.
> In a stable cluster, the listeners that are not the leader (inactive 
> controllers and brokers) will only get notified when the leader changes.





Re: [PR] KAFKA-16394: Fix null propagation in foreign key join result [kafka]

2024-03-28 Thread via GitHub


soarez commented on PR #15607:
URL: https://github.com/apache/kafka/pull/15607#issuecomment-2026208622

   @mjsax I think this one should also be tagged 'streams'





Re: [PR] MINOR: Remove redundant ApiVersionsResponse#filterApis [kafka]

2024-03-28 Thread via GitHub


soarez commented on PR #15611:
URL: https://github.com/apache/kafka/pull/15611#issuecomment-2026205805

   Seems like a gray area to me. Regardless, I'd like to encourage @brandboat to keep looking for opportunities to clean up and refactor.





Re: [PR] KAFKA-15915: Flaky ProducerIdManagerTest error injection fix [kafka]

2024-03-28 Thread via GitHub


soarez commented on code in PR #15605:
URL: https://github.com/apache/kafka/pull/15605#discussion_r1543761783


##
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##
@@ -38,22 +38,32 @@ import org.mockito.Mockito.{mock, when}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}
 
 class ProducerIdManagerTest {
 
  var brokerToController: NodeToControllerChannelManager = mock(classOf[NodeToControllerChannelManager])
   val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
 
+  class ErrorQueue(initialErrorCounts: Errors*) {
+    private val queue: mutable.Queue[Errors] = mutable.Queue.empty ++ initialErrorCounts

Review Comment:
   I think this can just be
   
   ```suggestion
   private val queue: mutable.Queue[Errors] = mutable.Queue(initialErrorCounts: _*)
   ```



##
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##
@@ -38,22 +38,32 @@ import org.mockito.Mockito.{mock, when}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}
 
 class ProducerIdManagerTest {
 
  var brokerToController: NodeToControllerChannelManager = mock(classOf[NodeToControllerChannelManager])
   val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
 
+  class ErrorQueue(initialErrorCounts: Errors*) {
+    private val queue: mutable.Queue[Errors] = mutable.Queue.empty ++ initialErrorCounts
+
+def takeError(): Errors = queue.synchronized {
+  if (queue.isEmpty) {
+return Errors.NONE
+  }
+  queue.dequeue()
+}

Review Comment:
   Now that this class is simple enough, could its use be replaced with `java.util.concurrent.ConcurrentLinkedQueue`?
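   A sketch of what that replacement could look like (in Java for illustration, since `ConcurrentLinkedQueue` is a JDK class and the test itself is Scala):
   
   ```java
   import java.util.Arrays;
   import java.util.concurrent.ConcurrentLinkedQueue;
   
   import org.apache.kafka.common.protocol.Errors;
   
   final class ErrorQueueSketch {
       private final ConcurrentLinkedQueue<Errors> queue;
   
       ErrorQueueSketch(Errors... initialErrors) {
           this.queue = new ConcurrentLinkedQueue<>(Arrays.asList(initialErrors));
       }
   
       // poll() is lock-free and returns null when empty, so the explicit
       // queue.synchronized block of the mutable.Queue version is unnecessary.
       Errors takeError() {
           Errors polled = queue.poll();
           return polled == null ? Errors.NONE : polled;
       }
   }
   ```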



##
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##
@@ -38,22 +38,32 @@ import org.mockito.Mockito.{mock, when}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}
 
 class ProducerIdManagerTest {
 
  var brokerToController: NodeToControllerChannelManager = mock(classOf[NodeToControllerChannelManager])
   val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
 
+  class ErrorQueue(initialErrorCounts: Errors*) {

Review Comment:
   Now that you've removed the error counts, this variable name, `initialErrorCounts`, no longer makes sense.






Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1543754297


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
 // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
 // constant time access while being safe to use with concurrent collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
-  latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+latestTimestampSegment.log.batchesFrom(position.position).asScala
+  .filter(_.maxTimestamp() == maxTimestampSoFar.timestamp)

Review Comment:
   We could stop after finding the first batch matching maxTimestamp.






[jira] [Created] (KAFKA-16446) Log slow controller events

2024-03-28 Thread David Arthur (Jira)
David Arthur created KAFKA-16446:


 Summary: Log slow controller events
 Key: KAFKA-16446
 URL: https://issues.apache.org/jira/browse/KAFKA-16446
 Project: Kafka
  Issue Type: Improvement
Reporter: David Arthur


Occasionally, we will see very high p99 controller event processing times. Unless DEBUG logs are enabled, it is impossible to see which events are slow.

Typically this happens during controller startup/failover, though it can also happen sporadically when the controller gets overloaded.





[PR] Log slow controller events [kafka]

2024-03-28 Thread via GitHub


mumrah opened a new pull request, #15622:
URL: https://github.com/apache/kafka/pull/15622

   (no comment)





[jira] [Commented] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore

2024-03-28 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831966#comment-17831966
 ] 

Chia-Ping Tsai commented on KAFKA-16310:


see the following links to check the reverts from 3.7:
https://github.com/apache/kafka/commit/bd5989dd195d42c1608582316367a03b2c78cb11
https://github.com/apache/kafka/commit/fc646f920701b792eb683bacd513a1f20909f6bc

> ListOffsets doesn't report the offset with maxTimestamp anymore
> ---
>
> Key: KAFKA-16310
> URL: https://issues.apache.org/jira/browse/KAFKA-16310
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Emanuele Sabellico
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 3.8.0
>
>
> Updated: This is confirmed to be a regression issue in v3.7.0. 
> The impact of this issue is that when there is a batch containing records 
> with timestamps not in order, the offset reported for the max timestamp will be 
> wrong (e.g. the timestamp for t0 should map to offset 10, but offset 12 will be 
> returned, etc.). It'll cause the time index to contain the wrong offset, so the 
> result will be unexpected. 
> ===
> The last offset is reported instead.
> A test in librdkafka (0081/do_test_ListOffsets) is failing; it checks 
> that the offset with the max timestamp is the middle one and not the last 
> one. The test passes with 3.6.0 and previous versions.
> This is the test:
> [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989]
>  
> There are three messages, with timestamps:
> {noformat}
> t0 + 100
> t0 + 400
> t0 + 250{noformat}
> and indices 0, 1, 2. 
> Then a ListOffsets with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is done.
> It should return offset 1, but in 3.7.0 and trunk it returns offset 2.
> Even 5 seconds after producing, it still returns 2 as the offset with 
> max timestamp.
> ProduceRequest and ListOffsets were sent to the same broker (2), the leader 
> didn't change.
> {code:java}
> %7|1709134230.019|SEND|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, 
> 206 bytes @ 0, CorrId 2) %7|1709134230.020|RECV|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse 
> (v7, 95 bytes, CorrId 2, rtt 1.18ms) 
> %7|1709134230.020|MSGSET|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: 
> rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 
> message(s) (MsgId 0, BaseSeq -1) delivered {code}
> {code:java}
> %7|1709134235.021|SEND|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest 
> (v7, 103 bytes @ 0, CorrId 7) %7|1709134235.022|RECV|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received 
> ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code}





[jira] [Updated] (KAFKA-16342) Fix compressed records

2024-03-28 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16342:
---
Fix Version/s: (was: 3.7.1)

> Fix compressed records
> --
>
> Key: KAFKA-16342
> URL: https://issues.apache.org/jira/browse/KAFKA-16342
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.8.0
>
>






[jira] [Updated] (KAFKA-16341) Fix un-compressed records

2024-03-28 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16341:
---
Fix Version/s: (was: 3.7.1)

> Fix un-compressed records
> -
>
> Key: KAFKA-16341
> URL: https://issues.apache.org/jira/browse/KAFKA-16341
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Luke Chen
>Assignee: Johnny Hsu
>Priority: Major
> Fix For: 3.8.0
>
>






[jira] [Updated] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore

2024-03-28 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16310:
---
Fix Version/s: (was: 3.7.1)

> ListOffsets doesn't report the offset with maxTimestamp anymore
> ---
>
> Key: KAFKA-16310
> URL: https://issues.apache.org/jira/browse/KAFKA-16310
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Emanuele Sabellico
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 3.8.0
>
>
> Updated: This is confirmed to be a regression issue in v3.7.0. 
> The impact of this issue is that when there is a batch containing records 
> with timestamps not in order, the offset reported for the max timestamp will be 
> wrong (e.g. the timestamp for t0 should map to offset 10, but offset 12 will be 
> returned, etc.). It'll cause the time index to contain the wrong offset, so the 
> result will be unexpected. 
> ===
> The last offset is reported instead.
> A test in librdkafka (0081/do_test_ListOffsets) is failing; it checks 
> that the offset with the max timestamp is the middle one and not the last 
> one. The test passes with 3.6.0 and previous versions.
> This is the test:
> [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989]
>  
> There are three messages, with timestamps:
> {noformat}
> t0 + 100
> t0 + 400
> t0 + 250{noformat}
> and indices 0, 1, 2. 
> Then a ListOffsets with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is done.
> It should return offset 1, but in 3.7.0 and trunk it returns offset 2.
> Even 5 seconds after producing, it still returns 2 as the offset with 
> max timestamp.
> ProduceRequest and ListOffsets were sent to the same broker (2), the leader 
> didn't change.
> {code:java}
> %7|1709134230.019|SEND|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, 
> 206 bytes @ 0, CorrId 2) %7|1709134230.020|RECV|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse 
> (v7, 95 bytes, CorrId 2, rtt 1.18ms) 
> %7|1709134230.020|MSGSET|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: 
> rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 
> message(s) (MsgId 0, BaseSeq -1) delivered {code}
> {code:java}
> %7|1709134235.021|SEND|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest 
> (v7, 103 bytes @ 0, CorrId 7) %7|1709134235.022|RECV|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received 
> ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code}





[jira] [Commented] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore

2024-03-28 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831965#comment-17831965
 ] 

Chia-Ping Tsai commented on KAFKA-16310:


{quote}
Since the follower only maintains offsetForMaxTimestamp at the batch level, the listMaxTimestamp API was never implemented correctly. So, technically, there was no regression for listMaxTimestamp. We could just fix this issue in trunk without backporting to the old branch.
{quote}

I will revert KAFKA-16341 and KAFKA-16342 from 3.7


> ListOffsets doesn't report the offset with maxTimestamp anymore
> ---
>
> Key: KAFKA-16310
> URL: https://issues.apache.org/jira/browse/KAFKA-16310
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Emanuele Sabellico
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 3.8.0, 3.7.1
>
>
> Updated: This is confirmed to be a regression issue in v3.7.0. 
> The impact of this issue is that when there is a batch containing records 
> with timestamps not in order, the offset reported for the max timestamp will be 
> wrong (e.g. the timestamp for t0 should map to offset 10, but offset 12 will be 
> returned, etc.). It'll cause the time index to contain the wrong offset, so the 
> result will be unexpected. 
> ===
> The last offset is reported instead.
> A test in librdkafka (0081/do_test_ListOffsets) is failing; it checks 
> that the offset with the max timestamp is the middle one and not the last 
> one. The test passes with 3.6.0 and previous versions.
> This is the test:
> [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989]
>  
> There are three messages, with timestamps:
> {noformat}
> t0 + 100
> t0 + 400
> t0 + 250{noformat}
> and indices 0, 1, 2. 
> Then a ListOffsets with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is done.
> It should return offset 1, but in 3.7.0 and trunk it returns offset 2.
> Even 5 seconds after producing, it still returns 2 as the offset with 
> max timestamp.
> ProduceRequest and ListOffsets were sent to the same broker (2), the leader 
> didn't change.
> {code:java}
> %7|1709134230.019|SEND|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, 
> 206 bytes @ 0, CorrId 2) %7|1709134230.020|RECV|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse 
> (v7, 95 bytes, CorrId 2, rtt 1.18ms) 
> %7|1709134230.020|MSGSET|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: 
> rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 
> message(s) (MsgId 0, BaseSeq -1) delivered {code}
> {code:java}
> %7|1709134235.021|SEND|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest 
> (v7, 103 bytes @ 0, CorrId 7) %7|1709134235.022|RECV|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received 
> ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code}





[PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


chia7712 opened a new pull request, #15621:
URL: https://github.com/apache/kafka/pull/15621

   We now iterate the records to find the `offsetOfMaxTimestamp` instead of returning the cached one when handling `ListOffsetsRequest.MAX_TIMESTAMP`, since it is hard to align all paths to get the correct `offsetOfMaxTimestamp`. The known paths are shown below.
   
   1. `convertAndAssignOffsetsNonCompressed` -> we CAN get the correct `offsetOfMaxTimestamp` when validating all records
   2. `assignOffsetsNonCompressed` -> ditto
   3. `validateMessagesAndAssignOffsetsCompressed` -> ditto
   4. `validateMessagesAndAssignOffsetsCompressed#buildRecordsAndAssignOffsets` -> ditto
   5. `appendAsFollow#append#analyzeAndValidateRecords` -> we CAN'T get the correct `offsetOfMaxTimestamp`, as iterating all records is expensive when fetching records from the leader
   6. `LogSegment#recover` -> ditto
   
   https://issues.apache.org/jira/browse/KAFKA-16310
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] MINOR: AbstractConfig cleanup [kafka]

2024-03-28 Thread via GitHub


gharris1727 merged PR #15597:
URL: https://github.com/apache/kafka/pull/15597





Re: [PR] MINOR: AbstractConfig cleanup [kafka]

2024-03-28 Thread via GitHub


gharris1727 commented on PR #15597:
URL: https://github.com/apache/kafka/pull/15597#issuecomment-2026050399

   Test failures appear unrelated:
   1. ReplicaManagerTest has tickets [KAFKA-16323](https://issues.apache.org/jira/browse/KAFKA-16323) and [KAFKA-13530](https://issues.apache.org/jira/browse/KAFKA-13530)
   2. ControllerRegistrationManagerTest has ticket [KAFKA-15897](https://issues.apache.org/jira/browse/KAFKA-15897)





[jira] [Commented] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore

2024-03-28 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831959#comment-17831959
 ] 

Jun Rao commented on KAFKA-16310:
-

Since the follower only maintains offsetForMaxTimestamp at the batch level, the listMaxTimestamp API was never implemented correctly. So, technically, there was no regression for listMaxTimestamp. We could just fix this issue in trunk without backporting to the old branch.

> ListOffsets doesn't report the offset with maxTimestamp anymore
> ---
>
> Key: KAFKA-16310
> URL: https://issues.apache.org/jira/browse/KAFKA-16310
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Emanuele Sabellico
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 3.8.0, 3.7.1
>
>
> Updated: This is confirmed to be a regression issue in v3.7.0. 
> The impact of this issue is that when there is a batch containing records 
> with timestamps not in order, the offset reported for the max timestamp will be 
> wrong (e.g. the timestamp for t0 should map to offset 10, but offset 12 will be 
> returned, etc.). It'll cause the time index to contain the wrong offset, so the 
> result will be unexpected. 
> ===
> The last offset is reported instead.
> A test in librdkafka (0081/do_test_ListOffsets) is failing; it checks 
> that the offset with the max timestamp is the middle one and not the last 
> one. The test passes with 3.6.0 and previous versions.
> This is the test:
> [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989]
>  
> There are three messages, with timestamps:
> {noformat}
> t0 + 100
> t0 + 400
> t0 + 250{noformat}
> and indices 0, 1, 2. 
> Then a ListOffsets with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is done.
> It should return offset 1, but in 3.7.0 and trunk it returns offset 2.
> Even 5 seconds after producing, it still returns 2 as the offset with 
> max timestamp.
> ProduceRequest and ListOffsets were sent to the same broker (2), the leader 
> didn't change.
> {code:java}
> %7|1709134230.019|SEND|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, 
> 206 bytes @ 0, CorrId 2) %7|1709134230.020|RECV|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse 
> (v7, 95 bytes, CorrId 2, rtt 1.18ms) 
> %7|1709134230.020|MSGSET|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: 
> rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 
> message(s) (MsgId 0, BaseSeq -1) delivered {code}
> {code:java}
> %7|1709134235.021|SEND|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest 
> (v7, 103 bytes @ 0, CorrId 7) %7|1709134235.022|RECV|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received 
> ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code}





Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


chia7712 commented on PR #15618:
URL: https://github.com/apache/kafka/pull/15618#issuecomment-2026013673

   > I am just saying that we only need to fix this in trunk since the 
implementation was never correct in any previous branches, thus not a 
regression.
   
   got it. will open another PR for trunk 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


chia7712 closed pull request #15618: KAFKA-16310 ListOffsets doesn't report the 
offset with maxTimestamp a…
URL: https://github.com/apache/kafka/pull/15618


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]

2024-03-28 Thread via GitHub


gharris1727 commented on PR #15620:
URL: https://github.com/apache/kafka/pull/15620#issuecomment-2026008220

   @linu-shibu Could you fix the rawtypes warnings in this class? You can see 
them if you make this change in build.gradle:
   
   ```diff
options.encoding = 'UTF-8'
options.compilerArgs << "-Xlint:all"
// temporary exclusions until all the warnings are fixed
   -if (!project.path.startsWith(":connect"))
   +if (!project.path.startsWith(":connect") && 
!project.path.startsWith(":storage"))
  options.compilerArgs << "-Xlint:-rawtypes"
options.compilerArgs << "-Xlint:-serial"
options.compilerArgs << "-Xlint:-try"
   ```
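   
   For reference, a typical rawtypes fix is to parameterize the declaration 
instead of using the raw type; a minimal sketch (the exact declarations in the 
class may differ):
   
   ```java
   // Raw type: triggers an -Xlint:rawtypes warning and forces unchecked casts later.
   // RemoteLogMetadataTransform transform = new RemoteLogSegmentMetadataTransform();
   
   // Parameterized: the compiler checks the metadata type end to end.
   RemoteLogMetadataTransform<RemoteLogSegmentMetadata> transform =
       new RemoteLogSegmentMetadataTransform();
   ```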


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


junrao commented on PR #15618:
URL: https://github.com/apache/kafka/pull/15618#issuecomment-2026000277

   > I am not sure I understand this. All we need for this solution (or 
workaround) is the "max timestamp" of a segment, since we always iterate the 
batches (from the segment having the max timestamp) to find the "offset" of the 
max timestamp when handling the ListOffsetsRequest.MAX_TIMESTAMP. Hence, we can 
correct the implementation for all active branches (including 3.6.3) with this PR.
   
   Yes, I agree that we can fix this issue completely. I am just saying that we 
only need to fix this in trunk since the implementation was never correct in 
any previous branches, thus not a regression.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


junrao commented on code in PR #15618:
URL: https://github.com/apache/kafka/pull/15618#discussion_r1543560539


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1320,10 +1320,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
 val segmentsCopy = logSegments.toBuffer
 val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = 
latestTimestampSegment.maxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
+val batch = 
latestTimestampSegment.log.batches().asScala.maxBy(_.maxTimestamp())

Review Comment:
   Yes, suppose you have a 1GB segment and the maxTimestamp is in the last 
batch. latestTimestampSegment.log.batches() needs to read 1GB from disk. Using 
the offsetIndex, we only need to read the index and the index.interval (which 
defaults to 4KB) worth of bytes.
   
   > Is the impl of lookup like this?
   
   Yes, that's what I was thinking.
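   
   For concreteness, here is a minimal sketch of that lookup (assumed names: 
`index` is the segment's offset index, `log` its `FileRecords`, and 
`maxTsOffset` the offset recorded for the max timestamp; a sketch only, not the 
final code):
   
   ```java
   // Use the offset index to find a nearby file position, then scan only from
   // there until we hit the batch that contains maxTsOffset, instead of
   // iterating the whole segment from the beginning.
   static FileLogInputStream.FileChannelRecordBatch findBatch(OffsetIndex index,
                                                              FileRecords log,
                                                              long maxTsOffset) {
       OffsetPosition position = index.lookup(maxTsOffset);
       for (FileLogInputStream.FileChannelRecordBatch batch : log.batchesFrom(position.position)) {
           if (batch.baseOffset() <= maxTsOffset && maxTsOffset <= batch.lastOffset())
               return batch; // the batch containing the offset of the max timestamp
       }
       return null; // not reached for a valid offset
   }
   ```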



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


chia7712 commented on PR #15618:
URL: https://github.com/apache/kafka/pull/15618#issuecomment-2025943523

   > Since the follower only maintains offsetForMaxTimestamp at the batch 
level, the listMaxTimestamp API was never implemented correctly.
   
   I am not sure I understand this. All we need for this solution (or 
workaround) is the "max timestamp" of a segment, since we always iterate the 
batches (from the segment having the max timestamp) to find the "offset" of the 
max timestamp when handling the `ListOffsetsRequest.MAX_TIMESTAMP`. Hence, we can 
correct the implementation for all active branches (including 3.6.3) with this PR.
   
   > So, technically, there was no regression for listMaxTimestamp. It seems 
there is no need to fix this in 3.6? We could just fix it in trunk.
   
   BTW, I'm ok with keeping the behavior for 3.6 as it is not a regression.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]

2024-03-28 Thread via GitHub


linu-shibu commented on PR #15620:
URL: https://github.com/apache/kafka/pull/15620#issuecomment-2025930412

   @gharris1727 please do take a look and provide some guidance/feedback. 
Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]

2024-03-28 Thread via GitHub


linu-shibu opened a new pull request, #15620:
URL: https://github.com/apache/kafka/pull/15620

   
   
   The RemoteLogMetadataSerde receives a RemoteLogMetadata object and has to 
dispatch to one of four serializers depending on its type. This is currently 
done by taking the class name of the RemoteLogMetadata and looking it up in 
maps to find the corresponding serializer for that class. This then requires an 
unchecked cast, because the RemoteLogMetadataTransform is generic. This is 
replaced by if-elseif-else statements, which are type-safe. The map lookup is 
also removed in this new implementation, which might make the process faster, 
as mentioned in the ticket.
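   
   A minimal sketch of the type-safe dispatch described above (field and method 
names here are illustrative assumptions, not necessarily the PR's exact code):
   
   ```java
   // Dispatch on the concrete RemoteLogMetadata subtype via instanceof checks
   // instead of a class-name-keyed map, so no unchecked cast is needed.
   private ApiMessageAndVersion toApiMessageAndVersion(RemoteLogMetadata metadata) {
       if (metadata instanceof RemoteLogSegmentMetadata) {
           return segmentMetadataTransform.toApiMessageAndVersion((RemoteLogSegmentMetadata) metadata);
       } else if (metadata instanceof RemoteLogSegmentMetadataUpdate) {
           return segmentMetadataUpdateTransform.toApiMessageAndVersion((RemoteLogSegmentMetadataUpdate) metadata);
       } else if (metadata instanceof RemoteLogSegmentMetadataSnapshot) {
           return segmentMetadataSnapshotTransform.toApiMessageAndVersion((RemoteLogSegmentMetadataSnapshot) metadata);
       } else if (metadata instanceof RemotePartitionDeleteMetadata) {
           return partitionDeleteTransform.toApiMessageAndVersion((RemotePartitionDeleteMetadata) metadata);
       } else {
           throw new IllegalArgumentException("Unsupported metadata type: " + metadata.getClass());
       }
   }
   ```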
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


chia7712 commented on code in PR #15618:
URL: https://github.com/apache/kafka/pull/15618#discussion_r1543505409


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1320,10 +1320,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
 val segmentsCopy = logSegments.toBuffer
 val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = 
latestTimestampSegment.maxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
+val batch = 
latestTimestampSegment.log.batches().asScala.maxBy(_.maxTimestamp())

Review Comment:
   > latestTimestampSegment.log.batches() scans the whole log segment and could 
introduce unnecessary extra I/O. So, there could be performance degradation 
because of that.
   
   The `batches` is a `iterable` object, and its implementation load the batch 
only if we call `next`. 
https://github.com/apache/kafka/blob/3.6/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java#L63
   
   Hence, the benefit of looking up for a batch (find the position and then use 
it to call `batchesFrom`) is that we can save some I/O by skipping some 
batches. Please correct me if I misunderstand anything.
   
   > I am not sure I understand this. Looking up for a batch with each 
baseOffset or lastOffset will locate the same batch using the offset index, 
right?
   
   Is the impl of lookup like this?
   ```scala
   val position = 
latestTimestampSegment.offsetIndex.lookup(latestTimestampSegment.offsetOfMaxTimestampSoFar)
   latestTimestampSegment.log.batchesFrom(position.position).asScala
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-03-28 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1543492385


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -984,6 +984,8 @@ public void close(final Timer timer) {
 }
 } finally {
 super.close(timer);
+// Super-class close may wait for more commit callbacks to 
complete.
+invokeCompletedOffsetCommitCallbacks();

Review Comment:
   Agree, there could be async requests, with a known coordinator, that get no 
response in the above while loop but then get it while super.close waits, so we 
should trigger the callbacks at this point.  
   
   But this makes me notice: aren't we breaking the `close(Duration)` contract 
here by calling `super.close(timer)` in the finally clause? Say async requests 
do not get a response within the timeout in the above while loop (so we block 
for that time in the while); then, in the `finally`, the super class blocks for 
that time again 
[here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1139).
 Am I missing something? (I can file a separate Jira if I'm not missing 
something here.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-03-28 Thread via GitHub


lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1543454136


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -1164,7 +1176,8 @@ public void maybeAutoCommitOffsetsAsync(long now) {
 }
 
 private boolean invokePendingAsyncCommits(Timer timer) {
-if (inFlightAsyncCommits.get() == 0) {
+if (pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0) 
{

Review Comment:
   This makes sense to me: it fills a gap in the case of a commit sync with 
empty offsets, which skips the path of sending an actual request, and that's 
why it loses the guarantee of executing the callbacks, as I see it. 
   
   This makes the logic consistent with what happens if the commit sync has 
non-empty offsets. In that case, it does execute the callbacks for previous 
async commits that were waiting for the coordinator: the sync commit would be 
blocked on the same findCoordinator request (there's always just one), and the 
moment the coordinator is found the async commit is marked as in-flight 
[here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1036),
 so it will be considered for callbacks 
[here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1121).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16445) PATCH method for connector configuration

2024-03-28 Thread Ivan Yurchenko (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Yurchenko updated KAFKA-16445:
---
Summary: PATCH method for connector configuration  (was: PATCH method for 
connecto configuration)

> PATCH method for connector configuration
> 
>
> Key: KAFKA-16445
> URL: https://issues.apache.org/jira/browse/KAFKA-16445
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Ivan Yurchenko
>Assignee: Ivan Yurchenko
>Priority: Minor
>
> As  [KIP-477: Add PATCH method for connector config in Connect REST 
> API|https://cwiki.apache.org/confluence/display/KAFKA/KIP-477%3A+Add+PATCH+method+for+connector+config+in+Connect+REST+API]
>  suggests, we should introduce the PATCH method for connector configuration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16445) PATCH method for connecto configuration

2024-03-28 Thread Ivan Yurchenko (Jira)
Ivan Yurchenko created KAFKA-16445:
--

 Summary: PATCH method for connecto configuration
 Key: KAFKA-16445
 URL: https://issues.apache.org/jira/browse/KAFKA-16445
 Project: Kafka
  Issue Type: Improvement
  Components: connect
Reporter: Ivan Yurchenko
Assignee: Ivan Yurchenko


As  [KIP-477: Add PATCH method for connector config in Connect REST 
API|https://cwiki.apache.org/confluence/display/KAFKA/KIP-477%3A+Add+PATCH+method+for+connector+config+in+Connect+REST+API]
 suggests, we should introduce the PATCH method for connector configuration.
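
For illustration, PATCH would update only the supplied keys, unlike PUT, which 
replaces the whole configuration. A hypothetical call (the endpoint shape 
follows the KIP's proposal, not a shipped API; the connector name and worker 
URL are examples):

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PatchConnectorConfigExample {
    public static void main(String[] args) throws Exception {
        // Only "tasks.max" is sent; all other config keys keep their current values.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors/my-connector/config"))
                .header("Content-Type", "application/json")
                .method("PATCH", HttpRequest.BodyPublishers.ofString("{\"tasks.max\": \"4\"}"))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
{code}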



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


junrao commented on PR #15618:
URL: https://github.com/apache/kafka/pull/15618#issuecomment-2025853565

   @chia7712: Since the follower only maintains offsetForMaxTimestamp at the 
batch level, the listMaxTimestamp API was never implemented correctly. So, 
technically, there was no regression for listMaxTimestamp. It seems there is no 
need to fix this in 3.6? We could just fix it in trunk. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


junrao commented on code in PR #15618:
URL: https://github.com/apache/kafka/pull/15618#discussion_r1543430656


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1320,10 +1320,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
 val segmentsCopy = logSegments.toBuffer
 val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = 
latestTimestampSegment.maxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
+val batch = 
latestTimestampSegment.log.batches().asScala.maxBy(_.maxTimestamp())

Review Comment:
   @chia7712: `latestTimestampSegment.log.batches()` scans the whole log 
segment and could introduce unnecessary extra I/O. So, there could be 
performance degradation because of that. 
   
   > Hence we have to use the condition baseOffset <= offset <= lastOffset to 
find the batch.
   
   I am not sure I understand this. Looking up a batch with either the 
baseOffset or the lastOffset will locate the same batch using the offset index, 
right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [WIp] Fixes consume bench test.py maybe [kafka]

2024-03-28 Thread via GitHub


kirktrue closed pull request #15427: [WIp] Fixes consume bench test.py maybe
URL: https://github.com/apache/kafka/pull/15427


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-03-28 Thread via GitHub


kirktrue closed pull request #15250: KAFKA-15974: Enforce that event processing 
respects user-provided timeout
URL: https://github.com/apache/kafka/pull/15250


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16444) Run KIP-848 unit tests under code coverage

2024-03-28 Thread Kirk True (Jira)
Kirk True created KAFKA-16444:
-

 Summary: Run KIP-848 unit tests under code coverage
 Key: KAFKA-16444
 URL: https://issues.apache.org/jira/browse/KAFKA-16444
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer, unit tests
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16442) Update streams_standby_replica_test.py to support KIP-848’s group protocol config

2024-03-28 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16442:
--
Component/s: streams

> Update streams_standby_replica_test.py to support KIP-848’s group protocol 
> config
> -
>
> Key: KAFKA-16442
> URL: https://issues.apache.org/jira/browse/KAFKA-16442
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, streams, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 4.0.0
>
>
> This task is to update the test method(s) in 
> {{streams_standby_replica_test.py}} to support the {{group.protocol}} 
> configuration introduced in 
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
>  by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16443) Update streams_static_membership_test.py to support KIP-848’s group protocol config

2024-03-28 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16443:
--
Component/s: streams

> Update streams_static_membership_test.py to support KIP-848’s group protocol 
> config
> ---
>
> Key: KAFKA-16443
> URL: https://issues.apache.org/jira/browse/KAFKA-16443
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, streams, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 4.0.0
>
>
> This task is to update the test method(s) in 
> {{streams_static_membership_test.py}} to support the {{group.protocol}} 
> configuration introduced in 
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
>  by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16441) Update streams_broker_down_resilience_test.py to support KIP-848’s group protocol config

2024-03-28 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16441:
--
Component/s: streams

> Update streams_broker_down_resilience_test.py to support KIP-848’s group 
> protocol config
> 
>
> Key: KAFKA-16441
> URL: https://issues.apache.org/jira/browse/KAFKA-16441
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, streams, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 4.0.0
>
>
> This task is to update the test method(s) in 
> {{streams_broker_down_resilience_test.py}} to support the {{group.protocol}} 
> configuration introduced in 
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
>  by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16441) Update streams_broker_down_resilience_test.py to support KIP-848’s group protocol config

2024-03-28 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16441:
--
Description: 
This task is to update the test method(s) in 
{{streams_broker_down_resilience_test.py}} to support the {{group.protocol}} 
configuration introduced in 
[KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
 by adding an optional {{group_protocol}} argument to the tests and matrixes.

See KAFKA-16231 as an example of how the test parameters can be changed.

  was:
This task is to update the test method(s) in {{security_test.py}} to support 
the {{group.protocol}} configuration introduced in 
[KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
 by adding an optional {{group_protocol}} argument to the tests and matrixes.

See KAFKA-16231 as an example of how the test parameters can be changed.


> Update streams_broker_down_resilience_test.py to support KIP-848’s group 
> protocol config
> 
>
> Key: KAFKA-16441
> URL: https://issues.apache.org/jira/browse/KAFKA-16441
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 4.0.0
>
>
> This task is to update the test method(s) in 
> {{streams_broker_down_resilience_test.py}} to support the {{group.protocol}} 
> configuration introduced in 
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
>  by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16443) Update streams_static_membership_test.py to support KIP-848’s group protocol config

2024-03-28 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16443:
--
Description: 
This task is to update the test method(s) in 
{{streams_static_membership_test.py}} to support the {{group.protocol}} 
configuration introduced in 
[KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
 by adding an optional {{group_protocol}} argument to the tests and matrixes.

See KAFKA-16231 as an example of how the test parameters can be changed.

  was:
This task is to update the test method(s) in 
{{streams_standby_replica_test.py}} to support the {{group.protocol}} 
configuration introduced in 
[KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
 by adding an optional {{group_protocol}} argument to the tests and matrixes.

See KAFKA-16231 as an example of how the test parameters can be changed.


> Update streams_static_membership_test.py to support KIP-848’s group protocol 
> config
> ---
>
> Key: KAFKA-16443
> URL: https://issues.apache.org/jira/browse/KAFKA-16443
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 4.0.0
>
>
> This task is to update the test method(s) in 
> {{streams_static_membership_test.py}} to support the {{group.protocol}} 
> configuration introduced in 
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
>  by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16443) Update streams_static_membership_test.py to support KIP-848’s group protocol config

2024-03-28 Thread Kirk True (Jira)
Kirk True created KAFKA-16443:
-

 Summary: Update streams_static_membership_test.py to support 
KIP-848’s group protocol config
 Key: KAFKA-16443
 URL: https://issues.apache.org/jira/browse/KAFKA-16443
 Project: Kafka
  Issue Type: Test
  Components: clients, consumer, system tests
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 4.0.0


This task is to update the test method(s) in 
{{streams_standby_replica_test.py}} to support the {{group.protocol}} 
configuration introduced in 
[KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
 by adding an optional {{group_protocol}} argument to the tests and matrixes.

See KAFKA-16231 as an example of how the test parameters can be changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16442) Update streams_standby_replica_test.py to support KIP-848’s group protocol config

2024-03-28 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16442:
--
Description: 
This task is to update the test method(s) in 
{{streams_standby_replica_test.py}} to support the {{group.protocol}} 
configuration introduced in 
[KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
 by adding an optional {{group_protocol}} argument to the tests and matrixes.

See KAFKA-16231 as an example of how the test parameters can be changed.

  was:
This task is to update the test method(s) in {{security_test.py}} to support 
the {{group.protocol}} configuration introduced in 
[KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
 by adding an optional {{group_protocol}} argument to the tests and matrixes.

See KAFKA-16231 as an example of how the test parameters can be changed.


> Update streams_standby_replica_test.py to support KIP-848’s group protocol 
> config
> -
>
> Key: KAFKA-16442
> URL: https://issues.apache.org/jira/browse/KAFKA-16442
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 4.0.0
>
>
> This task is to update the test method(s) in 
> {{streams_standby_replica_test.py}} to support the {{group.protocol}} 
> configuration introduced in 
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
>  by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16442) Update streams_standby_replica_test.py to support KIP-848’s group protocol config

2024-03-28 Thread Kirk True (Jira)
Kirk True created KAFKA-16442:
-

 Summary: Update streams_standby_replica_test.py to support 
KIP-848’s group protocol config
 Key: KAFKA-16442
 URL: https://issues.apache.org/jira/browse/KAFKA-16442
 Project: Kafka
  Issue Type: Test
  Components: clients, consumer, system tests
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 4.0.0


This task is to update the test method(s) in {{security_test.py}} to support 
the {{group.protocol}} configuration introduced in 
[KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
 by adding an optional {{group_protocol}} argument to the tests and matrixes.

See KAFKA-16231 as an example of how the test parameters can be changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16441) Update streams_broker_down_resilience_test.py to support KIP-848’s group protocol config

2024-03-28 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16441:
--
Fix Version/s: 4.0.0
   (was: 3.8.0)

> Update streams_broker_down_resilience_test.py to support KIP-848’s group 
> protocol config
> 
>
> Key: KAFKA-16441
> URL: https://issues.apache.org/jira/browse/KAFKA-16441
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 4.0.0
>
>
> This task is to update the test method(s) in {{security_test.py}} to support 
> the {{group.protocol}} configuration introduced in 
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
>  by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16441) Update streams_broker_down_resilience_test.py to support KIP-848’s group protocol config

2024-03-28 Thread Kirk True (Jira)
Kirk True created KAFKA-16441:
-

 Summary: Update streams_broker_down_resilience_test.py to support 
KIP-848’s group protocol config
 Key: KAFKA-16441
 URL: https://issues.apache.org/jira/browse/KAFKA-16441
 Project: Kafka
  Issue Type: Test
  Components: clients, consumer, system tests
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0


This task is to update the test method(s) in {{security_test.py}} to support 
the {{group.protocol}} configuration introduced in 
[KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
 by adding an optional {{group_protocol}} argument to the tests and matrixes.

See KAFKA-16231 as an example of how the test parameters can be changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16440) Update security_test.py to support KIP-848’s group protocol config

2024-03-28 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16440:
--
Description: 
This task is to update the test method(s) in {{security_test.py}} to support 
the {{group.protocol}} configuration introduced in 
[KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
 by adding an optional {{group_protocol}} argument to the tests and matrixes.

See KAFKA-16231 as an example of how the test parameters can be changed.

  was:
This task is to update the test method(s) in 
{{replication_replica_failure_test.py}} to support the {{group.protocol}} 
configuration introduced in 
[KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
 by adding an optional {{group_protocol}} argument to the tests and matrixes.

See KAFKA-16231 as an example of how the test parameters can be changed.


> Update security_test.py to support KIP-848’s group protocol config
> --
>
> Key: KAFKA-16440
> URL: https://issues.apache.org/jira/browse/KAFKA-16440
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the test method(s) in {{security_test.py}} to support 
> the {{group.protocol}} configuration introduced in 
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
>  by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16440) Update security_test.py to support KIP-848’s group protocol config

2024-03-28 Thread Kirk True (Jira)
Kirk True created KAFKA-16440:
-

 Summary: Update security_test.py to support KIP-848’s group 
protocol config
 Key: KAFKA-16440
 URL: https://issues.apache.org/jira/browse/KAFKA-16440
 Project: Kafka
  Issue Type: Test
  Components: clients, consumer, system tests
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0


This task is to update the test method(s) in 
{{replication_replica_failure_test.py}} to support the {{group.protocol}} 
configuration introduced in 
[KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
 by adding an optional {{group_protocol}} argument to the tests and matrixes.

See KAFKA-16231 as an example of how the test parameters can be changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16439) Update replication_replica_failure_test.py to support KIP-848’s group protocol config

2024-03-28 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16439:
--
Reviewer:   (was: Lucas Brutschy)

> Update replication_replica_failure_test.py to support KIP-848’s group 
> protocol config
> -
>
> Key: KAFKA-16439
> URL: https://issues.apache.org/jira/browse/KAFKA-16439
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the test method(s) in 
> {{replication_replica_failure_test.py}} to support the {{group.protocol}} 
> configuration introduced in 
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
>  by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16439) Update replication_replica_failure_test.py to support KIP-848’s group protocol config

2024-03-28 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16439:
--
Description: 
This task is to update the test method(s) in 
{{replication_replica_failure_test.py}} to support the {{group.protocol}} 
configuration introduced in 
[KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
 by adding an optional {{group_protocol}} argument to the tests and matrixes.

See KAFKA-16231 as an example of how the test parameters can be changed.

  was:
This task is to update the test method(s) in {{replica_scale_test.py}} to 
support the {{group.protocol}} configuration introduced in 
[KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
 by adding an optional {{group_protocol}} argument to the tests and matrixes.

See KAFKA-16231 as an example of how the test parameters can be changed.


> Update replication_replica_failure_test.py to support KIP-848’s group 
> protocol config
> -
>
> Key: KAFKA-16439
> URL: https://issues.apache.org/jira/browse/KAFKA-16439
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the test method(s) in 
> {{replication_replica_failure_test.py}} to support the {{group.protocol}} 
> configuration introduced in 
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
>  by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


chia7712 commented on code in PR #15618:
URL: https://github.com/apache/kafka/pull/15618#discussion_r1543390343


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1320,10 +1320,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
 val segmentsCopy = logSegments.toBuffer
 val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = 
latestTimestampSegment.maxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
+val batch = 
latestTimestampSegment.log.batches().asScala.maxBy(_.maxTimestamp())

Review Comment:
   @junrao  thanks for the feedback. I use the max timestamp to find the "first" 
batch instead of using the offset index. 
   
   It seems to me that using the max timestamp is simpler, since the offset 
stored by `maxTimestampAndOffsetSoFar` could be either the last offset or the 
offset of the max timestamp. Hence we have to use the condition `baseOffset <= 
offset <= lastOffset` to find the batch.
   
   I'm ok with using the offset if using the max timestamp to find the first 
batch has any side effect.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16439) Update replication_replica_failure_test.py to support KIP-848’s group protocol config

2024-03-28 Thread Kirk True (Jira)
Kirk True created KAFKA-16439:
-

 Summary: Update replication_replica_failure_test.py to support 
KIP-848’s group protocol config
 Key: KAFKA-16439
 URL: https://issues.apache.org/jira/browse/KAFKA-16439
 Project: Kafka
  Issue Type: Test
  Components: clients, consumer, system tests
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0


This task is to update the test method(s) in {{replica_scale_test.py}} to 
support the {{group.protocol}} configuration introduced in 
[KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
 by adding an optional {{group_protocol}} argument to the tests and matrixes.

See KAFKA-16231 as an example of how the test parameters can be changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16438) Update consumer_test.py’s static tests to support KIP-848’s group protocol config

2024-03-28 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16438:
--
Issue Type: Test  (was: Bug)

> Update consumer_test.py’s static tests to support KIP-848’s group protocol 
> config
> -
>
> Key: KAFKA-16438
> URL: https://issues.apache.org/jira/browse/KAFKA-16438
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the following test method(s) in {{consumer_test.py}} 
> to support the {{group.protocol}} configuration:
> * {{test_fencing_static_consumer}}
> * {{test_static_consumer_bounce}}
> * {{test_static_consumer_persisted_after_rejoin}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16438) Update consumer_test.py’s static tests to support KIP-848’s group protocol config

2024-03-28 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16438:
--
Fix Version/s: 3.8.0

> Update consumer_test.py’s static tests to support KIP-848’s group protocol 
> config
> -
>
> Key: KAFKA-16438
> URL: https://issues.apache.org/jira/browse/KAFKA-16438
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the following test method(s) in {{consumer_test.py}} 
> to support the {{group.protocol}} configuration:
> * {{test_fencing_static_consumer}}
> * {{test_static_consumer_bounce}}
> * {{test_static_consumer_persisted_after_rejoin}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16438) Update consumer_test.py’s static tests to support KIP-848’s group protocol config

2024-03-28 Thread Kirk True (Jira)
Kirk True created KAFKA-16438:
-

 Summary: Update consumer_test.py’s static tests to support 
KIP-848’s group protocol config
 Key: KAFKA-16438
 URL: https://issues.apache.org/jira/browse/KAFKA-16438
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer, system tests
Reporter: Kirk True
Assignee: Kirk True


This task is to update the following test method(s) in {{consumer_test.py}} to 
support the {{group.protocol}} configuration:

* {{test_fencing_static_consumer}}
* {{test_static_consumer_bounce}}
* {{test_static_consumer_persisted_after_rejoin}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16275) Update kraft_upgrade_test.py to support KIP-848’s group protocol config

2024-03-28 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16275:
--
Fix Version/s: 3.8.0
   (was: 4.0.0)

> Update kraft_upgrade_test.py to support KIP-848’s group protocol config
> ---
>
> Key: KAFKA-16275
> URL: https://issues.apache.org/jira/browse/KAFKA-16275
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the test method(s) in {{kraft_upgrade_test.py}} to 
> support the {{group.protocol}} configuration introduced in 
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
>  by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16275) Update kraft_upgrade_test.py to support KIP-848’s group protocol config

2024-03-28 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16275:
-

Assignee: Kirk True

> Update kraft_upgrade_test.py to support KIP-848’s group protocol config
> ---
>
> Key: KAFKA-16275
> URL: https://issues.apache.org/jira/browse/KAFKA-16275
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the test method(s) in {{kraft_upgrade_test.py}} to 
> support the {{group.protocol}} configuration introduced in 
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
>  by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16271) Update consumer_rolling_upgrade_test.py to support KIP-848’s group protocol config

2024-03-28 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True resolved KAFKA-16271.
---
  Reviewer: Lucas Brutschy
Resolution: Fixed

> Update consumer_rolling_upgrade_test.py to support KIP-848’s group protocol 
> config
> --
>
> Key: KAFKA-16271
> URL: https://issues.apache.org/jira/browse/KAFKA-16271
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the test method(s) in 
> {{consumer_rolling_upgrade_test.py}} to support the {{group.protocol}} 
> configuration introduced in 
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
>  by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.
> The tricky wrinkle here is that the existing test relies on client-side 
> assignment strategies that aren't applicable with the new KIP-848-enabled 
> consumer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]

2024-03-28 Thread via GitHub


kirktrue commented on PR #15594:
URL: https://github.com/apache/kafka/pull/15594#issuecomment-2025765078

   @lucasbru—can you take a look at this system test change? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16275) Update kraft_upgrade_test.py to support KIP-848’s group protocol config

2024-03-28 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16275:
--
Fix Version/s: 4.0.0
   (was: 3.8.0)

> Update kraft_upgrade_test.py to support KIP-848’s group protocol config
> ---
>
> Key: KAFKA-16275
> URL: https://issues.apache.org/jira/browse/KAFKA-16275
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 4.0.0
>
>
> This task is to update the test method(s) in {{kraft_upgrade_test.py}} to 
> support the {{group.protocol}} configuration introduced in 
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
>  by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14585: Refactoring for moving the storage tool [kafka]

2024-03-28 Thread via GitHub


fvaleri commented on PR #15273:
URL: https://github.com/apache/kafka/pull/15273#issuecomment-2025743216

   Closing this one in favor of the work done for KAFKA-15853.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14585: Refactoring for moving the storage tool [kafka]

2024-03-28 Thread via GitHub


fvaleri closed pull request #15273: KAFKA-14585: Refactoring for moving the 
storage tool
URL: https://github.com/apache/kafka/pull/15273


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


junrao commented on code in PR #15618:
URL: https://github.com/apache/kafka/pull/15618#discussion_r1543349555


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1320,10 +1320,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 // constant time access while being safe to use with concurrent collections unlike `toArray`.
 val segmentsCopy = logSegments.toBuffer
 val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = latestTimestampSegment.maxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
+val batch = latestTimestampSegment.log.batches().asScala.maxBy(_.maxTimestamp())

Review Comment:
   Hmm, iterating over all batches can be expensive. We could use the offset index 
to find the batch containing the offset in `maxTimestampAndOffsetSoFar`.
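
   For readers following the thread, a self-contained sketch of the suggested lookup: instead of scanning every batch in the segment, do a floor lookup on an offset index to jump straight to the batch containing the target offset. The `NavigableMap` below stands in for Kafka's on-disk offset index, so the class and method names are illustrative, not the real storage API.
   
   ```java
   import java.util.Map;
   import java.util.NavigableMap;
   import java.util.TreeMap;
   
   public class OffsetIndexLookupSketch {
       // Maps each batch's base offset to its file position; stands in for OffsetIndex.
       private final NavigableMap<Long, Integer> baseOffsetToPosition = new TreeMap<>();
   
       public void addBatch(long baseOffset, int filePosition) {
           baseOffsetToPosition.put(baseOffset, filePosition);
       }
   
       /** File position of the batch containing targetOffset, or -1 if none. */
       public int positionForOffset(long targetOffset) {
           Map.Entry<Long, Integer> entry = baseOffsetToPosition.floorEntry(targetOffset);
           return entry == null ? -1 : entry.getValue();
       }
   
       public static void main(String[] args) {
           OffsetIndexLookupSketch index = new OffsetIndexLookupSketch();
           index.addBatch(0L, 0);
           index.addBatch(100L, 4096);
           index.addBatch(200L, 9000);
           // O(log n) jump to the batch holding offset 150, with no full scan.
           System.out.println(index.positionForOffset(150L)); // prints 4096
       }
   }
   ```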



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-03-28 Thread via GitHub


mimaison commented on code in PR #15569:
URL: https://github.com/apache/kafka/pull/15569#discussion_r1543199493


##
server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java:
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.TopicConfig;
+import static org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX;
+
+public class KafkaConfig {
+
+/** * Log Configuration ***/
+public final static String NUM_PARTITIONS_CONFIG = "num.partitions";
+public final static String LOG_DIRS_CONFIG = LOG_PREFIX + "dirs";
+public final static String LOG_DIR_CONFIG = LOG_PREFIX + "dir";
+public final static String LOG_SEGMENT_BYTES_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_BYTES_CONFIG);
+
+public final static String LOG_ROLL_TIME_MILLIS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_MS_CONFIG);
+public final static String LOG_ROLL_TIME_HOURS_CONFIG = LOG_PREFIX + "roll.hours";
+
+public final static String LOG_ROLL_TIME_JITTER_MILLIS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_JITTER_MS_CONFIG);
+public final static String LOG_ROLL_TIME_JITTER_HOURS_CONFIG = LOG_PREFIX + "roll.jitter.hours";
+
+public final static String LOG_RETENTION_TIME_MILLIS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.RETENTION_MS_CONFIG);
+public final static String LOG_RETENTION_TIME_MINUTES_CONFIG = LOG_PREFIX + "retention.minutes";
+public final static String LOG_RETENTION_TIME_HOURS_CONFIG = LOG_PREFIX + "retention.hours";
+
+public final static String LOG_RETENTION_BYTES_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.RETENTION_BYTES_CONFIG);
+public final static String LOG_CLEANUP_INTERVAL_MS_CONFIG = LOG_PREFIX + "retention.check.interval.ms";
+public final static String LOG_CLEANUP_POLICY_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.CLEANUP_POLICY_CONFIG);
+public final static String LOG_INDEX_SIZE_MAX_BYTES_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG);
+public final static String LOG_INDEX_INTERVAL_BYTES_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG);
+public final static String LOG_FLUSH_INTERVAL_MESSAGES_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG);
+public final static String LOG_DELETE_DELAY_MS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG);
+public final static String LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG = LOG_PREFIX + "flush.scheduler.interval.ms";
+public final static String LOG_FLUSH_INTERVAL_MS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.FLUSH_MS_CONFIG);
+public final static String LOG_FLUSH_OFFSET_CHECKPOINT_INTERVAL_MS_CONFIG = LOG_PREFIX + "flush.offset.checkpoint.interval.ms";
+public final static String LOG_FLUSH_START_OFFSET_CHECKPOINT_INTERVAL_MS_CONFIG = LOG_PREFIX + "flush.start.offset.checkpoint.interval.ms";
+public final static String LOG_PRE_ALLOCATE_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.PREALLOCATE_CONFIG);
+
+/* See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for details */
+/**
+ * @deprecated since "3.0"
+ */
+@Deprecated
+public final static String LOG_MESSAGE_FORMAT_VERSION_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG);
+
+public final static String LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG);
+
+/* See `TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG` for details */
+/**
+ * @deprecated since "3.6"
+ */
+@Deprecated
+public final static String LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG);
+
+public final static String LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG = 

Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]

2024-03-28 Thread via GitHub


clolov commented on PR #14716:
URL: https://github.com/apache/kafka/pull/14716#issuecomment-2025540351

   Okay, enjoy your time off!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] 1024 impl [kafka]

2024-03-28 Thread via GitHub


wcarlson5 opened a new pull request, #15619:
URL: https://github.com/apache/kafka/pull/15619

   Implementation of KIP-1024. We change `addGlobalStateStore` so that users can 
configure the way restoration takes place, or let them use the default processor 
rather than a custom one.
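
   As a sketch of the call site, the snippet below uses the existing `StreamsBuilder#addGlobalStore` API with a custom processor; the comment marks where the KIP-1024 restore option would plug in. The flag name `reprocessOnRestore` is an assumption for illustration, not the merged API.
   
   ```java
   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.kstream.Consumed;
   import org.apache.kafka.streams.processor.api.Processor;
   import org.apache.kafka.streams.processor.api.ProcessorContext;
   import org.apache.kafka.streams.processor.api.Record;
   import org.apache.kafka.streams.state.KeyValueStore;
   import org.apache.kafka.streams.state.Stores;
   
   public class GlobalStoreRestoreSketch {
   
       // Minimal custom processor that copies each record into the global store.
       static class CopyProcessor implements Processor<String, String, Void, Void> {
           private KeyValueStore<String, String> store;
   
           @Override
           public void init(ProcessorContext<Void, Void> context) {
               store = context.getStateStore("global-store");
           }
   
           @Override
           public void process(Record<String, String> record) {
               store.put(record.key(), record.value());
           }
       }
   
       public static void main(String[] args) {
           StreamsBuilder builder = new StreamsBuilder();
           // KIP-1024 makes restore behavior configurable: either re-run the custom
           // processor on restore (the KAFKA-7663 behavior) or bulk-load the topic
           // with the built-in default processor. A hypothetical option such as
           // reprocessOnRestore=false would select the latter at this call site.
           builder.addGlobalStore(
               Stores.keyValueStoreBuilder(
                   Stores.inMemoryKeyValueStore("global-store"),
                   Serdes.String(), Serdes.String()),
               "global-topic",
               Consumed.with(Serdes.String(), Serdes.String()),
               CopyProcessor::new);
       }
   }
   ```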
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-28 Thread via GitHub


chia7712 opened a new pull request, #15618:
URL: https://github.com/apache/kafka/pull/15618

   - add a default implementation to Batch to return the offset of the max timestamp (see the sketch after this list)
   - copy ListOffsetsIntegrationTest from trunk branch
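
   As a model of the first bullet, the stand-in `Batch` interface below shows the shape of such a default method; the names and the record view are illustrative, not Kafka's actual classes, and the tie-breaking note only points at the open KAFKA-16310 question.
   
   ```java
   import java.util.List;
   
   public class BatchMaxTimestampSketch {
   
       /** Stand-in record view: (offset, timestamp); not Kafka's Record class. */
       record Entry(long offset, long timestamp) {}
   
       /** Stand-in for the batch type the PR touches. */
       interface Batch {
           List<Entry> entries();
   
           /** Default implementation: offset of the record with the largest timestamp. */
           default long offsetOfMaxTimestamp() {
               long bestOffset = -1L;
               long bestTimestamp = Long.MIN_VALUE;
               for (Entry e : entries()) {
                   // Strict ">" keeps the earliest record on timestamp ties; the exact
                   // tie-breaking rule is part of the KAFKA-16310 discussion.
                   if (e.timestamp() > bestTimestamp) {
                       bestTimestamp = e.timestamp();
                       bestOffset = e.offset();
                   }
               }
               return bestOffset;
           }
       }
   
       public static void main(String[] args) {
           Batch batch = () -> List.of(new Entry(10, 100L), new Entry(11, 300L), new Entry(12, 200L));
           System.out.println(batch.offsetOfMaxTimestamp()); // prints 11
       }
   }
   ```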
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-28 Thread via GitHub


philipnee commented on PR #15525:
URL: https://github.com/apache/kafka/pull/15525#issuecomment-2025522451

   hi @lucasbru - Let me address Lianet's comments in this PR and open a 
separate PR for the behavior inconsistency, as it does require some changes to 
the unit tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15899 [1/2] Move kafka.security package from core to server module [kafka]

2024-03-28 Thread via GitHub


nizhikov commented on code in PR #15572:
URL: https://github.com/apache/kafka/pull/15572#discussion_r1543177295


##
server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java:
##
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security.authorizer;
+
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.acl.AclOperation.ALTER;
+import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.CLUSTER_ACTION;
+import static org.apache.kafka.common.acl.AclOperation.CREATE;
+import static org.apache.kafka.common.acl.AclOperation.CREATE_TOKENS;
+import static org.apache.kafka.common.acl.AclOperation.DELETE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_TOKENS;
+import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+
+public class AclEntry extends AccessControlEntry {
+private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger();
+private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString();
+
+public static final KafkaPrincipal WILDCARD_PRINCIPAL = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*");
+public static final String WILDCARD_PRINCIPAL_STRING = WILDCARD_PRINCIPAL.toString();
+public static final String WILDCARD_HOST = "*";
+public static final String WILDCARD_RESOURCE = ResourcePattern.WILDCARD_RESOURCE;
+public static final String RESOURCE_SEPARATOR = ":";
+public static final Set<ResourceType> RESOURCE_TYPES = Arrays.stream(ResourceType.values())
+.filter(t -> !(t == ResourceType.UNKNOWN || t == ResourceType.ANY))
+.collect(Collectors.toSet());
+public static final Set<AclOperation> ACL_OPERATIONS = Arrays.stream(AclOperation.values())
+.filter(t -> !(t == AclOperation.UNKNOWN || t == AclOperation.ANY))
+.collect(Collectors.toSet());
+
+private static final String PRINCIPAL_KEY = "principal";
+private static final String PERMISSION_TYPE_KEY = "permissionType";
+private static final String OPERATION_KEY = "operation";
+private static final String HOSTS_KEY = "host";
+public static final String VERSION_KEY = "version";
+public static final int CURRENT_VERSION = 1;
+private static final String ACLS_KEY = "acls";
+
+public final AccessControlEntry ace;
+public final KafkaPrincipal kafkaPrincipal;
+
+public AclEntry(AccessControlEntry ace) {
+super(ace.principal(), ace.host(), ace.operation(), ace.permissionType());
+this.ace = ace;
+
+kafkaPrincipal = ace.principal() == null
+? null
+: SecurityUtils.parseKafkaPrincipal(ace.principal());
+}
+
+/**
+ * Parse JSON representation of ACLs
+ * @param bytes of acls json string
+ *
+ * 
+{
+"version": 1,
+"acls": [
+{
+"host":"host1",
+"permissionType": 

Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]

2024-03-28 Thread via GitHub


wcarlson5 merged PR #15414:
URL: https://github.com/apache/kafka/pull/15414


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: AbstractConfig cleanup [kafka]

2024-03-28 Thread via GitHub


mimaison commented on code in PR #15597:
URL: https://github.com/apache/kafka/pull/15597#discussion_r1543174303


##
core/src/main/scala/kafka/controller/PartitionStateMachine.scala:
##
@@ -486,7 +486,7 @@ class ZkPartitionStateMachine(config: KafkaConfig,
 } else {
   val (logConfigs, failed) = zkClient.getLogConfigs(
 partitionsWithNoLiveInSyncReplicas.iterator.map { case (partition, _) => partition.topic }.toSet,
-config.originals()
+config.extractLogConfigMap

Review Comment:
   It looks like this breaks `testOfflinePartitionToOnlinePartitionTransition()`
   ```
   Wanted but not invoked:
   controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(
   List(5),
   t-0,
   (Leader:5,ISR:5,LeaderRecoveryState:RECOVERED,LeaderEpoch:1,ZkVersion:2,ControllerEpoch:50),
   ReplicaAssignment(replicas=5, addingReplicas=, removingReplicas=),
   false
   );
   -> at kafka.controller.AbstractControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(ControllerChannelManager.scala:421)
   
   However, there was exactly 1 interaction with this mock:
   controllerBrokerRequestBatch.newBatch();
   -> at kafka.controller.ZkPartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:158)
   at kafka.controller.AbstractControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(ControllerChannelManager.scala:421)
   at kafka.controller.PartitionStateMachineTest.testOfflinePartitionToOnlinePartitionTransition(PartitionStateMachineTest.scala:275)
   ```
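
   For anyone parsing the Mockito output above: "Wanted but not invoked" is raised by a `verify(...)` on a mock that never received the expected call, and the report lists the interactions the mock did see. A minimal standalone reproduction, unrelated to the controller classes themselves:
   
   ```java
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.verify;
   
   import java.util.List;
   
   public class WantedButNotInvokedDemo {
       public static void main(String[] args) {
           @SuppressWarnings("unchecked")
           List<String> batch = mock(List.class);
   
           batch.size(); // the only interaction with the mock
   
           // Fails with "Wanted but not invoked: batch.add("t-0")" and then
           // lists the one actual interaction, mirroring the report above.
           verify(batch).add("t-0");
       }
   }
   ```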



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Remove the client side assignor from the ConsumerGroupHeartbeat API [kafka]

2024-03-28 Thread via GitHub


jolshan commented on code in PR #14469:
URL: https://github.com/apache/kafka/pull/14469#discussion_r1543175309


##
clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json:
##
@@ -43,21 +43,6 @@
   "about": "null if it didn't change since the last heartbeat; the 
subscribed topic regex otherwise" },
 { "name": "ServerAssignor", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
   "about": "null if not used or if it didn't change since the last 
heartbeat; the server side assignor to use otherwise." },
-{ "name": "ClientAssignors", "type": "[]Assignor", "versions": "0+", 
"nullableVersions": "0+", "default": "null",

Review Comment:
   No worries! I think we can do a better job sharing this widely with the 
community. I will think about how that can be done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]

2024-03-28 Thread via GitHub


wcarlson5 commented on PR #15414:
URL: https://github.com/apache/kafka/pull/15414#issuecomment-2025500658

   The test configuration issues in the build are in `core:test` and not 
related to this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15899 [1/2] Move kafka.security package from core to server module [kafka]

2024-03-28 Thread via GitHub


chia7712 commented on code in PR #15572:
URL: https://github.com/apache/kafka/pull/15572#discussion_r1543172408


##
server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java:
##
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security.authorizer;
+
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.acl.AclOperation.ALTER;
+import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.CLUSTER_ACTION;
+import static org.apache.kafka.common.acl.AclOperation.CREATE;
+import static org.apache.kafka.common.acl.AclOperation.CREATE_TOKENS;
+import static org.apache.kafka.common.acl.AclOperation.DELETE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_TOKENS;
+import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+
+public class AclEntry extends AccessControlEntry {
+private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger();
+private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString();
+
+public static final KafkaPrincipal WILDCARD_PRINCIPAL = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*");
+public static final String WILDCARD_PRINCIPAL_STRING = WILDCARD_PRINCIPAL.toString();
+public static final String WILDCARD_HOST = "*";
+public static final String WILDCARD_RESOURCE = ResourcePattern.WILDCARD_RESOURCE;
+public static final String RESOURCE_SEPARATOR = ":";
+public static final Set<ResourceType> RESOURCE_TYPES = Arrays.stream(ResourceType.values())
+.filter(t -> !(t == ResourceType.UNKNOWN || t == ResourceType.ANY))
+.collect(Collectors.toSet());
+public static final Set<AclOperation> ACL_OPERATIONS = Arrays.stream(AclOperation.values())
+.filter(t -> !(t == AclOperation.UNKNOWN || t == AclOperation.ANY))
+.collect(Collectors.toSet());
+
+private static final String PRINCIPAL_KEY = "principal";
+private static final String PERMISSION_TYPE_KEY = "permissionType";
+private static final String OPERATION_KEY = "operation";
+private static final String HOSTS_KEY = "host";
+public static final String VERSION_KEY = "version";
+public static final int CURRENT_VERSION = 1;
+private static final String ACLS_KEY = "acls";
+
+public final AccessControlEntry ace;
+public final KafkaPrincipal kafkaPrincipal;
+
+public AclEntry(AccessControlEntry ace) {
+super(ace.principal(), ace.host(), ace.operation(), ace.permissionType());
+this.ace = ace;
+
+kafkaPrincipal = ace.principal() == null
+? null
+: SecurityUtils.parseKafkaPrincipal(ace.principal());
+}
+
+/**
+ * Parse JSON representation of ACLs
+ * @param bytes of acls json string
+ *
+ * 
+{
+"version": 1,
+"acls": [
+{
+"host":"host1",
+"permissionType": 

Re: [PR] MINOR: Remove the client side assignor from the ConsumerGroupHeartbeat API [kafka]

2024-03-28 Thread via GitHub


aiven-anton commented on code in PR #14469:
URL: https://github.com/apache/kafka/pull/14469#discussion_r1543168871


##
clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json:
##
@@ -43,21 +43,6 @@
   "about": "null if it didn't change since the last heartbeat; the 
subscribed topic regex otherwise" },
 { "name": "ServerAssignor", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
   "about": "null if not used or if it didn't change since the last 
heartbeat; the server side assignor to use otherwise." },
-{ "name": "ClientAssignors", "type": "[]Assignor", "versions": "0+", 
"nullableVersions": "0+", "default": "null",

Review Comment:
   Oh my, now I feel dumb. Thanks for pointing this out!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Remove the client side assignor from the ConsumerGroupHeartbeat API [kafka]

2024-03-28 Thread via GitHub


jolshan commented on code in PR #14469:
URL: https://github.com/apache/kafka/pull/14469#discussion_r1543165416


##
clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json:
##
@@ -43,21 +43,6 @@
   "about": "null if it didn't change since the last heartbeat; the 
subscribed topic regex otherwise" },
 { "name": "ServerAssignor", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
   "about": "null if not used or if it didn't change since the last 
heartbeat; the server side assignor to use otherwise." },
-{ "name": "ClientAssignors", "type": "[]Assignor", "versions": "0+", 
"nullableVersions": "0+", "default": "null",

Review Comment:
   It is an annotation. See in the spec
   
   ```
   "latestVersionUnstable": true,
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


