Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
philipnee commented on PR #15525: URL: https://github.com/apache/kafka/pull/15525#issuecomment-2026690869 @lucasbru - Thanks for taking time reviewing this PR. This PR is ready for another pass. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
philipnee commented on code in PR #15525: URL: https://github.com/apache/kafka/pull/15525#discussion_r1544067091 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -946,7 +956,7 @@ public void testOffsetsForTimesWithZeroTimeout() { @Test public void testWakeupCommitted() { consumer = newConsumer(); -final HashMap offsets = mockTopicPartitionOffset(); Review Comment: just cleaning up.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621: URL: https://github.com/apache/kafka/pull/15621#issuecomment-2026641230 > (1) revert all offsetForMaxTimestamp to shallowOffsetMaxTimestamp (2) change/revert the implementation to set shallowOffsetMaxTimestamp accordingly. Do we need to revert all of them? The paths we have fixed work well now. 1) It seems to me that adding comments for both the "recover" and "follower" cases can remind readers that this `offsetOfMaxTimestampMs` is shallow. 2) Or we can just rename `offsetForMaxTimestamp` back to `shallowOffsetMaxTimestamp` but keep the implementation. @junrao WDYT? > (3) add tests for follower appends Will complete it later.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on code in PR #15621: URL: https://github.com/apache/kafka/pull/15621#discussion_r1544046500 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1337,13 +1337,16 @@ class UnifiedLog(@volatile var logStartOffset: Long, } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) { // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides // constant time access while being safe to use with concurrent collections unlike `toArray`. -val segmentsCopy = logSegments.asScala.toBuffer -val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar) -val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar - -Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp, - latestTimestampAndOffset.offset, - latestEpochAsOptional(leaderEpochCache))) +val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar) +// cache the timestamp and offset +val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar +// lookup the position of batch to avoid extra I/O +val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset) +latestTimestampSegment.log.batchesFrom(position.position).asScala + .filter(_.maxTimestamp() == maxTimestampSoFar.timestamp) Review Comment: >We could stop after finding the first batch matching maxTimestamp. Oh, sorry for neglecting that. > Although it should be impossible, should we handle empty() case? Just let `KafkaApis` return an error response with `UNKNOWN`: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1146
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
philipnee commented on code in PR #15525: URL: https://github.com/apache/kafka/pull/15525#discussion_r1544046037 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1141,21 +1141,27 @@ private Map beginningOrEndOffset(Collection timestampToSearch = partitions .stream() .collect(Collectors.toMap(Function.identity(), tp -> timestamp)); Timer timer = time.timer(timeout); ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent( timestampToSearch, -false, timer); -Map offsetAndTimestampMap = applicationEventHandler.addAndGet( + +// shortcut the request if the timeout is zero. +if (timeout.isZero()) { Review Comment: scratch off the previous comment - addAndGet actually doesn't. We will need to explicitly return an empty result. See the code change.
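The zero-timeout shortcut discussed in this comment can be illustrated with a minimal Java sketch. Everything here is hypothetical: `ZeroTimeoutSketch`, the `blockingLookup` parameter, and the use of `String` partition names stand in for the consumer internals and are not Kafka's API. The sketch also does not model whether the real code still dispatches the request before returning.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class ZeroTimeoutSketch {
    /**
     * Returns an empty result immediately when the caller passes a zero timeout;
     * otherwise delegates to a (hypothetical) blocking lookup.
     */
    static Map<String, Long> beginningOrEndOffsets(
            List<String> partitions,
            Duration timeout,
            Function<List<String>, Map<String, Long>> blockingLookup) {
        if (timeout.isZero()) {
            // Nothing can be awaited with a zero timeout, so return explicitly.
            return Collections.emptyMap();
        }
        return blockingLookup.apply(partitions);
    }

    public static void main(String[] args) {
        // Assumed stand-in for the blocking path: maps the first partition to offset 42.
        Function<List<String>, Map<String, Long>> lookup =
                tps -> Collections.singletonMap(tps.get(0), 42L);
        List<String> partitions = Collections.singletonList("topic-0");
        System.out.println(beginningOrEndOffsets(partitions, Duration.ZERO, lookup));
        System.out.println(beginningOrEndOffsets(partitions, Duration.ofSeconds(1), lookup));
    }
}
```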
Re: [PR] KAFKA-16039: RecordHeaders supports the addAll method [kafka]
github-actions[bot] commented on PR #15034: URL: https://github.com/apache/kafka/pull/15034#issuecomment-2026574417 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-15823: disconnect from controller on AuthenticationException [kafka]
showuon commented on code in PR #14760: URL: https://github.com/apache/kafka/pull/14760#discussion_r1544017066 ## core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala: ## @@ -386,6 +386,7 @@ class NodeToControllerRequestThread( if (response.authenticationException != null) { error(s"Request ${queueItem.request} failed due to authentication error with controller", response.authenticationException) Review Comment: Should we log: `Disconnecting the connection to the stale controller ${activeControllerAddress().map(_.idString).getOrElse("null")}` like what we did for `NOT_CONTROLLER` error?
Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]
showuon commented on code in PR #15620: URL: https://github.com/apache/kafka/pull/15620#discussion_r1544007618 ## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java: ## @@ -74,25 +72,25 @@ protected final Map createRemoteLogMetadataTr return map; } -protected final Map createRemoteLogStorageClassToApiKeyMap() { -Map map = new HashMap<>(); -map.put(RemoteLogSegmentMetadata.class.getName(), REMOTE_LOG_SEGMENT_METADATA_API_KEY); -map.put(RemoteLogSegmentMetadataUpdate.class.getName(), REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY); -map.put(RemotePartitionDeleteMetadata.class.getName(), REMOTE_PARTITION_DELETE_API_KEY); -map.put(RemoteLogSegmentMetadataSnapshot.class.getName(), REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY); -return map; -} - public byte[] serialize(RemoteLogMetadata remoteLogMetadata) { -Short apiKey = remoteLogStorageClassToApiKey.get(remoteLogMetadata.getClass().getName()); -if (apiKey == null) { -throw new IllegalArgumentException("ApiKey for given RemoteStorageMetadata class: " + remoteLogMetadata.getClass() - + " does not exist."); -} -@SuppressWarnings("unchecked") -ApiMessageAndVersion apiMessageAndVersion = remoteLogMetadataTransform(apiKey).toApiMessageAndVersion(remoteLogMetadata); +String className = remoteLogMetadata.getClass().getName(); +RemoteLogMetadataTransform metadataTransform; + +if(className.equals(RemoteLogSegmentMetadata.class.getName())) { +metadataTransform = new RemoteLogSegmentMetadataTransform(); +} else if (className.equals(RemoteLogSegmentMetadataUpdate.class.getName())) { +metadataTransform = new RemoteLogSegmentMetadataUpdateTransform(); +} else if (className.equals(RemotePartitionDeleteMetadata.class.getName())) { +metadataTransform = new RemotePartitionDeleteMetadataTransform(); +} else if (className.equals(RemoteLogSegmentMetadataSnapshot.class.getName())) { +metadataTransform = new RemoteLogSegmentMetadataSnapshotTransform(); Review Comment: +1
[jira] [Resolved] (KAFKA-16349) ShutdownableThread fails build by calling Exit with race condition
[ https://issues.apache.org/jira/browse/KAFKA-16349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris resolved KAFKA-16349. - Fix Version/s: 3.8.0 Assignee: Greg Harris Resolution: Fixed > ShutdownableThread fails build by calling Exit with race condition > -- > > Key: KAFKA-16349 > URL: https://issues.apache.org/jira/browse/KAFKA-16349 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.8.0 >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Minor > Fix For: 3.8.0 > > > `ShutdownableThread` calls `Exit.exit()` when the thread's operation throws > FatalExitError. In normal operation, this calls System.exit, and exits the > process. In tests, the exit procedure is masked with Exit.setExitProcedure to > prevent tests that encounter a FatalExitError from crashing the test JVM. > Masking of exit procedures is usually done in BeforeEach/AfterEach > annotations, with the exit procedures cleaned up immediately after the test > finishes. If the body of the test creates a ShutdownableThread that outlives > the test, such as by omitting `ShutdownableThread#awaitShutdown`, by having > `ShutdownableThread#awaitShutdown` be interrupted by a test timeout, or by a > race condition between `Exit.resetExitProcedure` and `Exit.exit`, then > System.exit() can be called erroneously. > > {noformat} > // First, in the test thread: > Exit.setExitProcedure(...) > try { > new ShutdownableThread(...).start() > } finally { > Exit.resetExitProcedure() > } > // Second, in the ShutdownableThread: > try { > throw new FatalExitError(...) > } catch (FatalExitError e) { > Exit.exit(...) 
// Calls real System.exit() > }{noformat} > > This can be resolved by one of the following: > # Eliminate FatalExitError usages in code when setExitProcedure is in-use > # Eliminate the Exit.exit call from ShutdownableThread, and instead > propagate this error to another thread to handle without a race-condition > # Eliminate resetExitProcedure by refactoring Exit to be non-static > FatalExitError is in use in a small number of places, but may be difficult to > eliminate: > * FinalizedFeatureChangeListener > * InterBrokerSendThread > * TopicBasedRemoteLogMetadataManager > There are many other places where Exit is called from a background thread, > including some implementations of ShutdownableThread which don't use > FatalExitError. > The effect of this bug is that the build is flaky, as race > conditions/timeouts in tests can cause the gradle executors to exit with > status code 1, which has happened 26 times in the last 28 days. I have not > yet been able to confirm this bug is happening in other tests, but I do have > a deterministic reproduction case with the exact same symptoms: > {noformat} > Gradle Test Run :core:test > Gradle Test Executor 38 > ShutdownableThreadTest > > testShutdownWhenTestTimesOut(boolean) > > "testShutdownWhenTestTimesOut(boolean).isInterruptible=true" SKIPPED > FAILURE: Build failed with an exception. > * What went wrong: > Execution failed for task ':core:test'. > > Process 'Gradle Test Executor 38' finished with non-zero exit value 1 > This problem might be caused by incorrect test process configuration. > For more on test execution, please refer to > https://docs.gradle.org/8.6/userguide/java_testing.html#sec:test_execution in > the Gradle documentation.{noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
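The interleaving described in KAFKA-16349 can be replayed deterministically in a small Java sketch. The `Procedure` interface and the static hook below are hypothetical stand-ins, not Kafka's actual `Exit` class, and the "real" exit is replaced by a recorder so the example does not terminate the JVM. A latch forces the background thread to call `exit` only after the test body has reset the procedure.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class ExitRaceSketch {
    /** Hypothetical stand-in for a static exit hook; not Kafka's Exit class. */
    interface Procedure { void exit(int status); }

    // Records which procedure ran instead of really terminating the JVM.
    static final AtomicReference<String> invoked = new AtomicReference<>("none");
    // Stands in for the default procedure that would call System.exit.
    static final Procedure DEFAULT = status -> invoked.set("default");
    static volatile Procedure current = DEFAULT;

    static void setExitProcedure(Procedure p) { current = p; }
    static void resetExitProcedure() { current = DEFAULT; }
    static void exit(int status) { current.exit(status); }

    /** Replays the interleaving and reports which procedure handled the exit. */
    static String run() {
        invoked.set("none");
        CountDownLatch testFinished = new CountDownLatch(1);
        Thread background = new Thread(() -> {
            try {
                testFinished.await(); // the thread outlives the test body
                exit(1);              // too late: the mask has already been removed
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        setExitProcedure(status -> invoked.set("masked"));
        try {
            background.start();       // the test never awaits shutdown
        } finally {
            resetExitProcedure();
        }
        testFinished.countDown();
        try {
            background.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return invoked.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "default"
    }
}
```

Because the hook is static and shared, any thread that outlives the `finally` block sees the unmasked procedure, which is why the ticket's options either remove the late `exit` call or make the hook non-static.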
Re: [PR] KAFKA-16349: Prevent race conditions in Exit class from stopping test JVM [kafka]
gharris1727 merged PR #15484: URL: https://github.com/apache/kafka/pull/15484
Re: [PR] KAFKA-16349: Prevent race conditions in Exit class from stopping test JVM [kafka]
gharris1727 commented on PR #15484: URL: https://github.com/apache/kafka/pull/15484#issuecomment-2026540118 Test failures appear unrelated, and I got a local run of `./gradlew test` to pass!
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
showuon commented on code in PR #15621: URL: https://github.com/apache/kafka/pull/15621#discussion_r1543916425 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1337,13 +1337,16 @@ class UnifiedLog(@volatile var logStartOffset: Long, } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) { // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides // constant time access while being safe to use with concurrent collections unlike `toArray`. -val segmentsCopy = logSegments.asScala.toBuffer -val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar) -val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar - -Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp, - latestTimestampAndOffset.offset, - latestEpochAsOptional(leaderEpochCache))) +val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar) +// cache the timestamp and offset +val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar +// lookup the position of batch to avoid extra I/O +val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset) +latestTimestampSegment.log.batchesFrom(position.position).asScala + .filter(_.maxTimestamp() == maxTimestampSoFar.timestamp) Review Comment: Although it should be impossible, should we handle the `empty()` case?
Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]
Owen-CH-Leung commented on PR #15489: URL: https://github.com/apache/kafka/pull/15489#issuecomment-2026412628 > adde167 No prob. Added back `ToString` to troubleshoot
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
showuon commented on code in PR #15621: URL: https://github.com/apache/kafka/pull/15621#discussion_r1543906755 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1337,13 +1337,16 @@ class UnifiedLog(@volatile var logStartOffset: Long, } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) { // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides // constant time access while being safe to use with concurrent collections unlike `toArray`. -val segmentsCopy = logSegments.asScala.toBuffer -val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar) -val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar - -Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp, - latestTimestampAndOffset.offset, - latestEpochAsOptional(leaderEpochCache))) +val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar) +// cache the timestamp and offset +val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar +// lookup the position of batch to avoid extra I/O +val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset) +latestTimestampSegment.log.batchesFrom(position.position).asScala + .filter(_.maxTimestamp() == maxTimestampSoFar.timestamp) Review Comment: Good suggestion. We should use `find` here.
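The difference between filtering every batch and stopping at the first match can be sketched in Java, where `Stream.findFirst` short-circuits much like Scala's `find`. The `Batch` class, its fields, and the `inspected` counter are hypothetical simplifications for illustration and are not the types used in `UnifiedLog`. Returning an `Optional` also leaves the "no matching batch" case explicit for the caller.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

public class FirstMatchSketch {
    /** Hypothetical, simplified batch: only the fields the lookup needs. */
    static final class Batch {
        final long baseOffset;
        final long maxTimestamp;

        Batch(long baseOffset, long maxTimestamp) {
            this.baseOffset = baseOffset;
            this.maxTimestamp = maxTimestamp;
        }
    }

    // Counts how many batches were examined, to make the short-circuit visible.
    static final AtomicInteger inspected = new AtomicInteger();

    /** Returns the first batch whose max timestamp equals the target, if any. */
    static Optional<Batch> firstWithTimestamp(List<Batch> batches, long timestamp) {
        inspected.set(0);
        return batches.stream()
                .peek(b -> inspected.incrementAndGet())
                .filter(b -> b.maxTimestamp == timestamp)
                .findFirst(); // stops pulling elements once a match is found
    }

    public static void main(String[] args) {
        List<Batch> batches = Arrays.asList(
                new Batch(0, 100), new Batch(10, 300), new Batch(20, 300), new Batch(30, 200));
        Optional<Batch> match = firstWithTimestamp(batches, 300);
        System.out.println(match.map(b -> b.baseOffset).orElse(-1L));
        System.out.println(inspected.get());
    }
}
```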
Re: [PR] Log slow controller events [kafka]
soarez commented on PR #15622: URL: https://github.com/apache/kafka/pull/15622#issuecomment-2026356492 > events have relatively unique names (we include offset in some) so the cardinality is quite high I see. What I suggested would result in far too many metrics. Makes sense then.
Re: [PR] Log slow controller events [kafka]
mumrah commented on code in PR #15622: URL: https://github.com/apache/kafka/pull/15622#discussion_r1543844228 ## metadata/src/main/java/org/apache/kafka/controller/metrics/SlowEventsLogger.java: ## @@ -0,0 +1,58 @@ +package org.apache.kafka.controller.metrics; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.function.Supplier; + +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +public class SlowEventsLogger { +/** + * Don't report any p99 events below this threshold. This prevents the controller from reporting p99 event + * times in the idle case. + */ +private static final int MIN_SLOW_EVENT_TIME_MS = 100; + +/** + * Calculating the p99 from the histogram consumes some resources, so only update it every so often. + */ +private static final int P99_REFRESH_INTERVAL_MS = 3; + +private final Supplier p99Supplier; +private final Logger log; +private final Time time; +private double p99; +private Timer percentileUpdateTimer; + +public SlowEventsLogger( +Supplier p99Supplier, +Logger log, +Time time +) { +this.p99Supplier = p99Supplier; +this.log = log; +this.time = time; +this.percentileUpdateTimer = time.timer(P99_REFRESH_INTERVAL_MS); +} + +public void maybeLog(String name, long durationNs) { +long durationMs = MILLISECONDS.convert(durationNs, NANOSECONDS); +if (durationMs > MIN_SLOW_EVENT_TIME_MS && durationMs > p99) { Review Comment: Yeah, I need to initialize it in the constructor.
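The constructor initialization mentioned in this reply might look like the following Java sketch. It is a simplified, hypothetical class rather than the PR's `SlowEventsLogger`: the timer and logger are omitted, `Supplier<Double>` is an assumed type for the supplier (the generic parameter is not visible in the quoted diff), and `refresh()` and `isSlow()` are illustrative names. Without the seeding line, an uninitialized `double` field defaults to `0.0`, so every event above the minimum threshold would count as slow until the first refresh.

```java
import java.util.function.Supplier;

public class SlowEventThresholdSketch {
    private static final long MIN_SLOW_EVENT_TIME_MS = 100;

    private final Supplier<Double> p99Supplier;
    private double p99;

    public SlowEventThresholdSketch(Supplier<Double> p99Supplier) {
        this.p99Supplier = p99Supplier;
        // Seed the cached percentile so the first comparison never sees the implicit 0.0.
        this.p99 = p99Supplier.get();
    }

    /** Re-reads the percentile; a caller would invoke this on a refresh interval. */
    public void refresh() {
        this.p99 = p99Supplier.get();
    }

    /** An event is slow only if it exceeds both the fixed floor and the cached p99. */
    public boolean isSlow(long durationMs) {
        return durationMs > MIN_SLOW_EVENT_TIME_MS && durationMs > p99;
    }

    public static void main(String[] args) {
        SlowEventThresholdSketch sketch = new SlowEventThresholdSketch(() -> 500.0);
        System.out.println(sketch.isSlow(200)); // above the floor but below p99
        System.out.println(sketch.isSlow(600)); // above both thresholds
    }
}
```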
Re: [PR] Log slow controller events [kafka]
mumrah commented on PR #15622: URL: https://github.com/apache/kafka/pull/15622#issuecomment-2026321417 @soarez wdym by tagging here? Note that the events have relatively unique names (we include offset in some) so the cardinality is quite high.
Re: [PR] Log slow controller events [kafka]
soarez commented on code in PR #15622: URL: https://github.com/apache/kafka/pull/15622#discussion_r1543829135 ## metadata/src/main/java/org/apache/kafka/controller/metrics/SlowEventsLogger.java: ## @@ -0,0 +1,58 @@ +package org.apache.kafka.controller.metrics; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.function.Supplier; + +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +public class SlowEventsLogger { +/** + * Don't report any p99 events below this threshold. This prevents the controller from reporting p99 event + * times in the idle case. + */ +private static final int MIN_SLOW_EVENT_TIME_MS = 100; + +/** + * Calculating the p99 from the histogram consumes some resources, so only update it every so often. + */ +private static final int P99_REFRESH_INTERVAL_MS = 3; + +private final Supplier p99Supplier; +private final Logger log; +private final Time time; +private double p99; +private Timer percentileUpdateTimer; + +public SlowEventsLogger( +Supplier p99Supplier, +Logger log, +Time time +) { +this.p99Supplier = p99Supplier; +this.log = log; +this.time = time; +this.percentileUpdateTimer = time.timer(P99_REFRESH_INTERVAL_MS); +} + +public void maybeLog(String name, long durationNs) { +long durationMs = MILLISECONDS.convert(durationNs, NANOSECONDS); +if (durationMs > MIN_SLOW_EVENT_TIME_MS && durationMs > p99) { Review Comment: It seems possible that `p99 = p99Supplier.get();` may not have run yet at this point. Should `p99` have an explicit default value? 
## metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java: ## @@ -28,6 +28,8 @@ import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; Review Comment: Is `Supplier` needed here?
Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]
soarez commented on code in PR #15620: URL: https://github.com/apache/kafka/pull/15620#discussion_r1543811354 ## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java: ## @@ -74,25 +72,25 @@ protected final Map createRemoteLogMetadataTr return map; } -protected final Map createRemoteLogStorageClassToApiKeyMap() { -Map map = new HashMap<>(); -map.put(RemoteLogSegmentMetadata.class.getName(), REMOTE_LOG_SEGMENT_METADATA_API_KEY); -map.put(RemoteLogSegmentMetadataUpdate.class.getName(), REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY); -map.put(RemotePartitionDeleteMetadata.class.getName(), REMOTE_PARTITION_DELETE_API_KEY); -map.put(RemoteLogSegmentMetadataSnapshot.class.getName(), REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY); -return map; -} - public byte[] serialize(RemoteLogMetadata remoteLogMetadata) { -Short apiKey = remoteLogStorageClassToApiKey.get(remoteLogMetadata.getClass().getName()); -if (apiKey == null) { -throw new IllegalArgumentException("ApiKey for given RemoteStorageMetadata class: " + remoteLogMetadata.getClass() - + " does not exist."); -} -@SuppressWarnings("unchecked") -ApiMessageAndVersion apiMessageAndVersion = remoteLogMetadataTransform(apiKey).toApiMessageAndVersion(remoteLogMetadata); +String className = remoteLogMetadata.getClass().getName(); +RemoteLogMetadataTransform metadataTransform; + +if(className.equals(RemoteLogSegmentMetadata.class.getName())) { +metadataTransform = new RemoteLogSegmentMetadataTransform(); +} else if (className.equals(RemoteLogSegmentMetadataUpdate.class.getName())) { +metadataTransform = new RemoteLogSegmentMetadataUpdateTransform(); +} else if (className.equals(RemotePartitionDeleteMetadata.class.getName())) { +metadataTransform = new RemotePartitionDeleteMetadataTransform(); +} else if (className.equals(RemoteLogSegmentMetadataSnapshot.class.getName())) { +metadataTransform = new RemoteLogSegmentMetadataSnapshotTransform(); Review Comment: Instead of 
comparing `remoteLogMetadata.getClass().getName()` with the various `*.class.getName()`, can we instead just compare the classes directly?
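Comparing the classes directly, as suggested in this comment, could be sketched in Java as follows. The nested types and the returned labels are hypothetical stand-ins for the `RemoteLogMetadata` hierarchy and its transforms; only the dispatch technique is the point. `Class` objects loaded by the same class loader are unique per type, so reference equality (`==`) is sufficient and avoids the string comparison.

```java
public class ClassDispatchSketch {
    // Hypothetical stand-ins for the metadata hierarchy; the real classes carry state.
    static class Metadata {}
    static class SegmentMetadata extends Metadata {}
    static class SegmentMetadataUpdate extends Metadata {}
    static class UnregisteredMetadata extends Metadata {}

    /** Picks a transform label by comparing Class objects rather than class-name strings. */
    static String transformFor(Metadata metadata) {
        Class<?> type = metadata.getClass();
        if (type == SegmentMetadata.class) {
            return "segment";
        } else if (type == SegmentMetadataUpdate.class) {
            return "segment-update";
        }
        throw new IllegalArgumentException("No transform registered for " + type);
    }

    public static void main(String[] args) {
        System.out.println(transformFor(new SegmentMetadata()));
        System.out.println(transformFor(new SegmentMetadataUpdate()));
    }
}
```

An exact `==` check matches only the named class, whereas `instanceof` would also match subclasses; which behavior is wanted depends on whether the metadata types are ever subclassed.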
Re: [PR] KAFKA-15823: disconnect from controller on AuthenticationException [kafka]
soarez commented on PR #14760: URL: https://github.com/apache/kafka/pull/14760#issuecomment-2026260420 This still needs a review from a committer. @mimaison @mumrah @rondagostino @showuon
[jira] [Updated] (KAFKA-15955) Migrating ZK brokers send dir assignments
[ https://issues.apache.org/jira/browse/KAFKA-15955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez updated KAFKA-15955: Priority: Minor (was: Major) > Migrating ZK brokers send dir assignments > - > > Key: KAFKA-15955 > URL: https://issues.apache.org/jira/browse/KAFKA-15955 > Project: Kafka > Issue Type: Sub-task >Reporter: Igor Soarez >Assignee: Proven Provenzano >Priority: Minor > > Brokers in ZooKeeper mode, while in migration mode, should start sending > directory assignments to the KRaft Controller using AssignmentsManager.
Re: [PR] MINOR: Extracted the creation of ReplicaManager in BrokerServer [kafka]
soarez commented on PR #15617: URL: https://github.com/apache/kafka/pull/15617#issuecomment-2026228502 Olá Filipe. Thanks for your contribution. `BrokerServer` shouldn't aim to follow the structure or interface in `KafkaServer`. What those two are meant to have in common is defined in `KafkaBroker`. So on its own, I don't think this change makes much sense. If you have a test that's going to make use of this, perhaps you could bundle those two changes in the same PR?
[jira] [Resolved] (KAFKA-16286) KRaft doesn't always notify listener of latest leader
[ https://issues.apache.org/jira/browse/KAFKA-16286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio resolved KAFKA-16286. Resolution: Fixed > KRaft doesn't always notify listener of latest leader > - > > Key: KAFKA-16286 > URL: https://issues.apache.org/jira/browse/KAFKA-16286 > Project: Kafka > Issue Type: Bug > Components: kraft >Reporter: José Armando García Sancio >Assignee: José Armando García Sancio >Priority: Major > Fix For: 3.8.0 > > > If a listener registers with RaftClient after the KRaft replica has > transitioned to follower, it will not get notified of the current leader until > it has transitioned to another state. > In a stable cluster the listeners that are not the leader (inactive > controllers and brokers) will only get notified when the leader changes.
[jira] [Updated] (KAFKA-16286) KRaft doesn't always notify listener of latest leader
[ https://issues.apache.org/jira/browse/KAFKA-16286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-16286: --- Fix Version/s: 3.8.0 > KRaft doesn't always notify listener of latest leader > - > > Key: KAFKA-16286 > URL: https://issues.apache.org/jira/browse/KAFKA-16286 > Project: Kafka > Issue Type: Bug > Components: kraft >Reporter: José Armando García Sancio >Assignee: José Armando García Sancio >Priority: Major > Fix For: 3.8.0 > > > If a listener registers with RaftClient after the KRaft replica has > transitioned to follower, it will not get notified of the current leader until > it has transitioned to another state. > In a stable cluster the listeners that are not the leader (inactive > controllers and brokers) will only get notified when the leader changes.
Re: [PR] KAFKA-16394: Fix null propagation in foreign key join result [kafka]
soarez commented on PR #15607: URL: https://github.com/apache/kafka/pull/15607#issuecomment-2026208622 @mjsax I think this one should also be tagged 'streams'
Re: [PR] MINOR: Remove redundant ApiVersionsResponse#filterApis [kafka]
soarez commented on PR #15611: URL: https://github.com/apache/kafka/pull/15611#issuecomment-2026205805 Seems like a gray area to me. Regardless, I'd like to encourage @brandboat to keep looking for opportunities to cleanup and refactor.
Re: [PR] KAFKA-15915: Flaky ProducerIdManagerTest error injection fix [kafka]
soarez commented on code in PR #15605: URL: https://github.com/apache/kafka/pull/15605#discussion_r1543761783 ## core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala: ## @@ -38,22 +38,32 @@ import org.mockito.Mockito.{mock, when} import java.util.concurrent.{CountDownLatch, Executors, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable -import scala.util.{Failure, Success} +import scala.util.{Failure, Success, Try} class ProducerIdManagerTest { var brokerToController: NodeToControllerChannelManager = mock(classOf[NodeToControllerChannelManager]) val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient]) + class ErrorQueue(initialErrorCounts: Errors*) { +private val queue: mutable.Queue[Errors] = mutable.Queue.empty ++ initialErrorCounts Review Comment: I think this can just be
```suggestion
private val queue: mutable.Queue[Errors] = mutable.Queue(initialErrorCounts: _*)
```
## core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala: ## @@ -38,22 +38,32 @@ import org.mockito.Mockito.{mock, when} import java.util.concurrent.{CountDownLatch, Executors, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable -import scala.util.{Failure, Success} +import scala.util.{Failure, Success, Try} class ProducerIdManagerTest { var brokerToController: NodeToControllerChannelManager = mock(classOf[NodeToControllerChannelManager]) val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient]) + class ErrorQueue(initialErrorCounts: Errors*) { +private val queue: mutable.Queue[Errors] = mutable.Queue.empty ++ initialErrorCounts + +def takeError(): Errors = queue.synchronized { + if (queue.isEmpty) { +return Errors.NONE + } + queue.dequeue() +} Review Comment: Now that this class is simple enough, could its use be replaced with `java.util.concurrent.ConcurrentLinkedQueue`?
## core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala: ## @@ -38,22 +38,32 @@ import org.mockito.Mockito.{mock, when} import java.util.concurrent.{CountDownLatch, Executors, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable -import scala.util.{Failure, Success} +import scala.util.{Failure, Success, Try} class ProducerIdManagerTest { var brokerToController: NodeToControllerChannelManager = mock(classOf[NodeToControllerChannelManager]) val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient]) + class ErrorQueue(initialErrorCounts: Errors*) { Review Comment: Now that you've removed the error counts, this variable name - `initialErrorCounts` - no longer makes sense.
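For illustration, the `ConcurrentLinkedQueue` idea raised in the review could look roughly like the Java sketch below. The `Errors` enum here is a hypothetical stand-in declared only so the sketch compiles on its own, and `ErrorQueueSketch` is an assumed name, not code from the PR.

```java
import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedQueue;

final class ErrorQueueSketch {
    // Hypothetical stand-in for the Errors type used by the test.
    enum Errors { NONE, NOT_CONTROLLER }

    private final ConcurrentLinkedQueue<Errors> queue;

    ErrorQueueSketch(Errors... initialErrors) {
        queue = new ConcurrentLinkedQueue<>(Arrays.asList(initialErrors));
    }

    // poll() is thread-safe and returns null when the queue is empty,
    // so no synchronized block or early return is needed.
    Errors takeError() {
        Errors next = queue.poll();
        return next == null ? Errors.NONE : next;
    }

    public static void main(String[] args) {
        ErrorQueueSketch errors = new ErrorQueueSketch(Errors.NOT_CONTROLLER);
        System.out.println(errors.takeError()); // NOT_CONTROLLER
        System.out.println(errors.takeError()); // NONE
    }
}
```

The trade-off is that the "empty means `NONE`" rule still needs one line of glue around `poll()`, so a small wrapper may remain even if the hand-written locking goes away.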
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on code in PR #15621: URL: https://github.com/apache/kafka/pull/15621#discussion_r1543754297 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1337,13 +1337,16 @@ class UnifiedLog(@volatile var logStartOffset: Long, } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) { // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides // constant time access while being safe to use with concurrent collections unlike `toArray`. -val segmentsCopy = logSegments.asScala.toBuffer -val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar) -val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar - -Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp, - latestTimestampAndOffset.offset, - latestEpochAsOptional(leaderEpochCache))) +val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar) +// cache the timestamp and offset +val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar +// lookup the position of batch to avoid extra I/O +val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset) +latestTimestampSegment.log.batchesFrom(position.position).asScala + .filter(_.maxTimestamp() == maxTimestampSoFar.timestamp) Review Comment: We could stop after finding the first batch matching maxTimestamp.
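A minimal Java sketch of the "stop at the first match" suggestion, assuming a simplified, hypothetical `Batch` type that carries only a base offset and a max timestamp (this is not Kafka's record batch class):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

final class FirstMatchingBatch {
    // Hypothetical, simplified batch: only the fields the sketch needs.
    static final class Batch {
        final long baseOffset;
        final long maxTimestamp;

        Batch(long baseOffset, long maxTimestamp) {
            this.baseOffset = baseOffset;
            this.maxTimestamp = maxTimestamp;
        }
    }

    // filter() is lazy and findFirst() short-circuits on a sequential stream,
    // so batches after the first match are not examined.
    static Optional<Batch> firstWithTimestamp(List<Batch> batches, long maxTimestamp) {
        return batches.stream()
                .filter(b -> b.maxTimestamp == maxTimestamp)
                .findFirst();
    }

    public static void main(String[] args) {
        List<Batch> batches = Arrays.asList(
                new Batch(0, 100), new Batch(5, 400), new Batch(9, 400));
        // Two batches carry timestamp 400; the earlier one (base offset 5) wins.
        System.out.println(firstWithTimestamp(batches, 400).get().baseOffset); // 5
    }
}
```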
[jira] [Created] (KAFKA-16446) Log slow controller events
David Arthur created KAFKA-16446: Summary: Log slow controller events Key: KAFKA-16446 URL: https://issues.apache.org/jira/browse/KAFKA-16446 Project: Kafka Issue Type: Improvement Reporter: David Arthur Occasionally, we will see very high p99 controller event processing times. Unless DEBUG logs are enabled, it is impossible to see which events are slow. Typically this happens during controller startup/failover, though it can also happen sporadically when the controller gets overloaded.
[PR] Log slow controller events [kafka]
mumrah opened a new pull request, #15622: URL: https://github.com/apache/kafka/pull/15622 (no comment)
[jira] [Commented] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore
[ https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831966#comment-17831966 ] Chia-Ping Tsai commented on KAFKA-16310: See the following links to check the reverts from 3.7: https://github.com/apache/kafka/commit/bd5989dd195d42c1608582316367a03b2c78cb11 https://github.com/apache/kafka/commit/fc646f920701b792eb683bacd513a1f20909f6bc > ListOffsets doesn't report the offset with maxTimestamp anymore > --- > > Key: KAFKA-16310 > URL: https://issues.apache.org/jira/browse/KAFKA-16310 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Emanuele Sabellico >Assignee: Chia-Ping Tsai >Priority: Blocker > Fix For: 3.8.0 > > > Updated: This is confirmed to be a regression in v3.7.0. > The impact of this issue is that when a batch contains records > with timestamps not in order, the offset of the timestamp will be wrong (e.g., > the timestamp for t0 should map to offset 10, but offset 12 is returned). > This causes the time index to record the wrong offset, so the result > will be unexpected. > === > The last offset is reported instead. > A test in librdkafka (0081/do_test_ListOffsets) is failing; it checks > that the offset with the max timestamp is the middle one and not the last > one. The test passes with 3.6.0 and previous versions. > This is the test: > [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989] > > There are three messages, with timestamps: > {noformat} > t0 + 100 > t0 + 400 > t0 + 250{noformat} > and indices 0,1,2. > Then a ListOffsets with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is done. > It should return offset 1, but 3.7.0 and trunk return offset 2. > Even 5 seconds after producing, it still returns 2 as the offset with > max timestamp. > ProduceRequest and ListOffsets were sent to the same broker (2); the leader > didn't change.
> {code:java} > %7|1709134230.019|SEND|0081_admin#producer-3| > [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, > 206 bytes @ 0, CorrId 2) %7|1709134230.020|RECV|0081_admin#producer-3| > [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse > (v7, 95 bytes, CorrId 2, rtt 1.18ms) > %7|1709134230.020|MSGSET|0081_admin#producer-3| > [thrd:localhost:39951/bootstrap]: localhost:39951/2: > rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 > message(s) (MsgId 0, BaseSeq -1) delivered {code} > {code:java} > %7|1709134235.021|SEND|0081_admin#producer-2| > [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest > (v7, 103 bytes @ 0, CorrId 7) %7|1709134235.022|RECV|0081_admin#producer-2| > [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received > ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code}
[jira] [Updated] (KAFKA-16342) Fix compressed records
[ https://issues.apache.org/jira/browse/KAFKA-16342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-16342: --- Fix Version/s: (was: 3.7.1) > Fix compressed records > -- > > Key: KAFKA-16342 > URL: https://issues.apache.org/jira/browse/KAFKA-16342 > Project: Kafka > Issue Type: Sub-task >Affects Versions: 3.7.0, 3.6.1 >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > Fix For: 3.8.0
[jira] [Updated] (KAFKA-16341) Fix un-compressed records
[ https://issues.apache.org/jira/browse/KAFKA-16341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-16341: --- Fix Version/s: (was: 3.7.1) > Fix un-compressed records > - > > Key: KAFKA-16341 > URL: https://issues.apache.org/jira/browse/KAFKA-16341 > Project: Kafka > Issue Type: Sub-task >Affects Versions: 3.7.0, 3.6.1 >Reporter: Luke Chen >Assignee: Johnny Hsu >Priority: Major > Fix For: 3.8.0
[jira] [Updated] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore
[ https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-16310: --- Fix Version/s: (was: 3.7.1) > ListOffsets doesn't report the offset with maxTimestamp anymore > --- > > Key: KAFKA-16310 > URL: https://issues.apache.org/jira/browse/KAFKA-16310 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Emanuele Sabellico >Assignee: Chia-Ping Tsai >Priority: Blocker > Fix For: 3.8.0
[jira] [Commented] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore
[ https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831965#comment-17831965 ] Chia-Ping Tsai commented on KAFKA-16310: {quote} Since the follower only maintains offsetForMaxTimestamp at the batch level, the listMaxTimestamp API was never implemented correctly. So, technically, there was no regression for listMaxTimestamp. We could just fix this issue in trunk without backporting to the old branch. {quote} I will revert KAFKA-16341 and KAFKA-16342 from 3.7 > ListOffsets doesn't report the offset with maxTimestamp anymore > --- > > Key: KAFKA-16310 > URL: https://issues.apache.org/jira/browse/KAFKA-16310 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Emanuele Sabellico >Assignee: Chia-Ping Tsai >Priority: Blocker > Fix For: 3.8.0, 3.7.1
[PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 opened a new pull request, #15621: URL: https://github.com/apache/kafka/pull/15621 We iterate the records to find the `offsetOfMaxTimestamp`, instead of returning the cached one, when handling `ListOffsetsRequest.MAX_TIMESTAMP`, since it is hard to align all paths to get the correct `offsetOfMaxTimestamp`. The known paths are shown below.
1. `convertAndAssignOffsetsNonCompressed` -> we CAN get the correct `offsetOfMaxTimestamp` when validating all records
2. `assignOffsetsNonCompressed` -> ditto
3. `validateMessagesAndAssignOffsetsCompressed` -> ditto
4. `validateMessagesAndAssignOffsetsCompressed#buildRecordsAndAssignOffsets` -> ditto
5. `appendAsFollow#append#analyzeAndValidateRecords` -> we CAN'T get the correct `offsetOfMaxTimestamp`, as iterating all records is expensive when fetching records from the leader
6. `LogSegment#recover` -> ditto

https://issues.apache.org/jira/browse/KAFKA-16310 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
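To make the record-level versus batch-level distinction concrete, here is a small Java sketch that replays the three-message example from KAFKA-16310 (timestamps t0 + 100, t0 + 400, t0 + 250). The class and method names are hypothetical, and the value chosen for `t0` is arbitrary; this is only an illustration of scanning records, not the broker's implementation.

```java
final class OffsetOfMaxTimestamp {
    // Scans record timestamps in offset order and returns the offset of the
    // first record carrying the largest timestamp, or -1 for an empty batch.
    static long offsetOfMaxTimestamp(long baseOffset, long[] timestamps) {
        long maxTimestamp = Long.MIN_VALUE;
        long offset = -1L;
        for (int i = 0; i < timestamps.length; i++) {
            if (timestamps[i] > maxTimestamp) {
                maxTimestamp = timestamps[i];
                offset = baseOffset + i;
            }
        }
        return offset;
    }

    public static void main(String[] args) {
        long t0 = 1_000L; // arbitrary base timestamp, assumed for the example
        long[] timestamps = {t0 + 100, t0 + 400, t0 + 250};
        // Record-level scan finds the middle record.
        System.out.println(offsetOfMaxTimestamp(0L, timestamps)); // 1
        // A batch-level ("shallow") answer would be the batch's last offset.
        System.out.println(timestamps.length - 1); // 2
    }
}
```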
Re: [PR] MINOR: AbstractConfig cleanup [kafka]
gharris1727 merged PR #15597: URL: https://github.com/apache/kafka/pull/15597
Re: [PR] MINOR: AbstractConfig cleanup [kafka]
gharris1727 commented on PR #15597: URL: https://github.com/apache/kafka/pull/15597#issuecomment-2026050399 Test failures appear unrelated: 1. ReplicaManagerTest has tickets [KAFKA-16323](https://issues.apache.org/jira/browse/KAFKA-16323) and [KAFKA-13530](https://issues.apache.org/jira/browse/KAFKA-13530) 2. ControllerRegistrationManagerTest has ticket [KAFKA-15897](https://issues.apache.org/jira/browse/KAFKA-15897)
[jira] [Commented] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore
[ https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831959#comment-17831959 ] Jun Rao commented on KAFKA-16310: - Since the follower only maintains offsetForMaxTimestamp at the batch level, the listMaxTimestamp API was never implemented correctly. So, technically, there was no regression for listMaxTimestamp. We could just fix this issue in trunk without backporting to the old branch. > ListOffsets doesn't report the offset with maxTimestamp anymore > --- > > Key: KAFKA-16310 > URL: https://issues.apache.org/jira/browse/KAFKA-16310 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Emanuele Sabellico >Assignee: Chia-Ping Tsai >Priority: Blocker > Fix For: 3.8.0, 3.7.1
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15618: URL: https://github.com/apache/kafka/pull/15618#issuecomment-2026013673 > I am just saying that we only need to fix this in trunk since the implementation was never correct in any previous branches, thus not a regression. Got it. Will open another PR for trunk.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 closed pull request #15618: KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… URL: https://github.com/apache/kafka/pull/15618
Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]
gharris1727 commented on PR #15620: URL: https://github.com/apache/kafka/pull/15620#issuecomment-2026008220 @linu-shibu Could you fix the rawtypes warnings in this class? You can see them if you make this change in build.gradle:
```diff
 options.encoding = 'UTF-8'
 options.compilerArgs << "-Xlint:all"
 // temporary exclusions until all the warnings are fixed
-if (!project.path.startsWith(":connect"))
+if (!project.path.startsWith(":connect") && !project.path.startsWith(":storage"))
   options.compilerArgs << "-Xlint:-rawtypes"
 options.compilerArgs << "-Xlint:-serial"
 options.compilerArgs << "-Xlint:-try"
```
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on PR #15618: URL: https://github.com/apache/kafka/pull/15618#issuecomment-2026000277 > I am not sure I understand this. All we need for this solution (or workaround) is the "max timestamp" of a segment, since we always iterate the batches (from the segment having the max timestamp) to find the "offset" of the max timestamp when handling ListOffsetsRequest.MAX_TIMESTAMP. Hence, we can correct the implementation for all active branches (including 3.6.3) with this PR. Yes, I agree that we can fix this issue completely. I am just saying that we only need to fix this in trunk since the implementation was never correct in any previous branches, thus not a regression.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on code in PR #15618: URL: https://github.com/apache/kafka/pull/15618#discussion_r1543560539 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1320,10 +1320,8 @@ class UnifiedLog(@volatile var logStartOffset: Long, // constant time access while being safe to use with concurrent collections unlike `toArray`. val segmentsCopy = logSegments.toBuffer val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar) -val latestTimestampAndOffset = latestTimestampSegment.maxTimestampAndOffsetSoFar - -Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp, - latestTimestampAndOffset.offset, +val batch = latestTimestampSegment.log.batches().asScala.maxBy(_.maxTimestamp()) Review Comment: Yes, suppose you have a 1GB segment and the maxTimestamp is in the last batch. latestTimestampSegment.log.batches() needs to read 1GB from disk. Using the offsetIndex, we only need to read the index and the index.interval (default to 4KB) worth of bytes. > Is the impl of lookup like this? Yes, that's what I was thinking.
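The I/O argument above rests on a sparse offset index: a binary search over a small sorted table yields a file position close to the target, so only the bytes from that position onward need to be read. The Java sketch below is a simplified, hypothetical model of that idea (`SparseOffsetIndex` and its fields are assumptions for illustration), not Kafka's `OffsetIndex` class.

```java
final class SparseOffsetIndex {
    private final long[] offsets;  // indexed offsets, sorted ascending
    private final int[] positions; // file position recorded for each indexed offset

    SparseOffsetIndex(long[] offsets, int[] positions) {
        this.offsets = offsets;
        this.positions = positions;
    }

    // Returns the file position of the largest indexed offset <= targetOffset,
    // or 0 (start of the segment) when the target precedes every entry.
    int lookup(long targetOffset) {
        int lo = 0;
        int hi = offsets.length - 1;
        int best = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (offsets[mid] <= targetOffset) {
                best = mid;
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return best < 0 ? 0 : positions[best];
    }

    public static void main(String[] args) {
        // One entry roughly every 4096 bytes, mirroring the interval mentioned above.
        SparseOffsetIndex index = new SparseOffsetIndex(
                new long[] {0, 50, 100}, new int[] {0, 4096, 8192});
        System.out.println(index.lookup(75)); // 4096: scanning starts here, not at byte 0
    }
}
```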
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15618: URL: https://github.com/apache/kafka/pull/15618#issuecomment-2025943523 > Since the follower only maintains offsetForMaxTimestamp at the batch level, the listMaxTimestamp API was never implemented correctly. I am not sure I understand this. All we need for this solution (or workaround) is the "max timestamp" of a segment, since we always iterate the batches (from the segment having the max timestamp) to find the "offset" of the max timestamp when handling `ListOffsetsRequest.MAX_TIMESTAMP`. Hence, we can correct the implementation for all active branches (including 3.6.3) with this PR. > So, technically, there was no regression for listMaxTimestamp. It seems there is no need to fix this in 3.6? We could just fix it in trunk. BTW, I'm OK with keeping the behavior for 3.6, as it is not a "regression".
Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]
linu-shibu commented on PR #15620: URL: https://github.com/apache/kafka/pull/15620#issuecomment-2025930412 @gharris1727 please do take a look and provide some guidance/feedback. Thanks!
[PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]
linu-shibu opened a new pull request, #15620: URL: https://github.com/apache/kafka/pull/15620 RemoteLogMetadataSerde receives a RemoteLogMetadata object and has to dispatch to one of four serializers depending on its type. This is currently done by taking the class name of the RemoteLogMetadata and looking it up in maps to find the corresponding serializer for that class. This later requires an unchecked cast, because the RemoteLogMetadataTransform is generic. This PR replaces the lookup with if-elseif-else statements, which are type-safe. The map lookup is also removed in the new implementation, which might make the process faster, as mentioned in the ticket. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
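The difference between the two dispatch styles can be sketched in Java as follows. All types here (`Metadata`, `SegmentMetadata`, `PartitionDeleteMetadata`) are hypothetical stand-ins, only two subtypes are shown instead of four, and the string "serialization" is a placeholder; this is not the code in `RemoteLogMetadataSerde`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class DispatchSketch {
    // Hypothetical stand-ins for the metadata hierarchy.
    static class Metadata { }

    static final class SegmentMetadata extends Metadata {
        final int segmentId;
        SegmentMetadata(int segmentId) { this.segmentId = segmentId; }
    }

    static final class PartitionDeleteMetadata extends Metadata {
        final String topic;
        PartitionDeleteMetadata(String topic) { this.topic = topic; }
    }

    static String serializeSegment(SegmentMetadata m) { return "segment:" + m.segmentId; }
    static String serializePartitionDelete(PartitionDeleteMetadata m) { return "delete:" + m.topic; }

    // Class-name dispatch: the map cannot express which serializer accepts which subtype.
    private static final Map<String, Function<? extends Metadata, String>> BY_CLASS_NAME = new HashMap<>();

    private static <T extends Metadata> void register(Class<T> type, Function<T, String> serializer) {
        BY_CLASS_NAME.put(type.getName(), serializer);
    }

    static {
        register(SegmentMetadata.class, DispatchSketch::serializeSegment);
        register(PartitionDeleteMetadata.class, DispatchSketch::serializePartitionDelete);
    }

    @SuppressWarnings("unchecked") // the compiler cannot verify this cast
    static String serializeByClassName(Metadata m) {
        Function<Metadata, String> serializer =
                (Function<Metadata, String>) BY_CLASS_NAME.get(m.getClass().getName());
        if (serializer == null) {
            throw new IllegalArgumentException("Unsupported metadata type: " + m.getClass());
        }
        return serializer.apply(m);
    }

    // Type-test dispatch: every cast is guarded by instanceof, so no unchecked warning.
    static String serializeByType(Metadata m) {
        if (m instanceof SegmentMetadata) {
            return serializeSegment((SegmentMetadata) m);
        } else if (m instanceof PartitionDeleteMetadata) {
            return serializePartitionDelete((PartitionDeleteMetadata) m);
        } else {
            throw new IllegalArgumentException("Unsupported metadata type: " + m.getClass());
        }
    }

    public static void main(String[] args) {
        Metadata m = new SegmentMetadata(7);
        System.out.println(serializeByClassName(m)); // segment:7
        System.out.println(serializeByType(m));      // segment:7
    }
}
```

Both paths produce the same result; the second simply moves the type check to where the compiler can see it.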
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on code in PR #15618: URL: https://github.com/apache/kafka/pull/15618#discussion_r1543505409 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1320,10 +1320,8 @@ class UnifiedLog(@volatile var logStartOffset: Long, // constant time access while being safe to use with concurrent collections unlike `toArray`. val segmentsCopy = logSegments.toBuffer val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar) -val latestTimestampAndOffset = latestTimestampSegment.maxTimestampAndOffsetSoFar - -Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp, - latestTimestampAndOffset.offset, +val batch = latestTimestampSegment.log.batches().asScala.maxBy(_.maxTimestamp()) Review Comment: > latestTimestampSegment.log.batches() scans the whole log segment and could introduce unnecessary extra I/O. So, there could be performance degradation because of that. `batches` is an `Iterable`, and its implementation loads a batch only when we call `next`. https://github.com/apache/kafka/blob/3.6/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java#L63 Hence, the benefit of looking up a batch (finding the position and then using it to call `batchesFrom`) is that we can save some I/O by skipping some batches. Please correct me if I misunderstand anything. > I am not sure I understand this. Looking up for a batch with each baseOffset or lastOffset will locate the same batch using the offset index, right? Is the impl of lookup like this?
```scala
val position = latestTimestampSegment.offsetIndex.lookup(latestTimestampSegment.offsetOfMaxTimestampSoFar)
latestTimestampSegment.log.batchesFrom(position.position).asScala
```
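The point about lazy loading can be illustrated with a toy Java `Iterable` that counts how many elements it actually produces. `LazyBatches` is a hypothetical model (integers stand in for batches, a counter stands in for disk reads), not `FileLogInputStream`.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

final class LazyBatches implements Iterable<Integer> {
    private final int total;
    int loaded = 0; // number of batches actually "read" so far

    LazyBatches(int total) {
        this.total = total;
    }

    @Override
    public Iterator<Integer> iterator() {
        return new Iterator<Integer>() {
            private int cursor = 0;

            @Override
            public boolean hasNext() {
                return cursor < total;
            }

            @Override
            public Integer next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                loaded++; // the simulated read happens only when next() is called
                return cursor++;
            }
        };
    }

    public static void main(String[] args) {
        LazyBatches batches = new LazyBatches(1000);
        for (int batch : batches) {
            if (batch == 2) {
                break; // stop early: later batches are never loaded
            }
        }
        System.out.println(batches.loaded); // 3
    }
}
```

Laziness bounds the cost by how far the scan goes, so it helps only when the scan can stop early; an operation such as `maxBy`, or a target sitting in the last batch, still walks every element, which is where starting from an indexed position saves work.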
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1543492385

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:

@@ -984,6 +984,8 @@ public void close(final Timer timer) {
 }
 } finally {
 super.close(timer);
+// Super-class close may wait for more commit callbacks to complete.
+invokeCompletedOffsetCommitCallbacks();

Review Comment:
Agreed. There could be async requests with a known coordinator that do not get a response in the above while loop but do get one while `super.close` waits, so we should trigger the callbacks at this point.

But this makes me notice: aren't we breaking the `close(Duration)` contract here by calling `super.close(timer)` in the `finally` clause? Say the async requests do not get a response within the timeout in the above while loop (so we block for that time in the while loop); then, in `finally`, the super class blocks for that time again [here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1139). Am I missing something? (I can file a separate Jira if I'm not missing something here.)
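The concern about blocking twice can be illustrated with a small Java sketch. It does not model Kafka's `Timer` or `AbstractCoordinator`; `SharedDeadlineSketch` and its `Deadline` class are hypothetical, and time is simulated rather than measured. It only contrasts two designs: giving each blocking phase a fresh timeout versus having all phases draw on one deadline computed from the caller's timeout.

```java
/** Hypothetical sketch: fresh per-phase timeouts versus one shared deadline. */
final class SharedDeadlineSketch {

    /** Minimal deadline holder, fixed once from the caller's timeout. */
    static final class Deadline {
        private final long deadlineMs;

        Deadline(long nowMs, long timeoutMs) {
            this.deadlineMs = nowMs + timeoutMs;
        }

        long remainingMs(long nowMs) {
            return Math.max(0L, deadlineMs - nowMs);
        }
    }

    /** Worst case when every phase may block for the full timeout again. */
    static long worstCaseWithFreshTimeouts(long timeoutMs, int phases) {
        return timeoutMs * phases;
    }

    /** Worst case when every phase may only block for what is left of one deadline. */
    static long worstCaseWithSharedDeadline(long timeoutMs, int phases) {
        long nowMs = 0L; // simulated clock
        Deadline deadline = new Deadline(nowMs, timeoutMs);
        for (int i = 0; i < phases; i++) {
            // Each phase blocks for all the time that remains (the worst case).
            nowMs += deadline.remainingMs(nowMs);
        }
        return nowMs;
    }

    public static void main(String[] args) {
        System.out.println(worstCaseWithFreshTimeouts(1000L, 2));  // 2000
        System.out.println(worstCaseWithSharedDeadline(1000L, 2)); // 1000
    }
}
```

Which of the two patterns the real `close` path follows is exactly the open question in the comment above; the sketch takes no position on it.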
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1543454136

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:

@@ -1164,7 +1176,8 @@ public void maybeAutoCommitOffsetsAsync(long now) {
 }
 private boolean invokePendingAsyncCommits(Timer timer) {
-if (inFlightAsyncCommits.get() == 0) {
+if (pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0) {

Review Comment:
This makes sense to me: it fills a gap in the case of a sync commit with empty offsets, which skips the path of sending an actual request and, as I see it, therefore loses the guarantee of executing the callbacks.

This makes the logic consistent with what happens when the sync commit has non-empty offsets. In that case, it does execute the callbacks for previous async commits that were waiting for the coordinator: the sync commit would be blocked on the same findCoord request (there's always just one), and the moment the coordinator is found the async commit is marked as in flight [here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1036), so it will be considered for callbacks [here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1121).
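The effect of the changed condition can be sketched in isolation. The Java class below is hypothetical and is not the `ConsumerCoordinator` code; it reuses the two counter names from the diff and assumes, based on the comment above, that "pending" means an async commit still waiting for the coordinator and "in flight" means a request that has been sent but not yet answered.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of the early-return check before and after the change. */
final class AsyncCommitCountersSketch {
    // Assumption: async commits still waiting for the coordinator to be found.
    final AtomicInteger pendingAsyncCommits = new AtomicInteger();
    // Assumption: async commits whose request was sent and has no response yet.
    final AtomicInteger inFlightAsyncCommits = new AtomicInteger();

    /** Old check: only in-flight commits keep the caller waiting. */
    boolean nothingToWaitForOld() {
        return inFlightAsyncCommits.get() == 0;
    }

    /** New check: pending commits keep the caller waiting as well. */
    boolean nothingToWaitForNew() {
        return pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0;
    }

    public static void main(String[] args) {
        AsyncCommitCountersSketch counters = new AsyncCommitCountersSketch();
        counters.pendingAsyncCommits.incrementAndGet(); // one commit waits for the coordinator
        System.out.println(counters.nothingToWaitForOld()); // true: the old check returns early
        System.out.println(counters.nothingToWaitForNew()); // false: the new check keeps waiting
    }
}
```

With one commit pending and none in flight, the old check would skip the wait, which matches the gap described for a sync commit with empty offsets.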
[jira] [Updated] (KAFKA-16445) PATCH method for connector configuration
[ https://issues.apache.org/jira/browse/KAFKA-16445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Yurchenko updated KAFKA-16445: --- Summary: PATCH method for connector configuration (was: PATCH method for connecto configuration) > PATCH method for connector configuration > > > Key: KAFKA-16445 > URL: https://issues.apache.org/jira/browse/KAFKA-16445 > Project: Kafka > Issue Type: Improvement > Components: connect >Reporter: Ivan Yurchenko >Assignee: Ivan Yurchenko >Priority: Minor > > As [KIP-477: Add PATCH method for connector config in Connect REST > API|https://cwiki.apache.org/confluence/display/KAFKA/KIP-477%3A+Add+PATCH+method+for+connector+config+in+Connect+REST+API] > suggests, we should introduce the PATCH method for connector configuration. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16445) PATCH method for connecto configuration
Ivan Yurchenko created KAFKA-16445: -- Summary: PATCH method for connecto configuration Key: KAFKA-16445 URL: https://issues.apache.org/jira/browse/KAFKA-16445 Project: Kafka Issue Type: Improvement Components: connect Reporter: Ivan Yurchenko Assignee: Ivan Yurchenko As [KIP-477: Add PATCH method for connector config in Connect REST API|https://cwiki.apache.org/confluence/display/KAFKA/KIP-477%3A+Add+PATCH+method+for+connector+config+in+Connect+REST+API] suggests, we should introduce the PATCH method for connector configuration. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on PR #15618: URL: https://github.com/apache/kafka/pull/15618#issuecomment-2025853565

@chia7712: Since the follower only maintains offsetForMaxTimestamp at the batch level, the listMaxTimestamp API was never implemented correctly. So, technically, there was no regression for listMaxTimestamp. It seems there is no need to fix this in 3.6? We could just fix it in trunk.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on code in PR #15618: URL: https://github.com/apache/kafka/pull/15618#discussion_r1543430656

## core/src/main/scala/kafka/log/UnifiedLog.scala:

@@ -1320,10 +1320,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 // constant time access while being safe to use with concurrent collections unlike `toArray`.
 val segmentsCopy = logSegments.toBuffer
 val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = latestTimestampSegment.maxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
+val batch = latestTimestampSegment.log.batches().asScala.maxBy(_.maxTimestamp())

Review Comment:
@chia7712: `latestTimestampSegment.log.batches()` scans the whole log segment and could introduce unnecessary extra I/O. So, there could be performance degradation because of that.

> Hence we have to use condition baseOffset <= offset <= lastOffset to find batch.

I am not sure I understand this. Looking up for a batch with each baseOffset or lastOffset will locate the same batch using the offset index, right?
Re: [PR] [WIp] Fixes consume bench test.py maybe [kafka]
kirktrue closed pull request #15427: [WIp] Fixes consume bench test.py maybe URL: https://github.com/apache/kafka/pull/15427
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue closed pull request #15250: KAFKA-15974: Enforce that event processing respects user-provided timeout URL: https://github.com/apache/kafka/pull/15250
[jira] [Created] (KAFKA-16444) Run KIP-848 unit tests under code coverage
Kirk True created KAFKA-16444: - Summary: Run KIP-848 unit tests under code coverage Key: KAFKA-16444 URL: https://issues.apache.org/jira/browse/KAFKA-16444 Project: Kafka Issue Type: Task Components: clients, consumer, unit tests Affects Versions: 3.7.0 Reporter: Kirk True Assignee: Kirk True Fix For: 3.8.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16442) Update streams_standby_replica_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16442: -- Component/s: streams > Update streams_standby_replica_test.py to support KIP-848’s group protocol > config > - > > Key: KAFKA-16442 > URL: https://issues.apache.org/jira/browse/KAFKA-16442 > Project: Kafka > Issue Type: Test > Components: clients, consumer, streams, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 4.0.0 > > > This task is to update the test method(s) in > {{streams_standby_replica_test.py}} to support the {{group.protocol}} > configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrixes. > See KAFKA-16231 as an example of how the test parameters can be changed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16443) Update streams_static_membership_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16443: -- Component/s: streams > Update streams_static_membership_test.py to support KIP-848’s group protocol > config > --- > > Key: KAFKA-16443 > URL: https://issues.apache.org/jira/browse/KAFKA-16443 > Project: Kafka > Issue Type: Test > Components: clients, consumer, streams, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 4.0.0 > > > This task is to update the test method(s) in > {{streams_static_membership_test.py}} to support the {{group.protocol}} > configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrixes. > See KAFKA-16231 as an example of how the test parameters can be changed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16441) Update streams_broker_down_resilience_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16441: -- Component/s: streams > Update streams_broker_down_resilience_test.py to support KIP-848’s group > protocol config > > > Key: KAFKA-16441 > URL: https://issues.apache.org/jira/browse/KAFKA-16441 > Project: Kafka > Issue Type: Test > Components: clients, consumer, streams, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 4.0.0 > > > This task is to update the test method(s) in > {{streams_broker_down_resilience_test.py}} to support the {{group.protocol}} > configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrixes. > See KAFKA-16231 as an example of how the test parameters can be changed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16441) Update streams_broker_down_resilience_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16441: -- Description: This task is to update the test method(s) in {{streams_broker_down_resilience_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes. See KAFKA-16231 as an example of how the test parameters can be changed. was: This task is to update the test method(s) in {{security_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes. See KAFKA-16231 as an example of how the test parameters can be changed. > Update streams_broker_down_resilience_test.py to support KIP-848’s group > protocol config > > > Key: KAFKA-16441 > URL: https://issues.apache.org/jira/browse/KAFKA-16441 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 4.0.0 > > > This task is to update the test method(s) in > {{streams_broker_down_resilience_test.py}} to support the {{group.protocol}} > configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrixes. > See KAFKA-16231 as an example of how the test parameters can be changed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16443) Update streams_static_membership_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16443: -- Description: This task is to update the test method(s) in {{streams_static_membership_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes. See KAFKA-16231 as an example of how the test parameters can be changed. was: This task is to update the test method(s) in {{streams_standby_replica_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes. See KAFKA-16231 as an example of how the test parameters can be changed. > Update streams_static_membership_test.py to support KIP-848’s group protocol > config > --- > > Key: KAFKA-16443 > URL: https://issues.apache.org/jira/browse/KAFKA-16443 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 4.0.0 > > > This task is to update the test method(s) in > {{streams_static_membership_test.py}} to support the {{group.protocol}} > configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrixes. > See KAFKA-16231 as an example of how the test parameters can be changed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16443) Update streams_static_membership_test.py to support KIP-848’s group protocol config
Kirk True created KAFKA-16443: - Summary: Update streams_static_membership_test.py to support KIP-848’s group protocol config Key: KAFKA-16443 URL: https://issues.apache.org/jira/browse/KAFKA-16443 Project: Kafka Issue Type: Test Components: clients, consumer, system tests Affects Versions: 3.7.0 Reporter: Kirk True Assignee: Kirk True Fix For: 4.0.0 This task is to update the test method(s) in {{streams_standby_replica_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes. See KAFKA-16231 as an example of how the test parameters can be changed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16442) Update streams_standby_replica_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16442: -- Description: This task is to update the test method(s) in {{streams_standby_replica_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes. See KAFKA-16231 as an example of how the test parameters can be changed. was: This task is to update the test method(s) in {{security_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes. See KAFKA-16231 as an example of how the test parameters can be changed. > Update streams_standby_replica_test.py to support KIP-848’s group protocol > config > - > > Key: KAFKA-16442 > URL: https://issues.apache.org/jira/browse/KAFKA-16442 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 4.0.0 > > > This task is to update the test method(s) in > {{streams_standby_replica_test.py}} to support the {{group.protocol}} > configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrixes. > See KAFKA-16231 as an example of how the test parameters can be changed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16442) Update streams_standby_replica_test.py to support KIP-848’s group protocol config
Kirk True created KAFKA-16442: - Summary: Update streams_standby_replica_test.py to support KIP-848’s group protocol config Key: KAFKA-16442 URL: https://issues.apache.org/jira/browse/KAFKA-16442 Project: Kafka Issue Type: Test Components: clients, consumer, system tests Affects Versions: 3.7.0 Reporter: Kirk True Assignee: Kirk True Fix For: 4.0.0 This task is to update the test method(s) in {{security_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes. See KAFKA-16231 as an example of how the test parameters can be changed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16441) Update streams_broker_down_resilience_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16441: -- Fix Version/s: 4.0.0 (was: 3.8.0) > Update streams_broker_down_resilience_test.py to support KIP-848’s group > protocol config > > > Key: KAFKA-16441 > URL: https://issues.apache.org/jira/browse/KAFKA-16441 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 4.0.0 > > > This task is to update the test method(s) in {{security_test.py}} to support > the {{group.protocol}} configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrixes. > See KAFKA-16231 as an example of how the test parameters can be changed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16441) Update streams_broker_down_resilience_test.py to support KIP-848’s group protocol config
Kirk True created KAFKA-16441: - Summary: Update streams_broker_down_resilience_test.py to support KIP-848’s group protocol config Key: KAFKA-16441 URL: https://issues.apache.org/jira/browse/KAFKA-16441 Project: Kafka Issue Type: Test Components: clients, consumer, system tests Affects Versions: 3.7.0 Reporter: Kirk True Assignee: Kirk True Fix For: 3.8.0 This task is to update the test method(s) in {{security_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes. See KAFKA-16231 as an example of how the test parameters can be changed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16440) Update security_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16440: -- Description: This task is to update the test method(s) in {{security_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes. See KAFKA-16231 as an example of how the test parameters can be changed. was: This task is to update the test method(s) in {{replication_replica_failure_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes. See KAFKA-16231 as an example of how the test parameters can be changed. > Update security_test.py to support KIP-848’s group protocol config > -- > > Key: KAFKA-16440 > URL: https://issues.apache.org/jira/browse/KAFKA-16440 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > This task is to update the test method(s) in {{security_test.py}} to support > the {{group.protocol}} configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrixes. > See KAFKA-16231 as an example of how the test parameters can be changed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16440) Update security_test.py to support KIP-848’s group protocol config
Kirk True created KAFKA-16440: - Summary: Update security_test.py to support KIP-848’s group protocol config Key: KAFKA-16440 URL: https://issues.apache.org/jira/browse/KAFKA-16440 Project: Kafka Issue Type: Test Components: clients, consumer, system tests Affects Versions: 3.7.0 Reporter: Kirk True Assignee: Kirk True Fix For: 3.8.0 This task is to update the test method(s) in {{replication_replica_failure_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes. See KAFKA-16231 as an example of how the test parameters can be changed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16439) Update replication_replica_failure_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16439: -- Reviewer: (was: Lucas Brutschy) > Update replication_replica_failure_test.py to support KIP-848’s group > protocol config > - > > Key: KAFKA-16439 > URL: https://issues.apache.org/jira/browse/KAFKA-16439 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > This task is to update the test method(s) in > {{replication_replica_failure_test.py}} to support the {{group.protocol}} > configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrixes. > See KAFKA-16231 as an example of how the test parameters can be changed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16439) Update replication_replica_failure_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16439: -- Description: This task is to update the test method(s) in {{replication_replica_failure_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes. See KAFKA-16231 as an example of how the test parameters can be changed. was: This task is to update the test method(s) in {{replica_scale_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes. See KAFKA-16231 as an example of how the test parameters can be changed. > Update replication_replica_failure_test.py to support KIP-848’s group > protocol config > - > > Key: KAFKA-16439 > URL: https://issues.apache.org/jira/browse/KAFKA-16439 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > This task is to update the test method(s) in > {{replication_replica_failure_test.py}} to support the {{group.protocol}} > configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrixes. > See KAFKA-16231 as an example of how the test parameters can be changed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on code in PR #15618: URL: https://github.com/apache/kafka/pull/15618#discussion_r1543390343

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##
@@ -1320,10 +1320,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 // constant time access while being safe to use with concurrent collections unlike `toArray`.
 val segmentsCopy = logSegments.toBuffer
 val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = latestTimestampSegment.maxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
+val batch = latestTimestampSegment.log.batches().asScala.maxBy(_.maxTimestamp())

Review Comment: @junrao Thanks for the feedback. I use the max timestamp to find the "first" batch instead of using the offset index. Using the max timestamp seems simpler to me, since the offset stored by `maxTimestampAndOffsetSoFar` could be either the last offset or the offset of the max timestamp. Hence we would have to use the condition `baseOffset <= offset <= lastOffset` to find the batch. I'm OK with using the offset if using the max timestamp to find the first batch has any side effects.
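The two lookup strategies weighed in this comment can be sketched side by side. This is a language-neutral illustration in Python, not the Scala code from the PR. The `Batch` class and both helper names are hypothetical, and batches are modelled only by the three fields the comment mentions.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Batch:
    base_offset: int
    last_offset: int
    max_timestamp: int


def first_batch_with_max_timestamp(batches):
    """Pick the batch by timestamp; max() keeps the first of several equal maxima."""
    return max(batches, key=lambda b: b.max_timestamp)


def batch_containing(batches, offset):
    """Pick the batch by offset, using base_offset <= offset <= last_offset."""
    for batch in batches:
        if batch.base_offset <= offset <= batch.last_offset:
            return batch
    return None


batches = [Batch(0, 4, 100), Batch(5, 9, 300), Batch(10, 14, 300)]
```

With this data, a stored offset of 9 (the last offset of the second batch) and a search by max timestamp both land on the same batch, which is the equivalence the comment relies on. Both helpers still scan the batch list linearly in this sketch.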
[jira] [Created] (KAFKA-16439) Update replication_replica_failure_test.py to support KIP-848’s group protocol config
Kirk True created KAFKA-16439:

Summary: Update replication_replica_failure_test.py to support KIP-848’s group protocol config
Key: KAFKA-16439
URL: https://issues.apache.org/jira/browse/KAFKA-16439
Project: Kafka
Issue Type: Test
Components: clients, consumer, system tests
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Kirk True
Fix For: 3.8.0

This task is to update the test method(s) in {{replica_scale_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes. See KAFKA-16231 as an example of how the test parameters can be changed.
[jira] [Updated] (KAFKA-16438) Update consumer_test.py’s static tests to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16438: -- Issue Type: Test (was: Bug) > Update consumer_test.py’s static tests to support KIP-848’s group protocol > config > - > > Key: KAFKA-16438 > URL: https://issues.apache.org/jira/browse/KAFKA-16438 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > This task is to update the following test method(s) in {{consumer_test.py}} > to support the {{group.protocol}} configuration: > * {{test_fencing_static_consumer}} > * {{test_static_consumer_bounce}} > * {{test_static_consumer_persisted_after_rejoin}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16438) Update consumer_test.py’s static tests to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16438: -- Fix Version/s: 3.8.0 > Update consumer_test.py’s static tests to support KIP-848’s group protocol > config > - > > Key: KAFKA-16438 > URL: https://issues.apache.org/jira/browse/KAFKA-16438 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > This task is to update the following test method(s) in {{consumer_test.py}} > to support the {{group.protocol}} configuration: > * {{test_fencing_static_consumer}} > * {{test_static_consumer_bounce}} > * {{test_static_consumer_persisted_after_rejoin}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16438) Update consumer_test.py’s static tests to support KIP-848’s group protocol config
Kirk True created KAFKA-16438:

Summary: Update consumer_test.py’s static tests to support KIP-848’s group protocol config
Key: KAFKA-16438
URL: https://issues.apache.org/jira/browse/KAFKA-16438
Project: Kafka
Issue Type: Bug
Components: clients, consumer, system tests
Reporter: Kirk True
Assignee: Kirk True

This task is to update the following test method(s) in {{consumer_test.py}} to support the {{group.protocol}} configuration:
* {{test_fencing_static_consumer}}
* {{test_static_consumer_bounce}}
* {{test_static_consumer_persisted_after_rejoin}}
[jira] [Updated] (KAFKA-16275) Update kraft_upgrade_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16275: -- Fix Version/s: 3.8.0 (was: 4.0.0) > Update kraft_upgrade_test.py to support KIP-848’s group protocol config > --- > > Key: KAFKA-16275 > URL: https://issues.apache.org/jira/browse/KAFKA-16275 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > This task is to update the test method(s) in {{kraft_upgrade_test.py}} to > support the {{group.protocol}} configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrixes. > See KAFKA-16231 as an example of how the test parameters can be changed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16275) Update kraft_upgrade_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16275: - Assignee: Kirk True > Update kraft_upgrade_test.py to support KIP-848’s group protocol config > --- > > Key: KAFKA-16275 > URL: https://issues.apache.org/jira/browse/KAFKA-16275 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > This task is to update the test method(s) in {{kraft_upgrade_test.py}} to > support the {{group.protocol}} configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrixes. > See KAFKA-16231 as an example of how the test parameters can be changed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16271) Update consumer_rolling_upgrade_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True resolved KAFKA-16271. --- Reviewer: Lucas Brutschy Resolution: Fixed > Update consumer_rolling_upgrade_test.py to support KIP-848’s group protocol > config > -- > > Key: KAFKA-16271 > URL: https://issues.apache.org/jira/browse/KAFKA-16271 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Philip Nee >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > This task is to update the test method(s) in > {{consumer_rolling_upgrade_test.py}} to support the {{group.protocol}} > configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrixes. > See KAFKA-16231 as an example of how the test parameters can be changed. > The tricky wrinkle here is that the existing test relies on client-side > assignment strategies that aren't applicable with the new KIP-848-enabled > consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
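The "tricky wrinkle" mentioned in this ticket can be made concrete with a small sketch. It assumes, as a simplification, that a test builds its consumer properties as a dictionary. `build_consumer_config` is a hypothetical helper, not a function from the Kafka system tests. The property names `group.protocol` and `partition.assignment.strategy` are real consumer configs.

```python
def build_consumer_config(group_protocol, assignment_strategy=None):
    """Return consumer properties, keeping a client-side assignor only for 'classic'."""
    if group_protocol not in ("classic", "consumer"):
        raise ValueError(f"unknown group protocol: {group_protocol!r}")
    config = {"group.protocol": group_protocol}
    # partition.assignment.strategy selects a client-side assignor, which only the
    # classic protocol uses; under 'consumer' the assignment is computed server-side.
    if group_protocol == "classic" and assignment_strategy is not None:
        config["partition.assignment.strategy"] = assignment_strategy
    return config


classic = build_consumer_config(
    "classic", "org.apache.kafka.clients.consumer.RangeAssignor"
)
new = build_consumer_config(
    "consumer", "org.apache.kafka.clients.consumer.RangeAssignor"
)
```

A test whose assertions depend on which client-side assignor is active therefore has nothing to vary under the new protocol, so its parameters need rethinking rather than a one-line matrix change.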
Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]
kirktrue commented on PR #15594: URL: https://github.com/apache/kafka/pull/15594#issuecomment-2025765078 @lucasbru, can you take a look at this system test change? Thanks!
[jira] [Updated] (KAFKA-16275) Update kraft_upgrade_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16275: -- Fix Version/s: 4.0.0 (was: 3.8.0) > Update kraft_upgrade_test.py to support KIP-848’s group protocol config > --- > > Key: KAFKA-16275 > URL: https://issues.apache.org/jira/browse/KAFKA-16275 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 4.0.0 > > > This task is to update the test method(s) in {{kraft_upgrade_test.py}} to > support the {{group.protocol}} configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrixes. > See KAFKA-16231 as an example of how the test parameters can be changed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14585: Refactoring for moving the storage tool [kafka]
fvaleri commented on PR #15273: URL: https://github.com/apache/kafka/pull/15273#issuecomment-2025743216 Closing this one in favor of the work done for KAFKA-15853.
Re: [PR] KAFKA-14585: Refactoring for moving the storage tool [kafka]
fvaleri closed pull request #15273: KAFKA-14585: Refactoring for moving the storage tool URL: https://github.com/apache/kafka/pull/15273
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on code in PR #15618: URL: https://github.com/apache/kafka/pull/15618#discussion_r1543349555

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##
@@ -1320,10 +1320,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 // constant time access while being safe to use with concurrent collections unlike `toArray`.
 val segmentsCopy = logSegments.toBuffer
 val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = latestTimestampSegment.maxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
+val batch = latestTimestampSegment.log.batches().asScala.maxBy(_.maxTimestamp())

Review Comment: Hmm, iterating all batches can be expensive. We could use the offset index to find the batch containing the offset in maxTimestampAndOffsetSoFar.
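The cost argument here can be illustrated with a toy sparse index. The sketch assumes an index of `(offset, position)` entries sorted by offset, where a position is simply a list index into the batches. `SparseOffsetIndex` and `find_batch` are hypothetical names, and real log segments index byte positions in a file rather than list slots.

```python
from bisect import bisect_right


class SparseOffsetIndex:
    """Sorted (offset, position) entries covering only some of the batches."""

    def __init__(self, entries):
        self._offsets = [offset for offset, _ in entries]
        self._positions = [position for _, position in entries]

    def floor_position(self, target_offset):
        """Position of the last entry whose offset is <= target_offset, else 0."""
        i = bisect_right(self._offsets, target_offset) - 1
        return self._positions[i] if i >= 0 else 0


def find_batch(batches, index, target_offset):
    """Binary-search the index, then scan forward only from the indexed position."""
    start = index.floor_position(target_offset)
    for base_offset, last_offset in batches[start:]:
        if base_offset <= target_offset <= last_offset:
            return (base_offset, last_offset)
        if base_offset > target_offset:
            break  # batches are ordered, so the target cannot appear later
    return None


batches = [(0, 4), (5, 9), (10, 14), (15, 19)]
index = SparseOffsetIndex([(0, 0), (10, 2)])
```

The binary search skips everything before the nearest indexed entry, so the forward scan touches only the few batches between two index entries instead of the whole segment.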
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
mimaison commented on code in PR #15569: URL: https://github.com/apache/kafka/pull/15569#discussion_r1543199493 ## server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java: ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.server.config; + +import org.apache.kafka.common.config.TopicConfig; +import static org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX; + +public class KafkaConfig { + +/** * Log Configuration ***/ +public final static String NUM_PARTITIONS_CONFIG = "num.partitions"; +public final static String LOG_DIRS_CONFIG = LOG_PREFIX + "dirs"; +public final static String LOG_DIR_CONFIG = LOG_PREFIX + "dir"; +public final static String LOG_SEGMENT_BYTES_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_BYTES_CONFIG); + +public final static String LOG_ROLL_TIME_MILLIS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_MS_CONFIG); +public final static String LOG_ROLL_TIME_HOURS_CONFIG = LOG_PREFIX + "roll.hours"; + +public final static String LOG_ROLL_TIME_JITTER_MILLIS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_JITTER_MS_CONFIG); +public final static String LOG_ROLL_TIME_JITTER_HOURS_CONFIG = LOG_PREFIX + "roll.jitter.hours"; + +public final static String LOG_RETENTION_TIME_MILLIS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.RETENTION_MS_CONFIG); +public final static String LOG_RETENTION_TIME_MINUTES_CONFIG = LOG_PREFIX + "retention.minutes"; +public final static String LOG_RETENTION_TIME_HOURS_CONFIG = LOG_PREFIX + "retention.hours"; + +public final static String LOG_RETENTION_BYTES_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.RETENTION_BYTES_CONFIG); +public final static String LOG_CLEANUP_INTERVAL_MS_CONFIG = LOG_PREFIX + "retention.check.interval.ms"; +public final static String LOG_CLEANUP_POLICY_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.CLEANUP_POLICY_CONFIG); +public final static String LOG_INDEX_SIZE_MAX_BYTES_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG); +public final static String LOG_INDEX_INTERVAL_BYTES_CONFIG = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG); +public final static String LOG_FLUSH_INTERVAL_MESSAGES_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG); +public final static String LOG_DELETE_DELAY_MS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG); +public final static String LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG = LOG_PREFIX + "flush.scheduler.interval.ms"; +public final static String LOG_FLUSH_INTERVAL_MS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.FLUSH_MS_CONFIG); +public final static String LOG_FLUSH_OFFSET_CHECKPOINT_INTERVAL_MS_CONFIG = LOG_PREFIX + "flush.offset.checkpoint.interval.ms"; +public final static String LOG_FLUSH_START_OFFSET_CHECKPOINT_INTERVAL_MS_CONFIG = LOG_PREFIX + "flush.start.offset.checkpoint.interval.ms"; +public final static String LOG_PRE_ALLOCATE_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.PREALLOCATE_CONFIG); + +/* See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for details */ +/** + * @deprecated since "3.0" + */ +@Deprecated +public final static String LOG_MESSAGE_FORMAT_VERSION_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG); + +public final static String LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG); + +/* See `TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG` for details */ +/** + * @deprecated since "3.6" + */ +@Deprecated +public final static String LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG); + +public final static String LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG =
Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]
clolov commented on PR #14716: URL: https://github.com/apache/kafka/pull/14716#issuecomment-2025540351 Okay, enjoy your time off!
[PR] 1024 impl [kafka]
wcarlson5 opened a new pull request, #15619: URL: https://github.com/apache/kafka/pull/15619

Implementation of KIP-1024. We change `addGlobalStateStore` so that users can either configure how restoration takes place or use the default processor rather than a custom one.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 opened a new pull request, #15618: URL: https://github.com/apache/kafka/pull/15618

- add default implementation to Batch to return offset of max timestamp
- copy ListOffsetsIntegrationTest from trunk branch

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
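The PR's actual default implementation is not shown in this message. As a hedged sketch of what "return offset of max timestamp" could mean for a single batch, the Python below scans `(offset, timestamp)` pairs and keeps the earliest offset that carries the largest timestamp. The function name, the record representation, and the tie-breaking rule are all assumptions made for illustration.

```python
def offset_of_max_timestamp(records):
    """Return the earliest offset with the largest timestamp, or None if empty."""
    best = None  # (offset, timestamp) of the best record seen so far
    for offset, timestamp in records:
        # Strict '>' keeps the first record when several share the max timestamp.
        if best is None or timestamp > best[1]:
            best = (offset, timestamp)
    return None if best is None else best[0]


records = [(10, 100), (11, 300), (12, 300), (13, 200)]
```

Here offsets 11 and 12 share the largest timestamp and the sketch reports 11, which differs from reporting the batch's last offset (13).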
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
philipnee commented on PR #15525: URL: https://github.com/apache/kafka/pull/15525#issuecomment-2025522451 Hi @lucasbru - Let me address Lianet's comment in this PR and open a separate PR for the behavior inconsistency, as it does require some changes to the unit test.
Re: [PR] KAFKA-15899 [1/2] Move kafka.security package from core to server module [kafka]
nizhikov commented on code in PR #15572: URL: https://github.com/apache/kafka/pull/15572#discussion_r1543177295 ## server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java: ## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.security.authorizer; + +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourceType; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.utils.SecurityUtils; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.Json; +import org.apache.kafka.server.util.json.DecodeJson; +import org.apache.kafka.server.util.json.JsonObject; +import org.apache.kafka.server.util.json.JsonValue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.kafka.common.acl.AclOperation.ALTER; +import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS; +import static org.apache.kafka.common.acl.AclOperation.CLUSTER_ACTION; +import static org.apache.kafka.common.acl.AclOperation.CREATE; +import static org.apache.kafka.common.acl.AclOperation.CREATE_TOKENS; +import static org.apache.kafka.common.acl.AclOperation.DELETE; +import static org.apache.kafka.common.acl.AclOperation.DESCRIBE; +import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS; +import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_TOKENS; +import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE; +import static org.apache.kafka.common.acl.AclOperation.READ; +import static org.apache.kafka.common.acl.AclOperation.WRITE; + +public class AclEntry extends AccessControlEntry { +private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger(); +private static 
final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString(); + +public static final KafkaPrincipal WILDCARD_PRINCIPAL = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*"); +public static final String WILDCARD_PRINCIPAL_STRING = WILDCARD_PRINCIPAL.toString(); +public static final String WILDCARD_HOST = "*"; +public static final String WILDCARD_RESOURCE = ResourcePattern.WILDCARD_RESOURCE; +public static final String RESOURCE_SEPARATOR = ":"; +public static final Set RESOURCE_TYPES = Arrays.stream(ResourceType.values()) +.filter(t -> !(t == ResourceType.UNKNOWN || t == ResourceType.ANY)) +.collect(Collectors.toSet()); +public static final Set ACL_OPERATIONS = Arrays.stream(AclOperation.values()) +.filter(t -> !(t == AclOperation.UNKNOWN || t == AclOperation.ANY)) +.collect(Collectors.toSet()); + +private static final String PRINCIPAL_KEY = "principal"; +private static final String PERMISSION_TYPE_KEY = "permissionType"; +private static final String OPERATION_KEY = "operation"; +private static final String HOSTS_KEY = "host"; +public static final String VERSION_KEY = "version"; +public static final int CURRENT_VERSION = 1; +private static final String ACLS_KEY = "acls"; + +public final AccessControlEntry ace; +public final KafkaPrincipal kafkaPrincipal; + +public AclEntry(AccessControlEntry ace) { +super(ace.principal(), ace.host(), ace.operation(), ace.permissionType()); +this.ace = ace; + +kafkaPrincipal = ace.principal() == null +? null +: SecurityUtils.parseKafkaPrincipal(ace.principal()); +} + +/** + * Parse JSON representation of ACLs + * @param bytes of acls json string + * + * +{ +"version": 1, +"acls": [ +{ +"host":"host1", +"permissionType":
Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]
wcarlson5 merged PR #15414: URL: https://github.com/apache/kafka/pull/15414
Re: [PR] MINOR: AbstractConfig cleanup [kafka]
mimaison commented on code in PR #15597: URL: https://github.com/apache/kafka/pull/15597#discussion_r1543174303

## core/src/main/scala/kafka/controller/PartitionStateMachine.scala: ##
@@ -486,7 +486,7 @@ class ZkPartitionStateMachine(config: KafkaConfig,
 } else {
 val (logConfigs, failed) = zkClient.getLogConfigs(
 partitionsWithNoLiveInSyncReplicas.iterator.map { case (partition, _) => partition.topic }.toSet,
-config.originals()
+config.extractLogConfigMap

Review Comment: It looks like this breaks `testOfflinePartitionToOnlinePartitionTransition()`
```
Wanted but not invoked:
controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(
    List(5),
    t-0,
    (Leader:5,ISR:5,LeaderRecoveryState:RECOVERED,LeaderEpoch:1,ZkVersion:2,ControllerEpoch:50),
    ReplicaAssignment(replicas=5, addingReplicas=, removingReplicas=),
    false
);
-> at kafka.controller.AbstractControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(ControllerChannelManager.scala:421)

However, there was exactly 1 interaction with this mock:
controllerBrokerRequestBatch.newBatch();
-> at kafka.controller.ZkPartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:158)

at kafka.controller.AbstractControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(ControllerChannelManager.scala:421)
at kafka.controller.PartitionStateMachineTest.testOfflinePartitionToOnlinePartitionTransition(PartitionStateMachineTest.scala:275)
```
Re: [PR] MINOR: Remove the client side assignor from the ConsumerGroupHeartbeat API [kafka]
jolshan commented on code in PR #14469: URL: https://github.com/apache/kafka/pull/14469#discussion_r1543175309

## clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json: ##
@@ -43,21 +43,6 @@
 "about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise" },
 { "name": "ServerAssignor", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
 "about": "null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise." },
-{ "name": "ClientAssignors", "type": "[]Assignor", "versions": "0+", "nullableVersions": "0+", "default": "null",

Review Comment: No worries! I think we can do a better job sharing this widely with the community. I will think about how that can be done.
Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]
wcarlson5 commented on PR #15414: URL: https://github.com/apache/kafka/pull/15414#issuecomment-2025500658 The test configuration issues in the build are in `core:test` and are not related to this PR.
Re: [PR] KAFKA-15899 [1/2] Move kafka.security package from core to server module [kafka]
chia7712 commented on code in PR #15572: URL: https://github.com/apache/kafka/pull/15572#discussion_r1543172408 ## server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java: ## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.security.authorizer; + +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourceType; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.utils.SecurityUtils; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.Json; +import org.apache.kafka.server.util.json.DecodeJson; +import org.apache.kafka.server.util.json.JsonObject; +import org.apache.kafka.server.util.json.JsonValue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.kafka.common.acl.AclOperation.ALTER; +import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS; +import static org.apache.kafka.common.acl.AclOperation.CLUSTER_ACTION; +import static org.apache.kafka.common.acl.AclOperation.CREATE; +import static org.apache.kafka.common.acl.AclOperation.CREATE_TOKENS; +import static org.apache.kafka.common.acl.AclOperation.DELETE; +import static org.apache.kafka.common.acl.AclOperation.DESCRIBE; +import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS; +import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_TOKENS; +import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE; +import static org.apache.kafka.common.acl.AclOperation.READ; +import static org.apache.kafka.common.acl.AclOperation.WRITE; + +public class AclEntry extends AccessControlEntry { +private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger(); +private static 
final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString(); + +public static final KafkaPrincipal WILDCARD_PRINCIPAL = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*"); +public static final String WILDCARD_PRINCIPAL_STRING = WILDCARD_PRINCIPAL.toString(); +public static final String WILDCARD_HOST = "*"; +public static final String WILDCARD_RESOURCE = ResourcePattern.WILDCARD_RESOURCE; +public static final String RESOURCE_SEPARATOR = ":"; +public static final Set<ResourceType> RESOURCE_TYPES = Arrays.stream(ResourceType.values()) +.filter(t -> !(t == ResourceType.UNKNOWN || t == ResourceType.ANY)) +.collect(Collectors.toSet()); +public static final Set<AclOperation> ACL_OPERATIONS = Arrays.stream(AclOperation.values()) +.filter(t -> !(t == AclOperation.UNKNOWN || t == AclOperation.ANY)) +.collect(Collectors.toSet()); + +private static final String PRINCIPAL_KEY = "principal"; +private static final String PERMISSION_TYPE_KEY = "permissionType"; +private static final String OPERATION_KEY = "operation"; +private static final String HOSTS_KEY = "host"; +public static final String VERSION_KEY = "version"; +public static final int CURRENT_VERSION = 1; +private static final String ACLS_KEY = "acls"; + +public final AccessControlEntry ace; +public final KafkaPrincipal kafkaPrincipal; + +public AclEntry(AccessControlEntry ace) { +super(ace.principal(), ace.host(), ace.operation(), ace.permissionType()); +this.ace = ace; + +kafkaPrincipal = ace.principal() == null +? null +: SecurityUtils.parseKafkaPrincipal(ace.principal()); +} + +/** + * Parse JSON representation of ACLs + * @param bytes of acls json string + * + * +{ +"version": 1, +"acls": [ +{ +"host":"host1", +"permissionType":
Re: [PR] MINOR: Remove the client side assignor from the ConsumerGroupHeartbeat API [kafka]
aiven-anton commented on code in PR #14469: URL: https://github.com/apache/kafka/pull/14469#discussion_r1543168871 ## clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json: ## @@ -43,21 +43,6 @@ "about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise" }, { "name": "ServerAssignor", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise." }, -{ "name": "ClientAssignors", "type": "[]Assignor", "versions": "0+", "nullableVersions": "0+", "default": "null", Review Comment: Oh my, now I feel dumb. Thanks for pointing this out!
Re: [PR] MINOR: Remove the client side assignor from the ConsumerGroupHeartbeat API [kafka]
jolshan commented on code in PR #14469: URL: https://github.com/apache/kafka/pull/14469#discussion_r1543165416 ## clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json: ## @@ -43,21 +43,6 @@ "about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise" }, { "name": "ServerAssignor", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise." }, -{ "name": "ClientAssignors", "type": "[]Assignor", "versions": "0+", "nullableVersions": "0+", "default": "null", Review Comment: It is an annotation. See in the spec ``` "latestVersionUnstable": true, ```