Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-03-31 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2029213022

   Rebased the code and applied Luke's patch from 
https://github.com/chia7712/kafka/pull/3/files


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15823) NodeToControllerChannelManager: authentication error prevents controller update

2024-03-31 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-15823.
---
Resolution: Fixed

> NodeToControllerChannelManager: authentication error prevents controller 
> update
> ---
>
> Key: KAFKA-15823
> URL: https://issues.apache.org/jira/browse/KAFKA-15823
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.6.0, 3.5.1
>Reporter: Gaurav Narula
>Priority: Major
> Fix For: 3.8.0
>
>
> NodeToControllerChannelManager caches the activeController address in an 
> AtomicReference which is updated when:
>  # activeController [has not been 
> set|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala#L422]
>  # networkClient [disconnects from the 
> controller|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala#L395C7-L395C7]
>  # A node replies with 
> `[Errors.NOT_CONTROLLER|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala#L408]`,
>  and
>  # When a controller changes from [Zk mode to Kraft 
> mode|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala#L325]
>  
> When running multiple Kafka clusters in a dynamic environment, there is a 
> chance that a controller's IP may get reassigned to another cluster's broker 
> when the controller is bounced. In this scenario, the requests from Node to 
> the Controller may fail with an AuthenticationException and are then retried 
> indefinitely. This causes the node to get stuck as the new controller's 
> information is never set.
>  
> A potential fix would be to disconnect the network client and invoke 
> `updateControllerAddress(null)`, as we do in the `Errors.NOT_CONTROLLER` case.
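A minimal sketch of that proposal (a hypothetical Java rendering with assumed 
names `networkClient`, `activeController` and `updateControllerAddress`; the 
real change is in the Scala NodeToControllerRequestThread.handleResponse):
{code:java}
// On an authentication error, drop the connection and clear the cached
// controller so it is rediscovered, just as the Errors.NOT_CONTROLLER
// branch already does.
void handleResponse(ClientResponse response) {
    if (response.authenticationException() != null) {
        networkClient.disconnect(activeController.idString()); // close the stale connection
        updateControllerAddress(null);                         // force controller rediscovery
    }
}
{code}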



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15823: disconnect from controller on AuthenticationException [kafka]

2024-03-31 Thread via GitHub


showuon merged PR #14760:
URL: https://github.com/apache/kafka/pull/14760


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15823: disconnect from controller on AuthenticationException [kafka]

2024-03-31 Thread via GitHub


showuon commented on PR #14760:
URL: https://github.com/apache/kafka/pull/14760#issuecomment-2029112501

   Failed tests are unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: enhance kafka-reassign-partitions command output [kafka]

2024-03-31 Thread via GitHub


showuon commented on PR #15610:
URL: https://github.com/apache/kafka/pull/15610#issuecomment-2029071458

   @AndrewJSchofield, do you have any other comments? I'm going to merge this 
PR this week if there are none. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16323) Failing test: fix testRemoteFetchExpiresPerSecMetric

2024-03-31 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-16323:
--
Fix Version/s: (was: 3.7.1)

> Failing test: fix testRemoteFetchExpiresPerSecMetric 
> -
>
> Key: KAFKA-16323
> URL: https://issues.apache.org/jira/browse/KAFKA-16323
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Johnny Hsu
>Assignee: Johnny Hsu
>Priority: Major
>  Labels: test-failure
> Fix For: 3.8.0
>
>
> Refer to 
> [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2685/testReport/junit/kafka.server/ReplicaManagerTest/Build___JDK_21_and_Scala_2_13___testRemoteFetchExpiresPerSecMetric__/]
> This test is failing, and this ticket aims to address it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]

2024-03-31 Thread via GitHub


showuon merged PR #15463:
URL: https://github.com/apache/kafka/pull/15463


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16323) Failing test: fix testRemoteFetchExpiresPerSecMetric

2024-03-31 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-16323.
---
Fix Version/s: 3.8.0
   3.7.1
   Resolution: Fixed

> Failing test: fix testRemoteFetchExpiresPerSecMetric 
> -
>
> Key: KAFKA-16323
> URL: https://issues.apache.org/jira/browse/KAFKA-16323
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Johnny Hsu
>Assignee: Johnny Hsu
>Priority: Major
>  Labels: test-failure
> Fix For: 3.8.0, 3.7.1
>
>
> Refer to 
> [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2685/testReport/junit/kafka.server/ReplicaManagerTest/Build___JDK_21_and_Scala_2_13___testRemoteFetchExpiresPerSecMetric__/]
> This test is failing, and this ticket aims to address it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]

2024-03-31 Thread via GitHub


showuon commented on PR #15463:
URL: https://github.com/apache/kafka/pull/15463#issuecomment-2029068424

   Failed tests are unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16272) Update connect_distributed_test.py to support KIP-848’s group protocol config

2024-03-31 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832665#comment-17832665
 ] 

Kirk True commented on KAFKA-16272:
---

[~sagarrao], would you mind marking this as either In Progress or Patch 
Available? Thanks!

> Update connect_distributed_test.py to support KIP-848’s group protocol config
> -
>
> Key: KAFKA-16272
> URL: https://issues.apache.org/jira/browse/KAFKA-16272
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Sagar Rao
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the test method(s) in {{connect_distributed_test.py}} 
> to support the {{group.protocol}} configuration introduced in 
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
>  by adding an optional {{group_protocol}} argument to the tests and matrices.
> See KAFKA-16231 as an example of how the test parameters can be changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16427) KafkaConsumer#position() does not respect timeout when group protocol is CONSUMER

2024-03-31 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16427?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16427:
--
Priority: Critical  (was: Minor)

> KafkaConsumer#position() does not respect timeout when group protocol is 
> CONSUMER
> -
>
> Key: KAFKA-16427
> URL: https://issues.apache.org/jira/browse/KAFKA-16427
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Alyssa Huang
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, timeout
> Fix For: 3.8.0
>
>
> When
> `long position(TopicPartition partition, final Duration timeout);`
> is called on an unknown topic partition (and auto creation is disabled), the 
> method fails to adhere to the timeout supplied.
> For example, the following warning is logged continuously as metadata fetches 
> are retried: 
> [2024-03-26 11:03:48,589] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] Error while fetching metadata with correlation id 200 : 
> \{nonexistingTopic=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient:1313)
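For illustration, a minimal reproduction sketch (the topic name is 
hypothetical; assumes auto topic creation is disabled and `consumer` is a 
KafkaConsumer running with the CONSUMER group protocol):
{code:java}
TopicPartition tp = new TopicPartition("nonexistingTopic", 0);
consumer.assign(Collections.singleton(tp));
// position(partition, timeout) is documented to throw
// org.apache.kafka.common.errors.TimeoutException once the timeout elapses;
// with the CONSUMER group protocol it instead keeps retrying the metadata
// fetch, logging the UNKNOWN_TOPIC_OR_PARTITION warning above.
long offset = consumer.position(tp, Duration.ofSeconds(5));
{code}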



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16362 Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-03-31 Thread via GitHub


raminqaf commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1545788943


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamRightJoin.java:
##
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import java.util.Optional;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import 
org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.internals.LeftOrRightValue;
+import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamRightJoin extends KStreamKStreamJoin {
+private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamRightJoin.class);
+
+
+KStreamKStreamRightJoin(final String otherWindowName,
+final JoinWindowsInternal windows,
+final ValueJoinerWithKey joiner,
+final boolean outer,
+final Optional outerJoinWindowName,
+final TimeTrackerSupplier sharedTimeTrackerSupplier) {
+super(otherWindowName, windows, windows.afterMs, windows.beforeMs, 
joiner, outerJoinWindowName,
+sharedTimeTrackerSupplier, outer);
+}
+
+@Override
+public Processor get() {
+return new KStreamKStreamRightJoinProcessor();
+}
+
+private class KStreamKStreamRightJoinProcessor extends 
KStreamKStreamJoinProcessor {
+@Override
+public void process(final Record rightRecord) {
+final long inputRecordTimestamp = rightRecord.timestamp();
+final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
+final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
+sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+
+if (outer && rightRecord.key() == null && rightRecord.value() != 
null) {
+
context().forward(rightRecord.withValue(joiner.apply(rightRecord.key(), 
rightRecord.value(), null)));
+return;
+} else if (StreamStreamJoinUtil.skipRecord(rightRecord, LOG, 
droppedRecordsSensor, context())) {
+return;
+}
+
+// Emit all non-joined records which window has closed
+if (inputRecordTimestamp == sharedTimeTracker.streamTime) {
+rightOuterJoinStore.ifPresent(store -> 
emitNonJoinedOuterRecords(store, rightRecord));
+}
+
+boolean needOuterJoin = outer;
+// TODO: Where and when are the records put in this window store?
+try (final WindowStoreIterator iter = 
otherWindowStore.fetch(rightRecord.key(), timeFrom, timeTo)) {
+while (iter.hasNext()) {
+needOuterJoin = false;
+final KeyValue leftRecord = iter.next();
+final long leftRecordTimestamp = leftRecord.key;
+
+rightOuterJoinStore.ifPresent(store -> {
+// use putIfAbsent to first read and see if there's 
any values for the key,
+// if yes delete the key, otherwise do not issue a put;
+// we may delete some values with the same key early 
but since we are going
+// range over all values of the same key even after 
failure, since the other window-store
+// is only cleaned up by stream time, so this is okay 
for at-least-once.
+store.putIfAbsent(
+
TimestampedKeyAndJoinSide.makeLeftSide(rightRecord.key(), leftRecordTimestamp), 
null);
+});
+
+final VOut joinResult = joiner.apply(rightRecord.key(), 

Re: [PR] KAFKA-16362 Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-03-31 Thread via GitHub


raminqaf commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1545727392


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -16,276 +16,98 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import java.util.Optional;
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
 import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
 import 
org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
 import org.apache.kafka.streams.processor.api.ContextualProcessor;
-import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
-import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.internals.LeftOrRightValue;
 import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Optional;
-
-import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
-import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
-
-class KStreamKStreamJoin implements ProcessorSupplier {
-private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamJoin.class);
 
+abstract class KStreamKStreamJoin implements 
ProcessorSupplier {
+protected final ValueJoinerWithKey joiner;
+protected final long joinGraceMs;
+protected final boolean outer;
+protected final long joinBeforeMs;
+protected final long joinAfterMs;
 private final String otherWindowName;
-private final long joinBeforeMs;
-private final long joinAfterMs;
-private final long joinGraceMs;
 private final boolean enableSpuriousResultFix;
 private final long windowsBeforeMs;
 private final long windowsAfterMs;
-
-private final boolean outer;
-private final boolean isLeftSide;
 private final Optional outerJoinWindowName;
-private final ValueJoinerWithKey joiner;
 
 private final TimeTrackerSupplier sharedTimeTrackerSupplier;
 
-KStreamKStreamJoin(final boolean isLeftSide,
-   final String otherWindowName,
-   final JoinWindowsInternal windows,
-   final ValueJoinerWithKey joiner,
-   final boolean outer,
-   final Optional outerJoinWindowName,
-   final TimeTrackerSupplier sharedTimeTrackerSupplier) {
-this.isLeftSide = isLeftSide;
+KStreamKStreamJoin(final String otherWindowName,
+final JoinWindowsInternal windows,
+final long joinBeforeMs,
+final long joinAfterMs,
+final ValueJoinerWithKey joiner,
+final Optional outerJoinWindowName,
+final TimeTrackerSupplier sharedTimeTrackerSupplier,
+final boolean outer) {
 this.otherWindowName = otherWindowName;
-if (isLeftSide) {
-this.joinBeforeMs = windows.beforeMs;
-this.joinAfterMs = windows.afterMs;
-} else {
-this.joinBeforeMs = windows.afterMs;
-this.joinAfterMs = windows.beforeMs;
-}
 this.windowsAfterMs = windows.afterMs;
 this.windowsBeforeMs = windows.beforeMs;
-this.joinGraceMs = windows.gracePeriodMs();
+this.joinBeforeMs = joinBeforeMs;
+this.joinAfterMs = joinAfterMs;
 this.enableSpuriousResultFix = windows.spuriousResultFixEnabled();
 this.joiner = joiner;
-this.outer = outer;
+this.joinGraceMs = windows.gracePeriodMs();
 this.outerJoinWindowName = outerJoinWindowName;
 this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier;
+this.outer = outer;
 }
 
-@Override
-public Processor get() {
-return new KStreamKStreamJoinProcessor();
-}
-
-private class KStreamKStreamJoinProcessor extends ContextualProcessor {
-private WindowStore otherWindowStore;
-

Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]

2024-03-31 Thread via GitHub


chia7712 commented on PR #15489:
URL: https://github.com/apache/kafka/pull/15489#issuecomment-2028866594

   > Sure. I've used waitForCondition to wait for __consumer_offsets to be 
created when creating consumers to poll
   
   I meant `assertEquals(expectedTestTopicOffsets().stream().filter(r -> 
r.partition <= 1).collect(Collectors.toList()), offsets);` should be retried, 
since the metadata may not have been updated yet.
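   
   A minimal sketch of that retry, using the Java 
`org.apache.kafka.test.TestUtils.waitForCondition` (the `Row` type and the 
`executeAndParse` helper are the test's own names, assumed here, along with the 
usual `java.util.List`/`Collectors` imports):
   
   ```java
   // Poll until the tool's output matches the expected rows, instead of
   // asserting once before the brokers have finished propagating metadata.
   TestUtils.waitForCondition(() -> {
       List<Row> offsets = executeAndParse("--topic-partitions", "topic.*:0-1");
       return expectedTestTopicOffsets().stream()
               .filter(r -> r.partition <= 1)
               .collect(Collectors.toList())
               .equals(offsets);
   }, "Timed out waiting for listed offsets to match the expected rows");
   ```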


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [KAFKA-16435] Add test for KAFKA-16428 [kafka]

2024-03-31 Thread via GitHub


mcmmining commented on code in PR #15635:
URL: https://github.com/apache/kafka/pull/15635#discussion_r1545770270


##
core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala:
##
@@ -112,10 +113,25 @@ class ZkConfigMigrationClientTest extends 
ZkMigrationTestHarness {
 assertEquals(newProps.get(key), value)
   }
 }
+assertPathExistenceAndData("/config/changes/config_change_00", 
"""{"version":2,"entity_path":"brokers/1"}""")
 
 migrationState = migrationClient.configClient().deleteConfigs(
   new ConfigResource(ConfigResource.Type.BROKER, "1"), migrationState)
 assertEquals(0, zkClient.getEntityConfigs(ConfigType.BROKER, "1").size())
+assertPathExistenceAndData("/config/changes/config_change_01", 
"""{"version":2,"entity_path":"brokers/1"}""")
+
+// make sure there is no more config change notification in znode
+
assertFalse(zkClient.pathExists("/config/changes/config_change_02"))
+  }
+
+  private def assertPathExistenceAndData(expectedPath: String, data: String): 
Unit = {
+assertTrue(zkClient.pathExists(expectedPath))
+assertEquals(Some(data), dataAsString(expectedPath))

Review Comment:
   > how about `assertEquals(Some(data), 
zkClient.getDataAndStat(expectedPath)._1.map(new String(_, UTF_8)))`?
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [KAFKA-16435] Add test for KAFKA-16428 [kafka]

2024-03-31 Thread via GitHub


chia7712 commented on code in PR #15635:
URL: https://github.com/apache/kafka/pull/15635#discussion_r1545767933


##
core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala:
##
@@ -112,10 +113,25 @@ class ZkConfigMigrationClientTest extends 
ZkMigrationTestHarness {
 assertEquals(newProps.get(key), value)
   }
 }
+assertPathExistenceAndData("/config/changes/config_change_00", 
"""{"version":2,"entity_path":"brokers/1"}""")
 
 migrationState = migrationClient.configClient().deleteConfigs(
   new ConfigResource(ConfigResource.Type.BROKER, "1"), migrationState)
 assertEquals(0, zkClient.getEntityConfigs(ConfigType.BROKER, "1").size())
+assertPathExistenceAndData("/config/changes/config_change_01", 
"""{"version":2,"entity_path":"brokers/1"}""")
+
+// make sure there is no more config change notification in znode
+
assertFalse(zkClient.pathExists("/config/changes/config_change_02"))
+  }
+
+  private def assertPathExistenceAndData(expectedPath: String, data: String): 
Unit = {
+assertTrue(zkClient.pathExists(expectedPath))
+assertEquals(Some(data), dataAsString(expectedPath))

Review Comment:
   how about `assertEquals(Some(data), 
zkClient.getDataAndStat(expectedPath)._1.map(new String(_, UTF_8)))`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16225) Flaky test suite LogDirFailureTest

2024-03-31 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832619#comment-17832619
 ] 

Chia-Ping Tsai commented on KAFKA-16225:


It seems to me the root cause is that `LogDirFailureHandler` does not clean 
`directoryIds` while holding `replicaStateChangeLock`, and hence the metadata 
event thread can see the intermediate state of the failure handling and then 
assume the deleted folder is online 
([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/cluster/Partition.scala#L877])
{code:scala}
  private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState,
                                             highWatermarkCheckpoints: OffsetCheckpoints,
                                             topicId: Option[Uuid],
                                             targetLogDirectoryId: Option[Uuid]): Unit = {
    targetLogDirectoryId match {
      case Some(directoryId) =>
        // [CHIA] logManager.onlineLogDirId(directoryId) returns true
        // there are two results:
        // 1) KafkaStorageException is thrown by `LogManager.getOrCreateLog`
        // 2) log is hosted by another directory (id) rather than targetLogDirectoryId
        if (logManager.onlineLogDirId(directoryId) || !logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) {
          createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
        } else {
          warn(s"Skipping creation of log because there are potentially offline log " +
            s"directories and log may already exist there. directoryId=$directoryId, " +
            s"topicId=$topicId, targetLogDirectoryId=$targetLogDirectoryId")
        }

      case None =>
        createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId)
    }
  }
{code}
Hence, there are two options to stabilize `testIOExceptionDuringLogRoll`:

1. Call `logManager.handleLogDirFailure(dir)` while holding 
`replicaStateChangeLock` 
([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2473])
 to avoid the race condition.

2. Change the assert to allow both an empty folder and a folder hosting a 
different directory id. 
[https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L1715]

BTW, option 2 means we allow another directory to replace 
`targetLogDirectoryId` when creating the log. That violates the comment: "@param 
targetLogDirectoryId The directory Id that should host the the partition's 
topic." 
([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L1015])
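
To illustrate option 1, a minimal Java sketch of the locking discipline (all 
names are hypothetical; the real code is Scala in ReplicaManager/LogManager):
{code:java}
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

class LogDirFailureSketch {
    private final ReentrantLock replicaStateChangeLock = new ReentrantLock();
    private final Set<String> onlineDirectoryIds = ConcurrentHashMap.newKeySet();

    // Failure handling: remove the directory id while holding the lock, so a
    // reader can never observe the half-updated state where the dir is gone
    // from the log manager but still listed as online.
    void handleLogDirFailure(String directoryId) {
        replicaStateChangeLock.lock();
        try {
            onlineDirectoryIds.remove(directoryId);
            // ... mark affected logs offline, checkpoint, etc.
        } finally {
            replicaStateChangeLock.unlock();
        }
    }

    // Metadata event thread: takes the same lock before consulting the state.
    boolean onlineLogDirId(String directoryId) {
        replicaStateChangeLock.lock();
        try {
            return onlineDirectoryIds.contains(directoryId);
        } finally {
            replicaStateChangeLock.unlock();
        }
    }
}
{code}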

> Flaky test suite LogDirFailureTest
> --
>
> Key: KAFKA-16225
> URL: https://issues.apache.org/jira/browse/KAFKA-16225
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Reporter: Greg Harris
>Assignee: Omnia Ibrahim
>Priority: Major
>  Labels: flaky-test
>
> I see this failure on trunk and in PR builds for multiple methods in this 
> test suite:
> {noformat}
> org.opentest4j.AssertionFailedError: expected:  but was:     
> at 
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>     
> at 
> org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>     
> at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)    
> at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)    
> at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31)    
> at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:179)    
> at kafka.utils.TestUtils$.causeLogDirFailure(TestUtils.scala:1715)    
> at 
> kafka.server.LogDirFailureTest.testProduceAfterLogDirFailureOnLeader(LogDirFailureTest.scala:186)
>     
> at 
> kafka.server.LogDirFailureTest.testIOExceptionDuringLogRoll(LogDirFailureTest.scala:70){noformat}
> It appears this assertion is failing
> [https://github.com/apache/kafka/blob/f54975c33135140351c50370282e86c49c81bbdd/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L1715]
> The other error which is appearing is this:
> {noformat}
> org.opentest4j.AssertionFailedError: Unexpected exception type thrown, 
> expected:  but was: 
>     
> at 
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>     
> at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:67)    
> at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:35)    
> at org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3111)    
> at 
> kafka.server.LogDirFailureTest.testProduceErrorsFromLogDirFailureOnLeader(LogDirFailureTest.scala:164)
>     
> at 
> kafka.server.LogDirFailureTest.testProduceErrorFromFailureOnLogRoll(LogDirFailureTest.scala:64){noformat}
> Failures appear 

[PR] KAFKA-16161: Avoid creating remote log metadata snapshot file in partition dir if unused [kafka]

2024-03-31 Thread via GitHub


kamalcph opened a new pull request, #15636:
URL: https://github.com/apache/kafka/pull/15636

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]

2024-03-31 Thread via GitHub


vamossagar12 commented on PR #15594:
URL: https://github.com/apache/kafka/pull/15594#issuecomment-2028815010

   Thanks @kirktrue. I ran a single test by passing a parameter:
   ```

TC_PATHS="tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_restart_failed_connector"
 _DUCKTAPE_OPTIONS='--parameters 
'\''{"exactly_once_source":"False","connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":"True","group_protocol":"consumer"}'\'
 bash tests/docker/run_tests.sh | tee debug_logs.txt
   ```
   
   and it passes, but when I build a yaml file like this:
   
   ```
   my_test_suite:
 - 
'connect_distributed_test.py::ConnectDistributedTest.test_restart_failed_connector@{"exactly_once_source":"False","connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":"True","group_protocol":"consumer"}'
   ```
   and run it like:
   ```
TC_PATHS="tests/kafkatest/tests/connect/test-suite.yml" bash 
tests/docker/run_tests.sh | tee debug_logs.txt 
   ```
   The yaml file is in the same location as `connect_distributed.py`. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16362 Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-03-31 Thread via GitHub


mcmmining commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1545728831


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoin.java:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import java.util.Optional;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import 
org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.internals.LeftOrRightValue;
+import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamLeftJoin extends KStreamKStreamJoin {
+private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamJoin.class);
+
+KStreamKStreamLeftJoin(final String otherWindowName,
+final JoinWindowsInternal windows,
+final ValueJoinerWithKey joiner,
+final boolean outer,
+final Optional outerJoinWindowName,
+final TimeTrackerSupplier sharedTimeTrackerSupplier) {
+super(otherWindowName, windows, windows.beforeMs, windows.afterMs, 
joiner, outerJoinWindowName,
+sharedTimeTrackerSupplier, outer);
+}
+
+@Override
+public Processor get() {
+return new KStreamKStreamLeftJoinProcessor();
+}
+
+private class KStreamKStreamLeftJoinProcessor extends 
KStreamKStreamJoinProcessor {
+@Override
+public void process(final Record leftRecord) {
+final long inputRecordTimestamp = leftRecord.timestamp();
+final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
+final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
+sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+
+if (outer && leftRecord.key() == null && leftRecord.value() != 
null) {
+final VOut leftJoinValue = joiner.apply(leftRecord.key(), 
leftRecord.value(), null);
+context().forward(leftRecord.withValue(leftJoinValue));
+return;
+} else if (StreamStreamJoinUtil.skipRecord(leftRecord, LOG, 
droppedRecordsSensor, context())) {
+return;
+}
+
+// Emit all non-joined records which window has closed
+if (inputRecordTimestamp == sharedTimeTracker.streamTime) {
+leftOuterJoinStore.ifPresent(store -> 
emitNonJoinedOuterRecords(store, leftRecord));
+}
+
+boolean needOuterJoin = outer;
+try (final WindowStoreIterator iter = 
otherWindowStore.fetch(leftRecord.key(), timeFrom, timeTo)) {

Review Comment:
   > I also have a couple of questions. When is this store getting filled? I 
couldn't find anywhere where we call `.put()` for this window store. 
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [KAFKA-16435] Add test for KAFKA-16428 [kafka]

2024-03-31 Thread via GitHub


brandboat opened a new pull request, #15635:
URL: https://github.com/apache/kafka/pull/15635

   related to KAFKA-16435
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16362 Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-03-31 Thread via GitHub


raminqaf commented on PR #15601:
URL: https://github.com/apache/kafka/pull/15601#issuecomment-2028756399

   @gharris1727 I have broken down the KStreamKStreamJoin class into two 
classes. For now, I just moved the code (+the fix in #15510) to see if all the 
tests pass and if I am going in the correct direction. I managed to get rid of 
the unsafe type casts appropriately. 
   I created [two outer join 
stores](https://github.com/raminqaf/kafka/blob/dc608850c5e7af1a6f589b30f9f7f0921f64942c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java#L76-L79)
 (one for the left side and one for the right side). This will remove one of 
the unsafe casts. The other unsafe cast on the right side of the join [was 
removed based on the join's left or right 
value](https://github.com/raminqaf/kafka/blob/dc608850c5e7af1a6f589b30f9f7f0921f64942c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamRightJoin.java#L215-L218).
   
   I changed the PR into a draft because it depends on #15510. Please have a 
look and let me know if this is going in the correct direction.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-03-31 Thread via GitHub


kamalcph opened a new pull request, #15634:
URL: https://github.com/apache/kafka/pull/15634

   Bound high-watermark offset between local-log-start-offset and 
log-end-offset:
   
   The high watermark should not go below the local-log-start-offset. If the 
high watermark is less than the local-log-start-offset, then the 
UnifiedLog#fetchHighWatermarkMetadata method will throw an OFFSET_OUT_OF_RANGE 
error when it converts the offset to metadata. Once this error happens, the 
followers will receive out-of-range exceptions and the producers won't be able 
to produce messages since the leader cannot move the high watermark.
   
   This issue can happen when the partition undergoes recovery due to 
corruption in the checkpoint file and it gets elected as leader before it gets 
a chance to update the HW from the previous leader.
   
   The follower sends a FETCH request to the leader; the leader checks 
isFollowerInSync, expands the ISR, and parks the request in 
DelayedFetchPurgatory. If the replica is elected as leader before the 
fetch response gets processed, then the new leader will have a wrong 
high watermark.
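   
   A minimal sketch of the proposed bound (illustrative only; 
`boundHighWatermark` is a hypothetical helper, and the actual change lives in 
the Scala UnifiedLog):
   
   ```java
   // Clamp the high watermark into [localLogStartOffset, logEndOffset], so that
   // fetchHighWatermarkMetadata never converts an offset below the local log
   // start, which is what produces the OFFSET_OUT_OF_RANGE error.
   static long boundHighWatermark(long proposedHw, long localLogStartOffset, long logEndOffset) {
       return Math.min(Math.max(proposedHw, localLogStartOffset), logEndOffset);
   }
   ```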
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16452) Bound highwatermark offset to range b/w local-log-start-offset and log-end-offset

2024-03-31 Thread Kamal Chandraprakash (Jira)
Kamal Chandraprakash created KAFKA-16452:


 Summary: Bound highwatermark offset to range b/w 
local-log-start-offset and log-end-offset
 Key: KAFKA-16452
 URL: https://issues.apache.org/jira/browse/KAFKA-16452
 Project: Kafka
  Issue Type: Task
Reporter: Kamal Chandraprakash
Assignee: Kamal Chandraprakash


The high watermark should not go below the local-log-start offset. If the high 
watermark is less than the local-log-start-offset, then the 
[UnifiedLog#fetchHighWatermarkMetadata|https://sourcegraph.com/github.com/apache/kafka@d4caa1c10ec81b9c87eaaf52b73c83d5579b68d3/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L358]
 method will throw the OFFSET_OUT_OF_RANGE error when it converts the offset to 
metadata. Once this error happens, the followers will receive out-of-range 
exceptions and the producers won't be able to produce messages since the leader 
cannot move the high watermark.

This issue can happen when the partition undergoes recovery due to corruption 
in the checkpoint file and it gets elected as leader before it gets a chance to 
update the HW from the previous leader.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15823: disconnect from controller on AuthenticationException [kafka]

2024-03-31 Thread via GitHub


gaurav-narula commented on code in PR #14760:
URL: https://github.com/apache/kafka/pull/14760#discussion_r1545653758


##
core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala:
##
@@ -386,6 +386,7 @@ class NodeToControllerRequestThread(
 if (response.authenticationException != null) {
   error(s"Request ${queueItem.request} failed due to authentication error 
with controller",
 response.authenticationException)

Review Comment:
   Thanks for the review! Addressed in 
[c405f73](https://github.com/apache/kafka/pull/14760/commits/c405f7387bcfc94845689d46588c9fc75cbb8596)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] print err log [kafka]

2024-03-31 Thread via GitHub


KevinZTW opened a new pull request, #15633:
URL: https://github.com/apache/kafka/pull/15633

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16414) Inconsistent active segment expiration behavior between retention.ms and retention.bytes

2024-03-31 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832565#comment-17832565
 ] 

Chia-Ping Tsai commented on KAFKA-16414:


{quote}
Assume that the user configured the retention.bytes = 1 byte and default 
segment.bytes = 1 GB. And, there is one-record batch in the active segment with 
total-size = 5 KB. If we rotate the active segment and delete it, then all the 
data for that partition will be removed. Shouldn't we have to maintain the one 
byte of data which is the last batch? Handling this case will be tricky, as we 
have to split the active segment into two, keep only the segment with 
last-batch, and it will also exceed the user-configured retention bytes.
{quote}

Agreed that `retention.bytes = 1` can cause a weird case if we allow rolling 
the active segment. However, `retention.ms` has a similar issue if users set 
retention.ms to a small value. Also, we are already able to produce such a case 
by setting `retention.bytes = 0` in the current implementation.

It seems to me the point is consistent behavior (rolling the active segment) 
rather than the weird use cases (as they already exist).
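
For reference, a simplified sketch of the two retention predicates being 
compared (a hypothetical Java rendering; the real logic lives in UnifiedLog and 
differs in detail):
{code:java}
// Time-based retention: depends only on the segment's largest timestamp, so
// even the active segment can breach it and be rolled and deleted.
static boolean breachedByTime(long largestTimestampMs, long nowMs, long retentionMs) {
    return nowMs - largestTimestampMs > retentionMs;
}

// Size-based retention: depends on how far the log is over budget. With a
// single 5 KB active segment, retention.bytes = 1 gives 5120 - 1 - 5120 < 0
// (nothing breaches), while retention.bytes = 0 gives 5120 - 0 - 5120 >= 0
// (the active segment itself breaches), which is the inconsistency at issue.
static boolean breachedBySize(long logSizeBytes, long segmentSizeBytes, long retentionBytes) {
    return logSizeBytes - retentionBytes - segmentSizeBytes >= 0;
}
{code}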


> Inconsistent active segment expiration behavior between retention.ms and 
> retention.bytes
> 
>
> Key: KAFKA-16414
> URL: https://issues.apache.org/jira/browse/KAFKA-16414
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.6.1
>Reporter: Kuan Po Tseng
>Assignee: Kuan Po Tseng
>Priority: Major
>
> This is a follow up issue on KAFKA-16385.
> Currently, there's a difference between how retention.ms and retention.bytes 
> handle active segment expiration:
> - retention.ms always expire active segment when max segment timestamp 
> matches the condition.
> - retention.bytes only expire active segment when retention.bytes is 
> configured to zero.
> The behavior should be either rotate active segments for both retention 
> configurations or none at all.
> For more details, see
> https://issues.apache.org/jira/browse/KAFKA-16385?focusedCommentId=17829682=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17829682



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] admin client minor doc fix for createDelegationToken [kafka]

2024-03-31 Thread via GitHub


mcmmining commented on code in PR #15632:
URL: https://github.com/apache/kafka/pull/15632#discussion_r1545587374


##
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##
@@ -754,7 +754,7 @@ default CreateDelegationTokenResult createDelegationToken() 
{
  * 
  *
  * @param options The options to use when creating delegation token.
- * @return The DeleteRecordsResult.

Review Comment:
   clients/src/main/java/org/apache/kafka/admin/Admin.java
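   
   For context, the added side of the hunk is cut off in this digest; the 
corrected javadoc presumably reads roughly as below (a hedged reconstruction; 
`createDelegationToken(CreateDelegationTokenOptions)` does return a 
`CreateDelegationTokenResult`):
   
   ```java
    * @param options The options to use when creating delegation token.
    * @return The CreateDelegationTokenResult.
    */
   CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options);
   ```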



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-16414) Inconsistent active segment expiration behavior between retention.ms and retention.bytes

2024-03-31 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832561#comment-17832561
 ] 

Kamal Chandraprakash edited comment on KAFKA-16414 at 3/31/24 9:25 AM:
---

My suggestion is to keep the existing behavior for {{retention.bytes}} config. 
Assume that the user configured the {{retention.bytes}} = 1 byte and default 
{{segment.bytes}} = 1 GB. And, there is one-record batch in the active segment 
with total-size = 5 KB. If we rotate the active segment and delete it, then all 
the data for that partition will be removed. Shouldn't we have to maintain the 
one byte of data which is the last batch? Handling this case will be tricky, as 
we have to split the active segment into two, keep only the segment with 
last-batch, and it will also exceed the user-configured retention bytes.

For {{{}retention.ms{}}}, the behavior is correct since all the records/batches 
in that segments are older than the user-configured retention time, so we mark 
the active segment as eligible for deletion.


was (Author: ckamal):
My suggestion is to keep the existing behavior for {{retention.bytes}} config. 
Assume that the user configured the {{retention.bytes}} = 1 byte and default 
{{segment.bytes}} = 1 GB. And, there is one record batch in the active segment 
with total-size = 5 KB. If we rotate the active segment and delete it, then all 
the data for that partition will be removed. Shouldn't we have to maintain the 
one byte of data which is the last batch? Handling this case will be tricky, as 
we have to split the active segment into two, keep only the segment with 
last-batch, and it will also exceed the user-configured retention bytes.

For {{{}retention.ms{}}}, the behavior is correct since all the records/batches 
in that segments are older than the user-configured retention time, so we mark 
the active segment as eligible for deletion.

> Inconsistent active segment expiration behavior between retention.ms and 
> retention.bytes
> 
>
> Key: KAFKA-16414
> URL: https://issues.apache.org/jira/browse/KAFKA-16414
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.6.1
>Reporter: Kuan Po Tseng
>Assignee: Kuan Po Tseng
>Priority: Major
>
> This is a follow up issue on KAFKA-16385.
> Currently, there's a difference between how retention.ms and retention.bytes 
> handle active segment expiration:
> - retention.ms always expire active segment when max segment timestamp 
> matches the condition.
> - retention.bytes only expire active segment when retention.bytes is 
> configured to zero.
> The behavior should be either rotate active segments for both retention 
> configurations or none at all.
> For more details, see
> https://issues.apache.org/jira/browse/KAFKA-16385?focusedCommentId=17829682=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17829682



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16414) Inconsistent active segment expiration behavior between retention.ms and retention.bytes

2024-03-31 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832561#comment-17832561
 ] 

Kamal Chandraprakash edited comment on KAFKA-16414 at 3/31/24 9:25 AM:
---

My suggestion is to keep the existing behavior for {{retention.bytes}} config. 
Assume that the user configured the {{retention.bytes}} = 1 byte and default 
{{segment.bytes}} = 1 GB. And, there is one record batch in the active segment 
with total-size = 5 KB. If we rotate the active segment and delete it, then all 
the data for that partition will be removed. Shouldn't we have to maintain the 
one byte of data which is the last batch? Handling this case will be tricky, as 
we have to split the active segment into two, keep only the segment with 
last-batch, and it will also exceed the user-configured retention bytes.

For {{{}retention.ms{}}}, the behavior is correct since all the records/batches 
in that segments are older than the user-configured retention time, so we mark 
the active segment as eligible for deletion.


was (Author: ckamal):
My suggestion is to keep the existing behavior for {{retention.bytes}} config. 
Assume that the user configured the {{retention.bytes}} = 1 byte and default 
{{segment.bytes}} = 1 GB. And, there is one record batch in the active segment 
with total-size = 5 KB. If we rotate the active segment and delete it, then all 
the data for that partition will be removed. Shouldn't we have to maintain the 
one byte of data which is the last batch? Handling this case will be tricky, as 
we have to split the active segment into two. 

For {{{}retention.ms{}}}, the behavior is correct since all the records/batches 
in that segments are older than the user-configured retention time, so we mark 
the active segment as eligible for deletion.

> Inconsistent active segment expiration behavior between retention.ms and 
> retention.bytes
> 
>
> Key: KAFKA-16414
> URL: https://issues.apache.org/jira/browse/KAFKA-16414
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.6.1
>Reporter: Kuan Po Tseng
>Assignee: Kuan Po Tseng
>Priority: Major
>
> This is a follow up issue on KAFKA-16385.
> Currently, there's a difference between how retention.ms and retention.bytes 
> handle active segment expiration:
> - retention.ms always expire active segment when max segment timestamp 
> matches the condition.
> - retention.bytes only expire active segment when retention.bytes is 
> configured to zero.
> The behavior should be either rotate active segments for both retention 
> configurations or none at all.
> For more details, see
> https://issues.apache.org/jira/browse/KAFKA-16385?focusedCommentId=17829682=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17829682



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16414) Inconsistent active segment expiration behavior between retention.ms and retention.bytes

2024-03-31 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832561#comment-17832561
 ] 

Kamal Chandraprakash commented on KAFKA-16414:
--

My suggestion is to keep the existing behavior for {{retention.bytes}} config. 
Assume that the user configured the {{retention.bytes}} = 1 byte and default 
{{segment.bytes}} = 1 GB. And, there is one record batch in the active segment 
with total-size = 5 KB. If we rotate the active segment and delete it, then all 
the data for that partition will be removed. Shouldn't we have to maintain the 
one byte of data which is the last batch? Handling this case will be tricky, as 
we have to split the active segment into two. 

For {{{}retention.ms{}}}, the behavior is correct since all the records/batches 
in that segments are older than the user-configured retention time, so we mark 
the active segment as eligible for deletion.

> Inconsistent active segment expiration behavior between retention.ms and 
> retention.bytes
> 
>
> Key: KAFKA-16414
> URL: https://issues.apache.org/jira/browse/KAFKA-16414
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.6.1
>Reporter: Kuan Po Tseng
>Assignee: Kuan Po Tseng
>Priority: Major
>
> This is a follow up issue on KAFKA-16385.
> Currently, there's a difference between how retention.ms and retention.bytes 
> handle active segment expiration:
> - retention.ms always expire active segment when max segment timestamp 
> matches the condition.
> - retention.bytes only expire active segment when retention.bytes is 
> configured to zero.
> The behavior should be either rotate active segments for both retention 
> configurations or none at all.
> For more details, see
> https://issues.apache.org/jira/browse/KAFKA-16385?focusedCommentId=17829682=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17829682



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] admin client minor doc fix for createDelegationToken [kafka]

2024-03-31 Thread via GitHub


OriHoch opened a new pull request, #15632:
URL: https://github.com/apache/kafka/pull/15632

   minor doc fix
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]

2024-03-31 Thread via GitHub


Owen-CH-Leung commented on PR #15489:
URL: https://github.com/apache/kafka/pull/15489#issuecomment-2028580025

   > @Owen-CH-Leung Could you use `TestUtils.waitForCondition` to verify the 
records? maybe our QA is too slow to update the metadata before we do the check
   
   Sure. I've used `waitForCondition` to wait for `__consumer_offsets` to be 
created when creating consumers to poll


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org