Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621: URL: https://github.com/apache/kafka/pull/15621#issuecomment-2029213022 rebase code and apply Luke's patch from https://github.com/chia7712/kafka/pull/3/files -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-15823) NodeToControllerChannelManager: authentication error prevents controller update
[ https://issues.apache.org/jira/browse/KAFKA-15823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-15823. --- Resolution: Fixed > NodeToControllerChannelManager: authentication error prevents controller > update > --- > > Key: KAFKA-15823 > URL: https://issues.apache.org/jira/browse/KAFKA-15823 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.6.0, 3.5.1 >Reporter: Gaurav Narula >Priority: Major > Fix For: 3.8.0 > > > NodeToControllerChannelManager caches the activeController address in an > AtomicReference which is updated when: > # activeController [has not been > set|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala#L422] > # networkClient [disconnnects from the > controller|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala#L395C7-L395C7] > # A node replies with > `[Errors.NOT_CONTROLLER|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala#L408]`, > and > # When a controller changes from [Zk mode to Kraft > mode|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala#L325] > > When running multiple Kafka clusters in a dynamic environment, there is a > chance that a controller's IP may get reassigned to another cluster's broker > when the controller is bounced. In this scenario, the requests from Node to > the Controller may fail with an AuthenticationException and are then retried > indefinitely. This causes the node to get stuck as the new controller's > information is never set. > > A potential fix would be disconnect the network client and invoke > `updateControllerAddress(null)` as we do in the `Errors.NOT_CONTROLLER` case. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15823: disconnect from controller on AuthenticationException [kafka]
showuon merged PR #14760: URL: https://github.com/apache/kafka/pull/14760 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15823: disconnect from controller on AuthenticationException [kafka]
showuon commented on PR #14760: URL: https://github.com/apache/kafka/pull/14760#issuecomment-2029112501 Failed tests are unrelated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: enhance kafka-reassign-partitions command output [kafka]
showuon commented on PR #15610: URL: https://github.com/apache/kafka/pull/15610#issuecomment-2029071458 @AndrewJSchofield , Do you have any other comments? I'm going to merge this PR this week if no other comments. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16323) Failing test: fix testRemoteFetchExpiresPerSecMetric
[ https://issues.apache.org/jira/browse/KAFKA-16323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-16323: -- Fix Version/s: (was: 3.7.1) > Failing test: fix testRemoteFetchExpiresPerSecMetric > - > > Key: KAFKA-16323 > URL: https://issues.apache.org/jira/browse/KAFKA-16323 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: Johnny Hsu >Assignee: Johnny Hsu >Priority: Major > Labels: test-failure > Fix For: 3.8.0 > > > Refer to > [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2685/testReport/junit/kafka.server/ReplicaManagerTest/Build___JDK_21_and_Scala_2_13___testRemoteFetchExpiresPerSecMetric__/] > This test is failing, and this ticket aims to address this -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]
showuon merged PR #15463: URL: https://github.com/apache/kafka/pull/15463 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-16323) Failing test: fix testRemoteFetchExpiresPerSecMetric
[ https://issues.apache.org/jira/browse/KAFKA-16323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-16323. --- Fix Version/s: 3.8.0 3.7.1 Resolution: Fixed > Failing test: fix testRemoteFetchExpiresPerSecMetric > - > > Key: KAFKA-16323 > URL: https://issues.apache.org/jira/browse/KAFKA-16323 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: Johnny Hsu >Assignee: Johnny Hsu >Priority: Major > Labels: test-failure > Fix For: 3.8.0, 3.7.1 > > > Refer to > [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2685/testReport/junit/kafka.server/ReplicaManagerTest/Build___JDK_21_and_Scala_2_13___testRemoteFetchExpiresPerSecMetric__/] > This test is failing, and this ticket aims to address this -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]
showuon commented on PR #15463: URL: https://github.com/apache/kafka/pull/15463#issuecomment-2029068424 Failed tests are unrelated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16272) Update connect_distributed_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832665#comment-17832665 ] Kirk True commented on KAFKA-16272: --- [~sagarrao]—would you mind marking this as either in progress or patch available? Thanks! > Update connect_distributed_test.py to support KIP-848’s group protocol config > - > > Key: KAFKA-16272 > URL: https://issues.apache.org/jira/browse/KAFKA-16272 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Sagar Rao >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > This task is to update the test method(s) in {{connect_distributed_test.py}} > to support the {{group.protocol}} configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrixes. > See KAFKA-16231 as an example of how the test parameters can be changed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16427) KafkaConsumer#position() does not respect timeout when group protocol is CONSUMER
[ https://issues.apache.org/jira/browse/KAFKA-16427?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16427: -- Priority: Critical (was: Minor) > KafkaConsumer#position() does not respect timeout when group protocol is > CONSUMER > - > > Key: KAFKA-16427 > URL: https://issues.apache.org/jira/browse/KAFKA-16427 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Alyssa Huang >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, timeout > Fix For: 3.8.0 > > > When > `long position(TopicPartition partition, final Duration timeout);` > is called on an unknown topic partition (and auto creation is disabled), the > method fails to adhere to the timeout supplied. > e.g. the following warning is logged continuously as metadata fetches are > retried > [2024-03-26 11:03:48,589] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] Error while fetching metadata with correlation id 200 : > \{nonexistingTopic=UNKNOWN_TOPIC_OR_PARTITION} > (org.apache.kafka.clients.NetworkClient:1313) -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16362 Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]
raminqaf commented on code in PR #15601: URL: https://github.com/apache/kafka/pull/15601#discussion_r1545788943 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamRightJoin.java: ## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import java.util.Optional; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.ValueJoinerWithKey; +import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.apache.kafka.streams.state.internals.LeftOrRightValue; +import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class KStreamKStreamRightJoin extends KStreamKStreamJoin { +private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamRightJoin.class); + + +KStreamKStreamRightJoin(final String otherWindowName, +final JoinWindowsInternal windows, +final ValueJoinerWithKey joiner, +final boolean outer, +final Optional outerJoinWindowName, +final TimeTrackerSupplier sharedTimeTrackerSupplier) { +super(otherWindowName, windows, windows.afterMs, windows.beforeMs, joiner, outerJoinWindowName, +sharedTimeTrackerSupplier, outer); +} + +@Override +public Processor get() { +return new KStreamKStreamRightJoinProcessor(); +} + +private class KStreamKStreamRightJoinProcessor extends KStreamKStreamJoinProcessor { +@Override +public void process(final Record rightRecord) { +final long inputRecordTimestamp = rightRecord.timestamp(); +final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs); +final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs); +sharedTimeTracker.advanceStreamTime(inputRecordTimestamp); + +if (outer && rightRecord.key() == null && rightRecord.value() != null) { + context().forward(rightRecord.withValue(joiner.apply(rightRecord.key(), rightRecord.value(), null))); +return; +} else if (StreamStreamJoinUtil.skipRecord(rightRecord, LOG, droppedRecordsSensor, context())) { +return; +} + +// Emit all non-joined records which window has closed +if (inputRecordTimestamp == sharedTimeTracker.streamTime) { +rightOuterJoinStore.ifPresent(store -> emitNonJoinedOuterRecords(store, rightRecord)); +} + +boolean needOuterJoin = outer; +// TODO: Where and when are the records put in this window store? +try (final WindowStoreIterator iter = otherWindowStore.fetch(rightRecord.key(), timeFrom, timeTo)) { +while (iter.hasNext()) { +needOuterJoin = false; +final KeyValue leftRecord = iter.next(); +final long leftRecordTimestamp = leftRecord.key; + +rightOuterJoinStore.ifPresent(store -> { +// use putIfAbsent to first read and see if there's any values for the key, +// if yes delete the key, otherwise do not issue a put; +// we may delete some values with the same key early but since we are going +// range over all values of the same key even after failure, since the other window-store +// is only cleaned up by stream time, so this is okay for at-least-once. +store.putIfAbsent( + TimestampedKeyAndJoinSide.makeLeftSide(rightRecord.key(), leftRecordTimestamp), null); +}); + +final VOut joinResult = joiner.apply(rightRecord.key(),
Re: [PR] KAFKA-16362 Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]
raminqaf commented on code in PR #15601: URL: https://github.com/apache/kafka/pull/15601#discussion_r1545727392 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -16,276 +16,98 @@ */ package org.apache.kafka.streams.kstream.internals; +import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; + +import java.util.Optional; import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.ValueJoinerWithKey; import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker; import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier; import org.apache.kafka.streams.processor.api.ContextualProcessor; -import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorSupplier; -import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.WindowStore; -import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.streams.state.internals.LeftOrRightValue; import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Optional; - -import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX; -import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; - -class KStreamKStreamJoin implements ProcessorSupplier { -private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class); +abstract class KStreamKStreamJoin implements ProcessorSupplier { +protected final ValueJoinerWithKey joiner; +protected final long joinGraceMs; +protected final boolean outer; +protected final long joinBeforeMs; +protected final long joinAfterMs; private final String otherWindowName; -private final long joinBeforeMs; -private final long joinAfterMs; -private final long joinGraceMs; private final boolean enableSpuriousResultFix; private final long windowsBeforeMs; private final long windowsAfterMs; - -private final boolean outer; -private final boolean isLeftSide; private final Optional outerJoinWindowName; -private final ValueJoinerWithKey joiner; private final TimeTrackerSupplier sharedTimeTrackerSupplier; -KStreamKStreamJoin(final boolean isLeftSide, - final String otherWindowName, - final JoinWindowsInternal windows, - final ValueJoinerWithKey joiner, - final boolean outer, - final Optional outerJoinWindowName, - final TimeTrackerSupplier sharedTimeTrackerSupplier) { -this.isLeftSide = isLeftSide; +KStreamKStreamJoin(final String otherWindowName, +final JoinWindowsInternal windows, +final long joinBeforeMs, +final long joinAfterMs, +final ValueJoinerWithKey joiner, +final Optional outerJoinWindowName, +final TimeTrackerSupplier sharedTimeTrackerSupplier, +final boolean outer) { this.otherWindowName = otherWindowName; -if (isLeftSide) { -this.joinBeforeMs = windows.beforeMs; -this.joinAfterMs = windows.afterMs; -} else { -this.joinBeforeMs = windows.afterMs; -this.joinAfterMs = windows.beforeMs; -} this.windowsAfterMs = windows.afterMs; this.windowsBeforeMs = windows.beforeMs; -this.joinGraceMs = windows.gracePeriodMs(); +this.joinBeforeMs = joinBeforeMs; +this.joinAfterMs = joinAfterMs; this.enableSpuriousResultFix = windows.spuriousResultFixEnabled(); this.joiner = joiner; -this.outer = outer; +this.joinGraceMs = windows.gracePeriodMs(); this.outerJoinWindowName = outerJoinWindowName; this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier; +this.outer = outer; } -@Override -public Processor get() { -return new KStreamKStreamJoinProcessor(); -} - -private class KStreamKStreamJoinProcessor extends ContextualProcessor { -private WindowStore otherWindowStore; -
Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]
chia7712 commented on PR #15489: URL: https://github.com/apache/kafka/pull/15489#issuecomment-2028866594 > Sure. I've used waitForCondition to wait for __consumer_offsets to be created when creating consumers to poll I meant `assertEquals(expectedTestTopicOffsets().stream().filter(r -> r.partition <= 1).collect(Collectors.toList()), offsets);` should be retried since the metadata is not updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [KAFKA-16435] Add test for KAFKA-16428 [kafka]
mcmmining commented on code in PR #15635: URL: https://github.com/apache/kafka/pull/15635#discussion_r1545770270 ## core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala: ## @@ -112,10 +113,25 @@ class ZkConfigMigrationClientTest extends ZkMigrationTestHarness { assertEquals(newProps.get(key), value) } } +assertPathExistenceAndData("/config/changes/config_change_00", """{"version":2,"entity_path":"brokers/1"}""") migrationState = migrationClient.configClient().deleteConfigs( new ConfigResource(ConfigResource.Type.BROKER, "1"), migrationState) assertEquals(0, zkClient.getEntityConfigs(ConfigType.BROKER, "1").size()) +assertPathExistenceAndData("/config/changes/config_change_01", """{"version":2,"entity_path":"brokers/1"}""") + +// make sure there is no more config change notification in znode + assertFalse(zkClient.pathExists("/config/changes/config_change_02")) + } + + private def assertPathExistenceAndData(expectedPath: String, data: String): Unit = { +assertTrue(zkClient.pathExists(expectedPath)) +assertEquals(Some(data), dataAsString(expectedPath)) Review Comment: > how about `assertEquals(Some(data), zkClient.getDataAndStat(expectedPath)._1.map(new String(_, UTF_8)))`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [KAFKA-16435] Add test for KAFKA-16428 [kafka]
chia7712 commented on code in PR #15635: URL: https://github.com/apache/kafka/pull/15635#discussion_r1545767933 ## core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala: ## @@ -112,10 +113,25 @@ class ZkConfigMigrationClientTest extends ZkMigrationTestHarness { assertEquals(newProps.get(key), value) } } +assertPathExistenceAndData("/config/changes/config_change_00", """{"version":2,"entity_path":"brokers/1"}""") migrationState = migrationClient.configClient().deleteConfigs( new ConfigResource(ConfigResource.Type.BROKER, "1"), migrationState) assertEquals(0, zkClient.getEntityConfigs(ConfigType.BROKER, "1").size()) +assertPathExistenceAndData("/config/changes/config_change_01", """{"version":2,"entity_path":"brokers/1"}""") + +// make sure there is no more config change notification in znode + assertFalse(zkClient.pathExists("/config/changes/config_change_02")) + } + + private def assertPathExistenceAndData(expectedPath: String, data: String): Unit = { +assertTrue(zkClient.pathExists(expectedPath)) +assertEquals(Some(data), dataAsString(expectedPath)) Review Comment: how about `assertEquals(Some(data), zkClient.getDataAndStat(expectedPath)._1.map(new String(_, UTF_8)))`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16225) Flaky test suite LogDirFailureTest
[ https://issues.apache.org/jira/browse/KAFKA-16225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832619#comment-17832619 ] Chia-Ping Tsai commented on KAFKA-16225: It seems to me the root cause is `LogDirFailureHandler` does not clean `directoryIds` in holding `replicaStateChangeLock`, and hence metadata event thread can see the intermediate state of failure handle and then assume the deleted folder is online ([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/cluster/Partition.scala#L877]) {code:scala} private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = { targetLogDirectoryId match { case Some(directoryId) => // [CHIA] logManager.onlineLogDirId(directoryId) return true // there are two results: // 1) KafkaStorageException is thrown by `LogManager.getOrCreateLog` // 2) log is hosted by another directory (id) rather than targetLogDirectoryId if (logManager.onlineLogDirId(directoryId) || !logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) { createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId) } else { warn(s"Skipping creation of log because there are potentially offline log " + s"directories and log may already exist there. directoryId=$directoryId, " + s"topicId=$topicId, targetLogDirectoryId=$targetLogDirectoryId") } case None => createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId) } } {code} Hence, there are two options to stabilize the `testIOExceptionDuringLogRoll` 1. call `logManager.handleLogDirFailure(dir)` in holding `replicaStateChangeLock` ([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2473]) to avoid race condition. 2. change the assert to allow both empty folder and the folder which having different directory (id). [https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L1715] BTW, the option 2 means we allow to using other directory to replace `targetLogDirectoryId` when creating log. That violates the comment: "@param targetLogDirectoryId The directory Id that should host the the partition's topic." ([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L1015]) > Flaky test suite LogDirFailureTest > -- > > Key: KAFKA-16225 > URL: https://issues.apache.org/jira/browse/KAFKA-16225 > Project: Kafka > Issue Type: Bug > Components: core, unit tests >Reporter: Greg Harris >Assignee: Omnia Ibrahim >Priority: Major > Labels: flaky-test > > I see this failure on trunk and in PR builds for multiple methods in this > test suite: > {noformat} > org.opentest4j.AssertionFailedError: expected: but was: > at > org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > > at > org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > > at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31) > at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:179) > at kafka.utils.TestUtils$.causeLogDirFailure(TestUtils.scala:1715) > at > kafka.server.LogDirFailureTest.testProduceAfterLogDirFailureOnLeader(LogDirFailureTest.scala:186) > > at > kafka.server.LogDirFailureTest.testIOExceptionDuringLogRoll(LogDirFailureTest.scala:70){noformat} > It appears this assertion is failing > [https://github.com/apache/kafka/blob/f54975c33135140351c50370282e86c49c81bbdd/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L1715] > The other error which is appearing is this: > {noformat} > org.opentest4j.AssertionFailedError: Unexpected exception type thrown, > expected: but was: > > at > org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > > at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:67) > at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:35) > at org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3111) > at > kafka.server.LogDirFailureTest.testProduceErrorsFromLogDirFailureOnLeader(LogDirFailureTest.scala:164) > > at > kafka.server.LogDirFailureTest.testProduceErrorFromFailureOnLogRoll(LogDirFailureTest.scala:64){noformat} > Failures appear
[PR] KAFKA-16161: Avoid creating remote log metadata snapshot file in partition dir if unused [kafka]
kamalcph opened a new pull request, #15636: URL: https://github.com/apache/kafka/pull/15636 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]
vamossagar12 commented on PR #15594: URL: https://github.com/apache/kafka/pull/15594#issuecomment-2028815010 Thanks @kirktrue . I ran a single test by passing a parameter ``` TC_PATHS="tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_restart_failed_connector" _DUCKTAPE_OPTIONS='--parameters '\''{"exactly_once_source":"False","connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":"True","group_protocol":"consumer"}'\' bash tests/docker/run_tests.sh | tee debug_logs.txt ``` and it passes but when I build a yaml file like this ``` my_test_suite: - 'connect_distributed_test.py::ConnectDistributedTest.test_restart_failed_connector@{"exactly_once_source":"False","connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":"True","group_protocol":"consumer"}' ``` and run it like: ``` TC_PATHS="tests/kafkatest/tests/connect/test-suite.yml" bash tests/docker/run_tests.sh | tee debug_logs.txt ``` The yaml file is in the same location as `connect_distributed.py`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16362 Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]
mcmmining commented on code in PR #15601: URL: https://github.com/apache/kafka/pull/15601#discussion_r1545728831 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoin.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import java.util.Optional; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.ValueJoinerWithKey; +import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.apache.kafka.streams.state.internals.LeftOrRightValue; +import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class KStreamKStreamLeftJoin extends KStreamKStreamJoin { +private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class); + +KStreamKStreamLeftJoin(final String otherWindowName, +final JoinWindowsInternal windows, +final ValueJoinerWithKey joiner, +final boolean outer, +final Optional outerJoinWindowName, +final TimeTrackerSupplier sharedTimeTrackerSupplier) { +super(otherWindowName, windows, windows.beforeMs, windows.afterMs, joiner, outerJoinWindowName, +sharedTimeTrackerSupplier, outer); +} + +@Override +public Processor get() { +return new KStreamKStreamLeftJoinProcessor(); +} + +private class KStreamKStreamLeftJoinProcessor extends KStreamKStreamJoinProcessor { +@Override +public void process(final Record leftRecord) { +final long inputRecordTimestamp = leftRecord.timestamp(); +final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs); +final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs); +sharedTimeTracker.advanceStreamTime(inputRecordTimestamp); + +if (outer && leftRecord.key() == null && leftRecord.value() != null) { +final VOut leftJoinValue = joiner.apply(leftRecord.key(), leftRecord.value(), null); +context().forward(leftRecord.withValue(leftJoinValue)); +return; +} else if (StreamStreamJoinUtil.skipRecord(leftRecord, LOG, droppedRecordsSensor, context())) { +return; +} + +// Emit all non-joined records which window has closed +if (inputRecordTimestamp == sharedTimeTracker.streamTime) { +leftOuterJoinStore.ifPresent(store -> emitNonJoinedOuterRecords(store, leftRecord)); +} + +boolean needOuterJoin = outer; +try (final WindowStoreIterator iter = otherWindowStore.fetch(leftRecord.key(), timeFrom, timeTo)) { Review Comment: > I also have a couple of questions. When is this store getting filled? I couldn't find anywhere where we call `.put()` for this window store. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16362 Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]
raminqaf commented on code in PR #15601: URL: https://github.com/apache/kafka/pull/15601#discussion_r1545727392 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -16,276 +16,98 @@ */ package org.apache.kafka.streams.kstream.internals; +import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; + +import java.util.Optional; import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.ValueJoinerWithKey; import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker; import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier; import org.apache.kafka.streams.processor.api.ContextualProcessor; -import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorSupplier; -import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.WindowStore; -import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.streams.state.internals.LeftOrRightValue; import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Optional; - -import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX; -import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; - -class KStreamKStreamJoin implements ProcessorSupplier { -private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class); +abstract class KStreamKStreamJoin implements ProcessorSupplier { +protected final ValueJoinerWithKey joiner; +protected final long joinGraceMs; +protected final boolean outer; +protected final long joinBeforeMs; +protected final long joinAfterMs; private final String otherWindowName; -private final long joinBeforeMs; -private final long joinAfterMs; -private final long joinGraceMs; private final boolean enableSpuriousResultFix; private final long windowsBeforeMs; private final long windowsAfterMs; - -private final boolean outer; -private final boolean isLeftSide; private final Optional outerJoinWindowName; -private final ValueJoinerWithKey joiner; private final TimeTrackerSupplier sharedTimeTrackerSupplier; -KStreamKStreamJoin(final boolean isLeftSide, - final String otherWindowName, - final JoinWindowsInternal windows, - final ValueJoinerWithKey joiner, - final boolean outer, - final Optional outerJoinWindowName, - final TimeTrackerSupplier sharedTimeTrackerSupplier) { -this.isLeftSide = isLeftSide; +KStreamKStreamJoin(final String otherWindowName, +final JoinWindowsInternal windows, +final long joinBeforeMs, +final long joinAfterMs, +final ValueJoinerWithKey joiner, +final Optional outerJoinWindowName, +final TimeTrackerSupplier sharedTimeTrackerSupplier, +final boolean outer) { this.otherWindowName = otherWindowName; -if (isLeftSide) { -this.joinBeforeMs = windows.beforeMs; -this.joinAfterMs = windows.afterMs; -} else { -this.joinBeforeMs = windows.afterMs; -this.joinAfterMs = windows.beforeMs; -} this.windowsAfterMs = windows.afterMs; this.windowsBeforeMs = windows.beforeMs; -this.joinGraceMs = windows.gracePeriodMs(); +this.joinBeforeMs = joinBeforeMs; +this.joinAfterMs = joinAfterMs; this.enableSpuriousResultFix = windows.spuriousResultFixEnabled(); this.joiner = joiner; -this.outer = outer; +this.joinGraceMs = windows.gracePeriodMs(); this.outerJoinWindowName = outerJoinWindowName; this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier; +this.outer = outer; } -@Override -public Processor get() { -return new KStreamKStreamJoinProcessor(); -} - -private class KStreamKStreamJoinProcessor extends ContextualProcessor { -private WindowStore otherWindowStore; -
[PR] [KAFKA-16435] Add test for KAFKA-16428 [kafka]
brandboat opened a new pull request, #15635: URL: https://github.com/apache/kafka/pull/15635 related to KAFKA-16435 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16362 Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]
raminqaf commented on PR #15601: URL: https://github.com/apache/kafka/pull/15601#issuecomment-2028756399 @gharris1727 I have broken down the KStreamKstreamJoin class into two classes. For now, I just moved the code (+the fix in #15510) to see if all the tests pass and if I am going in the correct direction. I managed to get rid of the unsafe type casts appropriately. I created [two outer join stores](https://github.com/raminqaf/kafka/blob/dc608850c5e7af1a6f589b30f9f7f0921f64942c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java#L76-L79) (one for the left side and one for the right side). This will remove one of the unsafe casts. The other unsafe cast on the right side of the join [was removed based on the join's left or right value](https://github.com/raminqaf/kafka/blob/dc608850c5e7af1a6f589b30f9f7f0921f64942c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamRightJoin.java#L215-L218). I changed the PR into a draft because it depends on #15510. Please have a look and let me know if this is going in the correct direction. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
kamalcph opened a new pull request, #15634: URL: https://github.com/apache/kafka/pull/15634 Bound high-watermark offset between local-log-start-offset and log-end-offset: The high watermark should not go below the local-log-start offset. If the high watermark is less than the local-log-start-offset, then the UnifiedLog#fetchHighWatermarkMetadata method will throw OFFSET_OUT_OF_RANGE error when it converts the offset to metadata. Once this error happens, the followers will receive out-of-range exceptions and the producers won't be able to produce messages since the leader cannot move the high watermark. This issue can happen when the partition undergoes recovery due to corruption in the checkpoint file and it gets elected as leader before it gets a chance to update the HW from the previous leader. The follower sends the FETCH request to the leader, the leader checks whether the isFollowerInSync, then expands the ISR. Also, parks the request in DelayedFetchPurgatory. If the replica was elected as leader before the fetch-response gets processed, then the new-leader will have wrong high-watermark. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16452) Bound highwatermark offset to range b/w local-log-start-offset and log-end-offset
Kamal Chandraprakash created KAFKA-16452: Summary: Bound highwatermark offset to range b/w local-log-start-offset and log-end-offset Key: KAFKA-16452 URL: https://issues.apache.org/jira/browse/KAFKA-16452 Project: Kafka Issue Type: Task Reporter: Kamal Chandraprakash Assignee: Kamal Chandraprakash The high watermark should not go below the local-log-start offset. If the high watermark is less than the local-log-start-offset, then the [UnifiedLog#fetchHighWatermarkMetadata|https://sourcegraph.com/github.com/apache/kafka@d4caa1c10ec81b9c87eaaf52b73c83d5579b68d3/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L358] method will throw the OFFSET_OUT_OF_RANGE error when it converts the offset to metadata. Once this error happens, the followers will receive out-of-range exceptions and the producers won't be able to produce messages since the leader cannot move the high watermark. This issue can happen when the partition undergoes recovery due to corruption in the checkpoint file and it gets elected as leader before it gets a chance to update the HW from the previous leader. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15823: disconnect from controller on AuthenticationException [kafka]
gaurav-narula commented on code in PR #14760: URL: https://github.com/apache/kafka/pull/14760#discussion_r1545653758 ## core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala: ## @@ -386,6 +386,7 @@ class NodeToControllerRequestThread( if (response.authenticationException != null) { error(s"Request ${queueItem.request} failed due to authentication error with controller", response.authenticationException) Review Comment: Thanks for the review! Addressed in [c405f73](https://github.com/apache/kafka/pull/14760/commits/c405f7387bcfc94845689d46588c9fc75cbb8596) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] print err log [kafka]
KevinZTW opened a new pull request, #15633: URL: https://github.com/apache/kafka/pull/15633 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16414) Inconsistent active segment expiration behavior between retention.ms and retention.bytes
[ https://issues.apache.org/jira/browse/KAFKA-16414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832565#comment-17832565 ] Chia-Ping Tsai commented on KAFKA-16414: {quote} Assume that the user configured the retention.bytes = 1 byte and default segment.bytes = 1 GB. And, there is one-record batch in the active segment with total-size = 5 KB. If we rotate the active segment and delete it, then all the data for that partition will be removed. Shouldn't we have to maintain the one byte of data which is the last batch? Handling this case will be tricky, as we have to split the active segment into two, keep only the segment with last-batch, and it will also exceed the user-configured retention bytes. {quote} agree to that `retention.bytes = 1` can cause weird case if we allow to roll the active segment. However, `retention.ms` has similar issue if users set retention.ms with small value. Also, we are able to produce such case by setting `retention.bytes = 0` on current implementation. It seems to me the point is the consistent behavior (roll the active segment) rather than weird use cases (as they are existent already). > Inconsistent active segment expiration behavior between retention.ms and > retention.bytes > > > Key: KAFKA-16414 > URL: https://issues.apache.org/jira/browse/KAFKA-16414 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.6.1 >Reporter: Kuan Po Tseng >Assignee: Kuan Po Tseng >Priority: Major > > This is a follow up issue on KAFKA-16385. > Currently, there's a difference between how retention.ms and retention.bytes > handle active segment expiration: > - retention.ms always expire active segment when max segment timestamp > matches the condition. > - retention.bytes only expire active segment when retention.bytes is > configured to zero. > The behavior should be either rotate active segments for both retention > configurations or none at all. > For more details, see > https://issues.apache.org/jira/browse/KAFKA-16385?focusedCommentId=17829682=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17829682 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] admin client minor doc fix for createDelegationToken [kafka]
mcmmining commented on code in PR #15632: URL: https://github.com/apache/kafka/pull/15632#discussion_r1545587374 ## clients/src/main/java/org/apache/kafka/clients/admin/Admin.java: ## @@ -754,7 +754,7 @@ default CreateDelegationTokenResult createDelegationToken() { * * * @param options The options to use when creating delegation token. - * @return The DeleteRecordsResult. Review Comment: clients/src/main/java/org/apache/kafka/admin/Admin.java -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] admin client minor doc fix for createDelegationToken [kafka]
mcmmining commented on code in PR #15632: URL: https://github.com/apache/kafka/pull/15632#discussion_r1545587374 ## clients/src/main/java/org/apache/kafka/clients/admin/Admin.java: ## @@ -754,7 +754,7 @@ default CreateDelegationTokenResult createDelegationToken() { * * * @param options The options to use when creating delegation token. - * @return The DeleteRecordsResult. Review Comment: clients/src/main/java/org/apache/kafka/admin/Admin.java -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-16414) Inconsistent active segment expiration behavior between retention.ms and retention.bytes
[ https://issues.apache.org/jira/browse/KAFKA-16414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832561#comment-17832561 ] Kamal Chandraprakash edited comment on KAFKA-16414 at 3/31/24 9:25 AM: --- My suggestion is to keep the existing behavior for {{retention.bytes}} config. Assume that the user configured the {{retention.bytes}} = 1 byte and default {{segment.bytes}} = 1 GB. And, there is one-record batch in the active segment with total-size = 5 KB. If we rotate the active segment and delete it, then all the data for that partition will be removed. Shouldn't we have to maintain the one byte of data which is the last batch? Handling this case will be tricky, as we have to split the active segment into two, keep only the segment with last-batch, and it will also exceed the user-configured retention bytes. For {{{}retention.ms{}}}, the behavior is correct since all the records/batches in that segments are older than the user-configured retention time, so we mark the active segment as eligible for deletion. was (Author: ckamal): My suggestion is to keep the existing behavior for {{retention.bytes}} config. Assume that the user configured the {{retention.bytes}} = 1 byte and default {{segment.bytes}} = 1 GB. And, there is one record batch in the active segment with total-size = 5 KB. If we rotate the active segment and delete it, then all the data for that partition will be removed. Shouldn't we have to maintain the one byte of data which is the last batch? Handling this case will be tricky, as we have to split the active segment into two, keep only the segment with last-batch, and it will also exceed the user-configured retention bytes. For {{{}retention.ms{}}}, the behavior is correct since all the records/batches in that segments are older than the user-configured retention time, so we mark the active segment as eligible for deletion. > Inconsistent active segment expiration behavior between retention.ms and > retention.bytes > > > Key: KAFKA-16414 > URL: https://issues.apache.org/jira/browse/KAFKA-16414 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.6.1 >Reporter: Kuan Po Tseng >Assignee: Kuan Po Tseng >Priority: Major > > This is a follow up issue on KAFKA-16385. > Currently, there's a difference between how retention.ms and retention.bytes > handle active segment expiration: > - retention.ms always expire active segment when max segment timestamp > matches the condition. > - retention.bytes only expire active segment when retention.bytes is > configured to zero. > The behavior should be either rotate active segments for both retention > configurations or none at all. > For more details, see > https://issues.apache.org/jira/browse/KAFKA-16385?focusedCommentId=17829682=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17829682 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16414) Inconsistent active segment expiration behavior between retention.ms and retention.bytes
[ https://issues.apache.org/jira/browse/KAFKA-16414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832561#comment-17832561 ] Kamal Chandraprakash edited comment on KAFKA-16414 at 3/31/24 9:25 AM: --- My suggestion is to keep the existing behavior for {{retention.bytes}} config. Assume that the user configured the {{retention.bytes}} = 1 byte and default {{segment.bytes}} = 1 GB. And, there is one record batch in the active segment with total-size = 5 KB. If we rotate the active segment and delete it, then all the data for that partition will be removed. Shouldn't we have to maintain the one byte of data which is the last batch? Handling this case will be tricky, as we have to split the active segment into two, keep only the segment with last-batch, and it will also exceed the user-configured retention bytes. For {{{}retention.ms{}}}, the behavior is correct since all the records/batches in that segments are older than the user-configured retention time, so we mark the active segment as eligible for deletion. was (Author: ckamal): My suggestion is to keep the existing behavior for {{retention.bytes}} config. Assume that the user configured the {{retention.bytes}} = 1 byte and default {{segment.bytes}} = 1 GB. And, there is one record batch in the active segment with total-size = 5 KB. If we rotate the active segment and delete it, then all the data for that partition will be removed. Shouldn't we have to maintain the one byte of data which is the last batch? Handling this case will be tricky, as we have to split the active segment into two. For {{{}retention.ms{}}}, the behavior is correct since all the records/batches in that segments are older than the user-configured retention time, so we mark the active segment as eligible for deletion. > Inconsistent active segment expiration behavior between retention.ms and > retention.bytes > > > Key: KAFKA-16414 > URL: https://issues.apache.org/jira/browse/KAFKA-16414 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.6.1 >Reporter: Kuan Po Tseng >Assignee: Kuan Po Tseng >Priority: Major > > This is a follow up issue on KAFKA-16385. > Currently, there's a difference between how retention.ms and retention.bytes > handle active segment expiration: > - retention.ms always expire active segment when max segment timestamp > matches the condition. > - retention.bytes only expire active segment when retention.bytes is > configured to zero. > The behavior should be either rotate active segments for both retention > configurations or none at all. > For more details, see > https://issues.apache.org/jira/browse/KAFKA-16385?focusedCommentId=17829682=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17829682 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16414) Inconsistent active segment expiration behavior between retention.ms and retention.bytes
[ https://issues.apache.org/jira/browse/KAFKA-16414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832561#comment-17832561 ] Kamal Chandraprakash commented on KAFKA-16414: -- My suggestion is to keep the existing behavior for {{retention.bytes}} config. Assume that the user configured the {{retention.bytes}} = 1 byte and default {{segment.bytes}} = 1 GB. And, there is one record batch in the active segment with total-size = 5 KB. If we rotate the active segment and delete it, then all the data for that partition will be removed. Shouldn't we have to maintain the one byte of data which is the last batch? Handling this case will be tricky, as we have to split the active segment into two. For {{{}retention.ms{}}}, the behavior is correct since all the records/batches in that segments are older than the user-configured retention time, so we mark the active segment as eligible for deletion. > Inconsistent active segment expiration behavior between retention.ms and > retention.bytes > > > Key: KAFKA-16414 > URL: https://issues.apache.org/jira/browse/KAFKA-16414 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.6.1 >Reporter: Kuan Po Tseng >Assignee: Kuan Po Tseng >Priority: Major > > This is a follow up issue on KAFKA-16385. > Currently, there's a difference between how retention.ms and retention.bytes > handle active segment expiration: > - retention.ms always expire active segment when max segment timestamp > matches the condition. > - retention.bytes only expire active segment when retention.bytes is > configured to zero. > The behavior should be either rotate active segments for both retention > configurations or none at all. > For more details, see > https://issues.apache.org/jira/browse/KAFKA-16385?focusedCommentId=17829682=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17829682 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] admin client minor doc fix for createDelegationToken [kafka]
OriHoch opened a new pull request, #15632: URL: https://github.com/apache/kafka/pull/15632 minor doc fix -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]
Owen-CH-Leung commented on PR #15489: URL: https://github.com/apache/kafka/pull/15489#issuecomment-2028580025 > @Owen-CH-Leung Could you use `TestUtils.waitForCondition` to verify the records? maybe our QA is too slow to update the metadata before we do the check Sure. I've used `waitForCondition` to wait for `__consumer_offsets` to be created when creating consumers to poll -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org