[GitHub] [kafka] urbandan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-04-21 Thread via GitHub


urbandan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1173405147


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -968,13 +968,31 @@ private void transitionTo(State target) {
 }
 
 private void transitionTo(State target, RuntimeException error) {
+transitionTo(target, error, false);

Review Comment:
   > > This is a call chain where we transition on direct user action, 
shouldn't this be throwing? KafkaProducer.send -> KafkaProducer.doSend -> 
maybeTransitionToErrorState -> transitionToAbortableError -> transitionTo
   > 
   > Since maybeTransitionToErrorState is being called from inside a catch 
block, if we did throw an exception, it would mask the root issue 
(ApiException), right?
   
   I think masking the ApiException is better than silently transitioning into 
fatal state - if transitionToAbortableError tries going into abortable state, 
that ApiException is probably something that the calling code can handle, and 
try to recover by aborting. If we still throw that exception, but in reality 
the internal state is fatal already, that is a violation of the API, isn't it?
   
   > After the abortableError call, if the state transition was invalid, then 
currentState would be FATAL_ERROR. That has the same effect as the last two 
branches of the if statement that call the fatalError method, right?
   > 
   > Would simply skipping the reenqueue on fatal errors be sufficient?
   
   I think you are right, and the last two branches of that if statement do the same thing. At a later point, Sender.runOnce will handle the fatal state and stop sending, but yes, it would probably be cleaner not to re-enqueue at this point anymore.
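
   A minimal sketch of the alternative discussed above, assuming a simplified state model (the SketchTransactionManager and State names below are hypothetical, not Kafka's actual TransactionManager): fail loudly on an invalid transition and attach the original ApiException as the cause, instead of silently entering the fatal state.

    // Hypothetical sketch, not the real TransactionManager.
    enum State { READY, IN_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR }

    class SketchTransactionManager {
        private volatile State currentState = State.IN_TRANSACTION;

        void transitionToAbortableError(RuntimeException cause) {
            State previous = currentState;
            if (!isValidTransition(State.ABORTABLE_ERROR)) {
                // Record that we are now fatal, but surface it immediately; the root
                // ApiException is kept as the cause rather than being fully masked.
                currentState = State.FATAL_ERROR;
                throw new IllegalStateException(
                    "Invalid transition from " + previous + " to ABORTABLE_ERROR", cause);
            }
            currentState = State.ABORTABLE_ERROR;
        }

        private boolean isValidTransition(State target) {
            // Simplified rule: everything is allowed except leaving FATAL_ERROR.
            return target == State.FATAL_ERROR || currentState != State.FATAL_ERROR;
        }
    }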



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] urbandan commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication

2023-04-21 Thread via GitHub


urbandan commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1173450470


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -139,10 +171,103 @@ public void close() {
 protected void handleRecord(ConsumerRecord record) {
     OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
     TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-    offsetSyncs.put(sourceTopicPartition, offsetSync);
+    offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync));
+    offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync));
+}
+
+private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+    // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+    // TODO: batch updates so that this copy can be performed less often for high-volume sync topics.
+    OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+    updateSyncArray(mutableSyncs, offsetSync);
+    if (log.isTraceEnabled()) {
+        StringBuilder stateString = new StringBuilder();
+        stateString.append("[");
+        for (int i = 0; i < Long.SIZE; i++) {
+            if (i != 0) {
+                stateString.append(",");
+            }
+            if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) {
+                // Print only if the sync is interesting, a series of repeated syncs will appear as ,
+                stateString.append(mutableSyncs[i].upstreamOffset());
+                stateString.append(":");
+                stateString.append(mutableSyncs[i].downstreamOffset());
+            }
+        }
+        stateString.append("]");
+        log.trace("New sync {} applied, new state is {}", offsetSync, stateString);
+    }
+    return mutableSyncs;
+}
+
+private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+    OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+    clearSyncArray(syncs, firstSync);
+    return syncs;
+}
+
+private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+    for (int i = 0; i < Long.SIZE; i++) {
+        syncs[i] = offsetSync;
+    }
+}
+
+private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+    long upstreamOffset = offsetSync.upstreamOffset();
+    // Old offsets are invalid, so overwrite them all.
+    if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {

Review Comment:
   @gharris1727 I had the wrong impression about the behavior pre-KAFKA-13659, 
I thought that it was capable of checkpointing older source offsets after 
restart, but I was wrong.
   
   Regardless of that, what I'm trying to say is:
   1. Monotonicity is an optimization - it tries to minimize re-processing after a failover. (Please correct me if I'm wrong, but I don't really see any other user stories behind it.)
   2. Being able to checkpoint old offsets after a restart is a feature, and probably a good one. If cluster restarts/rebalances are frequent enough, and some consumers are lagging behind consistently, they might never get their checkpoints translated, ever.
   3. Based on our discussion so far, it seems that monotonicity will not allow us to implement "old offset checkpointing after restart", which is not ideal. The worst case of breaking monotonicity is sub-optimal failover and extra re-processing. The worst case of not translating old offsets after restart is never checkpointing a group; then, with the default consumer config (auto.offset.reset=latest), failover results in data loss.
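
   A rough sketch of the translation gap described in point 3 (hypothetical helper, not the real MM2 OffsetSyncStore/checkpoint classes): a group offset older than the oldest retained sync cannot be translated, so a consistently lagging group never receives a checkpoint on the target cluster.

    import java.util.OptionalLong;

    // Illustrative only: the oldest retained sync maps upstream 1000 -> downstream 900.
    final class OffsetTranslationSketch {

        static OptionalLong translate(long oldestSyncUpstream, long oldestSyncDownstream, long groupUpstreamOffset) {
            if (groupUpstreamOffset < oldestSyncUpstream) {
                return OptionalLong.empty(); // too far behind: no checkpoint can be emitted
            }
            long step = groupUpstreamOffset - oldestSyncUpstream;
            return OptionalLong.of(oldestSyncDownstream + step);
        }

        public static void main(String[] args) {
            System.out.println(translate(1000L, 900L, 2500L)); // OptionalLong[2400]
            System.out.println(translate(1000L, 900L, 100L));  // OptionalLong.empty -> group never checkpointed
        }
    }

   With the default auto.offset.reset=latest on the downstream consumer, the missing checkpoint means a failover starts from the log end, i.e. the data loss mentioned above.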



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #13459: KAFKA-14592: Move FeatureCommand to tools

2023-04-21 Thread via GitHub


showuon commented on code in PR #13459:
URL: https://github.com/apache/kafka/pull/13459#discussion_r1173462823


##
tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java:
##
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import static java.lang.String.format;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
+
+import static 
org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType.SAFE_DOWNGRADE;
+import static 
org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.KRAFT)
+@Tag("integration")
+public class FeatureCommandTest {
+
+private final ClusterInstance cluster;
+public FeatureCommandTest(ClusterInstance cluster) {
+this.cluster = cluster;
+}
+
+@ClusterTest(clusterType = Type.ZK, metadataVersion = 
MetadataVersion.IBP_3_3_IV1)
+public void testDescribeWithZK() {
+String commandOutput = ToolsTestUtils.captureStandardOut(() ->
+assertEquals(0, 
FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), 
"describe"))
+);
+assertEquals("", commandOutput);
+}
+
+@ClusterTest(clusterType = Type.KRAFT, metadataVersion = 
MetadataVersion.IBP_3_3_IV1)
+public void testDescribeWithKRaft() {
+String commandOutput = ToolsTestUtils.captureStandardOut(() ->
+assertEquals(0, 
FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), 
"describe"))
+);
+assertEquals("Feature: metadata.version\tSupportedMinVersion: 
3.0-IV1\t" +
+"SupportedMaxVersion: 3.5-IV1\tFinalizedVersionLevel: 
3.3-IV1\t", outputWithoutEpoch(commandOutput));
+}
+
+@ClusterTest(clusterType = Type.ZK, metadataVersion = 
MetadataVersion.IBP_3_3_IV1)
+public void testUpgradeMetadataVersionWithZk() {
+String commandOutput = ToolsTestUtils.captureStandardOut(() ->
+assertEquals(1, 
FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
+"upgrade", "--metadata", "3.3-IV2"))
+);
+assertEquals("Could not upgrade metadata.version to 6. Could not apply 
finalized feature " +
+"update because the provided feature is not supported.", 
commandOutput);
+}
+
+@ClusterTest(clusterType = Type.KRAFT, metadataVersion = 
MetadataVersion.IBP_3_3_IV1)
+public void testUpgradeMetadataVersionWithKraft() {
+String commandOutput = ToolsTestUtils.captureStandardOut(() ->
+assertEquals(0, 
FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
+"upgrade", "--feature", "metadata.version=5"))
+);
+assertEquals("metadata.version was upgraded to 5.", commandOutput);
+
+commandOutput = ToolsTestUtils.captureStandardOut(() ->
+assertEquals(0, 
FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
+"upgrade", "--metadata", "3.3-IV2"))
+);
+assertEquals("metadata.version was upgraded to 6.", commandOutput);
+}
+
+@ClusterTest(clusterType = Type.ZK, metadataVersion = 
MetadataV

[jira] [Commented] (KAFKA-14807) MirrorMaker2 config source.consumer.auto.offset.reset=latest leading to the pause of replication of consumer groups

2023-04-21 Thread Zhaoli (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17714861#comment-17714861
 ] 

Zhaoli commented on KAFKA-14807:


[~durban] Thanks, but we use dedicated mode. 

> MirrorMaker2 config source.consumer.auto.offset.reset=latest leading to the 
> pause of replication of consumer groups
> ---
>
> Key: KAFKA-14807
> URL: https://issues.apache.org/jira/browse/KAFKA-14807
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.4.0, 3.3.1, 3.3.2
> Environment: centos7
>Reporter: Zhaoli
>Priority: Major
>
> We use MirrorMaker2 to replicate messages and consumer group offsets from the 
> Kafka cluster `source` to cluster `target`.
> To reduce the load on the source cluster, we added this configuration to mm2 to avoid replicating the whole message history:
> {code:java}
> source.consumer.auto.offset.reset=latest {code}
> After that, we found that some of the consumer group offsets had stopped replicating.
> The common characteristic of these consumer groups is their EMPTY status, which means they have no active members at that moment. Offset replication for all the active consumer groups works as normal.
> After researching the source code, we found this is because the configuration above also affects the consumption of the topic `mm2-offset-syncs`, therefore the map `offsetSyncs` doesn't hold all the topic partitions:
> {code:java}
> private final Map offsetSyncs = new HashMap<>(); 
> {code}
> And the missing topic partitions cause the replication of the EMPTY consumer groups to pause, which is not expected.
> {code:java}
> OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long 
> upstreamOffset) {
> Optional offsetSync = latestOffsetSync(sourceTopicPartition);
> if (offsetSync.isPresent()) {
> if (offsetSync.get().upstreamOffset() > upstreamOffset) {
> // Offset is too far in the past to translate accurately
> return OptionalLong.of(-1L);
> }
> long upstreamStep = upstreamOffset - 
> offsetSync.get().upstreamOffset();
> return OptionalLong.of(offsetSync.get().downstreamOffset() + 
> upstreamStep);
> } else {
> return OptionalLong.empty();
> }
> }{code}
>  
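
A minimal sketch of the reported configuration (illustrative only; per the report, the source.consumer. prefix also ends up applying to MM2's internal consumer of the mm2-offset-syncs topic, so syncs published before that consumer started are never read):

    import java.util.Properties;

    public class Mm2ConfigSketch {
        public static void main(String[] args) {
            Properties mm2Props = new Properties();
            mm2Props.put("clusters", "source, target");
            mm2Props.put("source->target.enabled", "true");
            // The override reported above; it also affects consumption of mm2-offset-syncs.
            mm2Props.put("source.consumer.auto.offset.reset", "latest");
            mm2Props.forEach((k, v) -> System.out.println(k + "=" + v));
        }
    }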



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] clolov commented on pull request #13621: KAFKA-14133: Migrate ChangeLogReader mock in TaskManagerTest to Mockito

2023-04-21 Thread via GitHub


clolov commented on PR #13621:
URL: https://github.com/apache/kafka/pull/13621#issuecomment-1517474714

   I believe the related test failures are due to the same problem as 
https://github.com/apache/kafka/pull/13529#discussion_r1168432918. Once that PR 
is merged and this one is rebased those tests should pass.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] omkreddy commented on pull request #13437: KAFKA-14828: Remove R/W locks using persistent data structures

2023-04-21 Thread via GitHub


omkreddy commented on PR #13437:
URL: https://github.com/apache/kafka/pull/13437#issuecomment-1517476396

   Test failures are known connect/mirror failures: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13437/12/tests


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] omkreddy merged pull request #13437: KAFKA-14828: Remove R/W locks using persistent data structures

2023-04-21 Thread via GitHub


omkreddy merged PR #13437:
URL: https://github.com/apache/kafka/pull/13437


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #13514: KAFKA-14752: Kafka examples improvements - consumer changes

2023-04-21 Thread via GitHub


showuon commented on code in PR #13514:
URL: https://github.com/apache/kafka/pull/13514#discussion_r1173486311


##
examples/src/main/java/kafka/examples/Consumer.java:
##
@@ -21,97 +21,120 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
 
 import java.time.Duration;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 
+import static java.util.Collections.singleton;
+
 /**
- * A simple consumer thread that demonstrate subscribe and poll use case. The 
thread subscribes to a topic,
- * then runs a loop to poll new messages, and print the message out. The 
thread closes until the target {@code
- * numMessageToConsume} is hit or catching an exception.
+ * A simple consumer thread that subscribes to a topic, fetches new records 
and prints them.
+ * The thread does not stop until all records are completed or an exception is 
raised.
  */
 public class Consumer extends Thread implements ConsumerRebalanceListener {
-private final KafkaConsumer consumer;
+private final String bootstrapServers;
 private final String topic;
 private final String groupId;
-private final int numMessageToConsume;
-private int messageRemaining;
+private final Optional instanceId;
+private final boolean readCommitted;
+private final int numRecords;
 private final CountDownLatch latch;
+private volatile boolean closed;
+private int remainingRecords;
 
-public Consumer(final String topic,
-final String groupId,
-final Optional instanceId,
-final boolean readCommitted,
-final int numMessageToConsume,
-final CountDownLatch latch) {
-super("KafkaConsumerExample");
-this.groupId = groupId;
-Properties props = new Properties();
-props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-instanceId.ifPresent(id -> 
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
-props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.IntegerDeserializer");
-props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringDeserializer");
-if (readCommitted) {
-props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
-}
-props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-consumer = new KafkaConsumer<>(props);
+public Consumer(String threadName,
+String bootstrapServers,
+String topic,
+String groupId,
+Optional instanceId,
+boolean readCommitted,
+int numRecords,
+CountDownLatch latch) {
+super(threadName);
+this.bootstrapServers = bootstrapServers;
 this.topic = topic;
-this.numMessageToConsume = numMessageToConsume;
-this.messageRemaining = numMessageToConsume;
+this.groupId = groupId;
+this.instanceId = instanceId;
+this.readCommitted = readCommitted;
+this.numRecords = numRecords;
+this.remainingRecords = numRecords;
 this.latch = latch;
 }
 
-KafkaConsumer get() {
-return consumer;
-}
-
 @Override
 public void run() {
-try {
-System.out.println("Subscribe to:" + this.topic);
-consumer.subscribe(Collections.singletonList(this.topic), this);
-do {
-doWork();
-} while (messageRemaining > 0);
-System.out.println(groupId + " finished reading " + 
numMessageToConsume + " messages");
-} catch (WakeupException e) {
-// swallow the wakeup
-} catch (Exception e) {
-System.out.println("Unexpected termination, exception thrown:" + 
e);
-} finally {
-shutdown();
+// the consumer instance is NOT thread safe
+try (KafkaConsumer consumer = createK

[GitHub] [kafka] dajac merged pull request #13555: MINOR: Move `ControllerPurgatory` to `server-common`

2023-04-21 Thread via GitHub


dajac merged PR #13555:
URL: https://github.com/apache/kafka/pull/13555


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac merged pull request #13537: KAFKA-14462; [7/N] Add ClientAssignor, Assignment, TopicMetadata and VersionedMetadata

2023-04-21 Thread via GitHub


dajac merged PR #13537:
URL: https://github.com/apache/kafka/pull/13537


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14925) The website shouldn't load external resources

2023-04-21 Thread Atul Sharma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17714920#comment-17714920
 ] 

Atul Sharma commented on KAFKA-14925:
-

I want to work on this, can someone assign it to me?

> The website shouldn't load external resources
> -
>
> Key: KAFKA-14925
> URL: https://issues.apache.org/jira/browse/KAFKA-14925
> Project: Kafka
>  Issue Type: Improvement
>  Components: website
>Reporter: Mickael Maison
>Priority: Major
>
> In includes/_header.htm, we load a resource from fontawesome.com



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14925) The website shouldn't load external resources

2023-04-21 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison reassigned KAFKA-14925:
--

Assignee: Atul Sharma

> The website shouldn't load external resources
> -
>
> Key: KAFKA-14925
> URL: https://issues.apache.org/jira/browse/KAFKA-14925
> Project: Kafka
>  Issue Type: Improvement
>  Components: website
>Reporter: Mickael Maison
>Assignee: Atul Sharma
>Priority: Major
>
> In includes/_header.htm, we load a resource from fontawesome.com



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14925) The website shouldn't load external resources

2023-04-21 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17714923#comment-17714923
 ] 

Mickael Maison commented on KAFKA-14925:


Done

> The website shouldn't load external resources
> -
>
> Key: KAFKA-14925
> URL: https://issues.apache.org/jira/browse/KAFKA-14925
> Project: Kafka
>  Issue Type: Improvement
>  Components: website
>Reporter: Mickael Maison
>Assignee: Atul Sharma
>Priority: Major
>
> In includes/_header.htm, we load a resource from fontawesome.com



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] Hangleton commented on a diff in pull request #13437: KAFKA-14828: Remove R/W locks using persistent data structures

2023-04-21 Thread via GitHub


Hangleton commented on code in PR #13437:
URL: https://github.com/apache/kafka/pull/13437#discussion_r1173645096


##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/AclCache.java:
##
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.server.immutable.ImmutableMap;
+import org.apache.kafka.server.immutable.ImmutableNavigableSet;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An immutable class that stores the ACLs in KRaft-based clusters.
+ */
+public class AclCache {
+/**
+ * Contains all of the current ACLs sorted by (resource type, resource 
name).
+ */
+private final ImmutableNavigableSet aclsByResource;
+
+/**
+ * Contains all of the current ACLs indexed by UUID.
+ */
+private final ImmutableMap aclsById;
+
+AclCache() {
+this(ImmutableNavigableSet.empty(), ImmutableMap.empty());
+}
+
+private AclCache(final ImmutableNavigableSet aclsByResource, 
final ImmutableMap aclsById) {
+this.aclsByResource = aclsByResource;
+this.aclsById = aclsById;
+}
+
+public ImmutableNavigableSet aclsByResource() {
+return aclsByResource;
+}
+
+Iterable acls(AclBindingFilter filter) {
+List aclBindingList = new ArrayList<>();
+aclsByResource.forEach(acl -> {
+AclBinding aclBinding = acl.toBinding();
+if (filter.matches(aclBinding)) {
+aclBindingList.add(aclBinding);
+}
+});
+return aclBindingList;
+}
+
+int count() {
+return aclsById.size();
+}
+
+StandardAcl getAcl(Uuid id) {
+return aclsById.get(id);
+}
+
+AclCache addAcl(Uuid id, StandardAcl acl) {

Review Comment:
   > *Since writes are done on a single thread, the only case of concurrency we have to solve here is when multiple reads and a single write are happening in parallel.*
   
   Do I get this right that the single-writer assumption stated in the PR description is critical for achieving consistency in the sequence of operations below (e.g. that the state checked at line 77 is still valid at line 81)? Should multiple writes happen concurrently, this would not be the case, right? Is there a way to enforce it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on pull request #13619: Initial support for OpenJDK CRaC snapshotting

2023-04-21 Thread via GitHub


Hangleton commented on PR #13619:
URL: https://github.com/apache/kafka/pull/13619#issuecomment-1517679958

   Hi, Radim,
   
   Thank you for the follow-up and the clarification; I missed the fact that the targeted components are the Kafka clients.
   
   Some of the previous statements regarding state may still be valid. Typically, a Kafka client holds cluster and topic metadata, and one of its first operations on start-up, once a connection with a bootstrap broker is established, is to fetch this metadata to get an up-to-date view of the cluster (e.g. broker membership).
   
   But I lack the background to fully understand this approach.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on a diff in pull request #13437: KAFKA-14828: Remove R/W locks using persistent data structures

2023-04-21 Thread via GitHub


Hangleton commented on code in PR #13437:
URL: https://github.com/apache/kafka/pull/13437#discussion_r1173645096


##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/AclCache.java:
##
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.server.immutable.ImmutableMap;
+import org.apache.kafka.server.immutable.ImmutableNavigableSet;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An immutable class that stores the ACLs in KRaft-based clusters.
+ */
+public class AclCache {
+/**
+ * Contains all of the current ACLs sorted by (resource type, resource 
name).
+ */
+private final ImmutableNavigableSet aclsByResource;
+
+/**
+ * Contains all of the current ACLs indexed by UUID.
+ */
+private final ImmutableMap aclsById;
+
+AclCache() {
+this(ImmutableNavigableSet.empty(), ImmutableMap.empty());
+}
+
+private AclCache(final ImmutableNavigableSet aclsByResource, 
final ImmutableMap aclsById) {
+this.aclsByResource = aclsByResource;
+this.aclsById = aclsById;
+}
+
+public ImmutableNavigableSet aclsByResource() {
+return aclsByResource;
+}
+
+Iterable acls(AclBindingFilter filter) {
+List aclBindingList = new ArrayList<>();
+aclsByResource.forEach(acl -> {
+AclBinding aclBinding = acl.toBinding();
+if (filter.matches(aclBinding)) {
+aclBindingList.add(aclBinding);
+}
+});
+return aclBindingList;
+}
+
+int count() {
+return aclsById.size();
+}
+
+StandardAcl getAcl(Uuid id) {
+return aclsById.get(id);
+}
+
+AclCache addAcl(Uuid id, StandardAcl acl) {

Review Comment:
   > *Since writes are done on a single thread, the only case of concurrency we have to solve here is when multiple reads and a single write are happening in parallel.*
   
   Do I get this right that the single-writer assumption stated in the PR description is critical for achieving consistency in the sequence of operations below (e.g. that the state checked at line 77 is still valid at line 81)? Should multiple writes happen concurrently, this would not be the case, right? Is there a way to enforce the single-writer condition? Or shouldn't the cache preserve consistency under multiple writers (since it has no control over how many actors can update its state concurrently)?
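
   A generic copy-on-write sketch of the pattern under discussion (hypothetical CopyOnWriteCacheSketch, not the AclCache API): readers always see a consistent immutable snapshot, while the check-then-act on the write path is only safe if writers are serialized, for example by a single writer thread or, as below, an explicit lock.

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class CopyOnWriteCacheSketch<K, V> {
        private volatile Map<K, V> snapshot = Collections.emptyMap();

        public V get(K key) {
            return snapshot.get(key); // lock-free read of an immutable snapshot
        }

        public synchronized void add(K key, V value) { // serialize writers explicitly
            Map<K, V> next = new HashMap<>(snapshot);
            V previous = next.put(key, value);
            if (previous != null && !previous.equals(value)) {
                throw new RuntimeException("An entry with ID " + key + " already exists.");
            }
            snapshot = Collections.unmodifiableMap(next); // publish the new snapshot atomically
        }
    }

   Without the synchronized block (or an equivalent single-writer guarantee), two concurrent add calls could both copy the same snapshot, both pass the existence check, and one update would be silently lost.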



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13538: KAFKA-14462; [8/N] Add ConsumerGroupMember

2023-04-21 Thread via GitHub


dajac commented on code in PR #13538:
URL: https://github.com/apache/kafka/pull/13538#discussion_r1173669055


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * ConsumerGroupMember contains all the information related to a member
+ * within a consumer group. This class is immutable and is fully backed
+ * by records stored in the __consumer_offsets topic.
+ */
+public class ConsumerGroupMember {
+/**
+ * A builder allowing to create a new member or update an
+ * existing one.
+ *
+ * Please refer to the javadoc of {{@link ConsumerGroupMember}} for the
+ * definition of the fields.
+ */
+public static class Builder {
+private final String memberId;
+private int memberEpoch = 0;
+private int previousMemberEpoch = -1;
+private int nextMemberEpoch = 0;
+private String instanceId = null;
+private String rackId = null;
+private int rebalanceTimeoutMs = -1;
+private String clientId = "";
+private String clientHost = "";
+private List subscribedTopicNames = Collections.emptyList();
+private String subscribedTopicRegex = "";
+private String serverAssignorName = null;
+private List clientAssignors = Collections.emptyList();
+private Map> assignedPartitions = 
Collections.emptyMap();
+private Map> partitionsPendingRevocation = 
Collections.emptyMap();
+private Map> partitionsPendingAssignment = 
Collections.emptyMap();
+
+public Builder(String memberId) {
+this.memberId = Objects.requireNonNull(memberId);
+}
+
+public Builder(ConsumerGroupMember member) {
+Objects.requireNonNull(member);
+
+this.memberId = member.memberId;
+this.memberEpoch = member.memberEpoch;
+this.previousMemberEpoch = member.previousMemberEpoch;
+this.nextMemberEpoch = member.nextMemberEpoch;
+this.instanceId = member.instanceId;
+this.rackId = member.rackId;
+this.rebalanceTimeoutMs = member.rebalanceTimeoutMs;
+this.clientId = member.clientId;
+this.clientHost = member.clientHost;
+this.subscribedTopicNames = member.subscribedTopicNames;
+this.subscribedTopicRegex = member.subscribedTopicRegex;
+this.serverAssignorName = member.serverAssignorName;
+this.clientAssignors = member.clientAssignors;
+this.assignedPartitions = member.assignedPartitions;
+this.partitionsPendingRevocation = 
member.partitionsPendingRevocation;
+this.partitionsPendingAssignment = 
member.partitionsPendingAssignment;
+}
+
+public Builder setMemberEpoch(int memberEpoch) {
+this.memberEpoch = memberEpoch;
+return this;
+}
+
+public Builder setPreviousMemberEpoch(int previousMemberEpoch) {
+this.previousMemberEpoch = previousMemberEpoch;
+return this;
+}
+
+public Builder setNextMemberEpoch(int nextMemberEpoch) {
+this.nextMemberEpoch = nextMemberEpoch;
+return this;
+}
+
+public Builder setInstanceId(String instanceId) {
+this.instanceId = instanceId;
+return this;
+}
+
+public Builder maybeUpdateInstanceId(Optional instanceId) {
+this.instanceId = instanceId.orElse(this.instanceId);
+return this;
+}
+
+public Buil

[GitHub] [kafka] vamossagar12 commented on pull request #13594: KAFKA-14913: Using ThreadUtils.shutdownExecutorServiceQuietly to close executors in Connect Runtime

2023-04-21 Thread via GitHub


vamossagar12 commented on PR #13594:
URL: https://github.com/apache/kafka/pull/13594#issuecomment-1517696428

   Thanks @yashmayya. I just tried to include only the ones which follow a similar pattern. I think `SourceTaskOffsetCommitter` is something which could also be modified, but I missed that. The one in `MemoryOffsetBackingStore` has some handling for InterruptedException and in fact throws ConnectException when it can't shut down in time, which is unlike what the newly added utils method does.
   Specifically, I decided not to change the one in `Worker` because it was added as part of KAFKA-12380. As I said, I changed only those which were similar to the pattern of the newly added Utils method.
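
   For reference, a rough sketch of the shutdown pattern being consolidated (illustrative, not the exact ThreadUtils.shutdownExecutorServiceQuietly implementation): stop accepting new tasks, wait a bounded time, then force-cancel whatever is still running, swallowing rather than throwing on timeout.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;

    public final class ExecutorShutdownSketch {
        public static void shutdownQuietly(ExecutorService executor, long timeout, TimeUnit unit) {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(timeout, unit)) {
                    executor.shutdownNow(); // cancel lingering tasks instead of throwing
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        }
    }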


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac merged pull request #13596: KAFKA-14869: Ignore unknown record types for coordinators (KIP-915, P…

2023-04-21 Thread via GitHub


dajac merged PR #13596:
URL: https://github.com/apache/kafka/pull/13596


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac merged pull request #13597: KAFKA-14869: Ignore unknown record types for coordinators (KIP-915, P…

2023-04-21 Thread via GitHub


dajac merged PR #13597:
URL: https://github.com/apache/kafka/pull/13597


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #13600: KAFKA-14869: Bump coordinator value records to flexible versions (KIP…

2023-04-21 Thread via GitHub


dajac commented on PR #13600:
URL: https://github.com/apache/kafka/pull/13600#issuecomment-1517708617

   @jeffkbkim Could you please rebase this one?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #13601: KAFKA-14869: Bump coordinator value records to flexible versions (KIP…

2023-04-21 Thread via GitHub


dajac commented on PR #13601:
URL: https://github.com/apache/kafka/pull/13601#issuecomment-1517708744

   @jeffkbkim Could you please rebase this one?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac merged pull request #13604: KAFKA-14869: Bump coordinator value records to flexible versions (KIP…

2023-04-21 Thread via GitHub


dajac merged PR #13604:
URL: https://github.com/apache/kafka/pull/13604


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-12826) Remove Deprecated Class Serdes (Streams)

2023-04-21 Thread shiqin.lan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shiqin.lan reassigned KAFKA-12826:
--

Assignee: (was: shiqin.lan)

> Remove Deprecated Class Serdes (Streams)
> 
>
> Key: KAFKA-12826
> URL: https://issues.apache.org/jira/browse/KAFKA-12826
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Josep Prat
>Priority: Blocker
> Fix For: 4.0.0
>
>
> Class org.apache.kafka.streams.scala.Serdes was deprecated in version 2.7
> See KAFKA-10020 and KIP-616



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-12826) Remove Deprecated Class Serdes (Streams)

2023-04-21 Thread shiqin.lan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shiqin.lan reassigned KAFKA-12826:
--

Assignee: shiqin.lan

> Remove Deprecated Class Serdes (Streams)
> 
>
> Key: KAFKA-12826
> URL: https://issues.apache.org/jira/browse/KAFKA-12826
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Josep Prat
>Assignee: shiqin.lan
>Priority: Blocker
> Fix For: 4.0.0
>
>
> Class org.apache.kafka.streams.scala.Serdes was deprecated in version 2.7
> See KAFKA-10020 and KIP-616



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14926) Remove metrics on Log Cleaner shutdown

2023-04-21 Thread Divij Vaidya (Jira)
Divij Vaidya created KAFKA-14926:


 Summary: Remove metrics on Log Cleaner shutdown
 Key: KAFKA-14926
 URL: https://issues.apache.org/jira/browse/KAFKA-14926
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Divij Vaidya
Assignee: Divij Vaidya
 Fix For: 3.6.0


We register metrics with the KafkaMetricsGroup in LogCleaner.scala but we don't 
remove them on shutdown.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] viktorsomogyi commented on a diff in pull request #13475: KAFKA-14652: Add the flow to the log context and the Connect-managed …

2023-04-21 Thread via GitHub


viktorsomogyi commented on code in PR #13475:
URL: https://github.com/apache/kafka/pull/13475#discussion_r1173739241


##
config/connect-mirror-maker.properties:
##
@@ -57,3 +57,6 @@ config.storage.replication.factor=1
 # replication.policy.separator = _
 # sync.topic.acls.enabled = false
 # emit.heartbeats.interval.seconds = 5
+
+# enable flow in the logs for improved diagnostics
+add.flow.context = true

Review Comment:
   Nit: this should be false according to your KIP but I guess you haven't 
updated this PR since, so just making a note.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya opened a new pull request, #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

2023-04-21 Thread via GitHub


divijvaidya opened a new pull request, #13623:
URL: https://github.com/apache/kafka/pull/13623

   # Motivation
   When the log cleaner is shut down, it doesn't remove the metrics that were registered to `KafkaYammerMetrics.defaultRegistry()`, which has one instance per server. The log cleaner's lifecycle is tied to the lifecycle of `LogManager`, hence there is no scenario where the log cleaner is shut down but the broker isn't. Broker shutdown closes the `jmxReporter`, hence there is no actual metric leak here.
   The motivation for this code change is to "do the right thing" from a code hygiene perspective.
   
   # Test
   Added a unit test that fails before the change and passes after the change.
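
   A generic sketch of the hygiene fix described above (hypothetical MetricOwnerSketch, not the actual KafkaMetricsGroup/LogCleaner code): remember every metric name registered against a shared registry and remove them all on shutdown.

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Consumer;

    public class MetricOwnerSketch {
        private final Set<String> registeredMetricNames = ConcurrentHashMap.newKeySet();
        private final Consumer<String> registerFn;
        private final Consumer<String> removeFn;

        public MetricOwnerSketch(Consumer<String> registerFn, Consumer<String> removeFn) {
            this.registerFn = registerFn;
            this.removeFn = removeFn;
        }

        public void registerMetric(String name) {
            registerFn.accept(name);
            registeredMetricNames.add(name); // track ownership for later cleanup
        }

        public void shutdown() {
            registeredMetricNames.forEach(removeFn); // avoid leaking gauges in the shared registry
            registeredMetricNames.clear();
        }
    }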


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jlprat merged pull request #13612: MINOR: fix noticed typo in raft and metadata projects

2023-04-21 Thread via GitHub


jlprat merged PR #13612:
URL: https://github.com/apache/kafka/pull/13612


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14926) Remove metrics on Log Cleaner shutdown

2023-04-21 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-14926:
-
Issue Type: Improvement  (was: Bug)

> Remove metrics on Log Cleaner shutdown
> --
>
> Key: KAFKA-14926
> URL: https://issues.apache.org/jira/browse/KAFKA-14926
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Divij Vaidya
>Assignee: Divij Vaidya
>Priority: Minor
> Fix For: 3.6.0
>
>
> We register metrics with the KafkaMetricsGroup in LogCleaner.scala but we 
> don't remove them on shutdown.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] urbandan commented on a diff in pull request #13475: KAFKA-14652: Add the flow to the log context and the Connect-managed …

2023-04-21 Thread via GitHub


urbandan commented on code in PR #13475:
URL: https://github.com/apache/kafka/pull/13475#discussion_r1173744877


##
config/connect-mirror-maker.properties:
##
@@ -57,3 +57,6 @@ config.storage.replication.factor=1
 # replication.policy.separator = _
 # sync.topic.acls.enabled = false
 # emit.heartbeats.interval.seconds = 5
+
+# enable flow in the logs for improved diagnostics
+add.flow.context = true

Review Comment:
   The default of the config is false, but I think that is only necessary for backward compatibility. I honestly think that the recommendation (the sample config) should be to enable the extra log information by default, as many use cases will benefit from it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-10883) Lien hypertext incorrect

2023-04-21 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17714998#comment-17714998
 ] 

ASF GitHub Bot commented on KAFKA-10883:


machi1990 opened a new pull request, #504:
URL: https://github.com/apache/kafka-site/pull/504

   Closes https://issues.apache.org/jira/browse/KAFKA-10883
   
   Hi @mimaison, can you have a look at this?




> Lien hypertext incorrect
> 
>
> Key: KAFKA-10883
> URL: https://issues.apache.org/jira/browse/KAFKA-10883
> Project: Kafka
>  Issue Type: Bug
>  Components: website
> Environment: Desktop windows 10, chrome / firefox
>Reporter: Jimmy PANNIER
>Priority: Major
> Attachments: 2020_12_22_10_36_31_WhatsApp.png
>
>
> As a visitor of the Kafka presentation website
>  [https://kafka.apache.org/] I don't have access to the "get started" page
>  nor to the community page from the main menu (top right).
> The "get-started" menu item should point to /get-started



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] machi1990 opened a new pull request, #13624: MINOR: remove unused ProcessorNode#time field

2023-04-21 Thread via GitHub


machi1990 opened a new pull request, #13624:
URL: https://github.com/apache/kafka/pull/13624

   The field is not used so can be removed.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] machi1990 commented on pull request #13612: MINOR: fix noticed typo in raft and metadata projects

2023-04-21 Thread via GitHub


machi1990 commented on PR #13612:
URL: https://github.com/apache/kafka/pull/13612#issuecomment-1517827939

   Thanks for the review @jlprat 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-10883) Lien hypertext incorrect

2023-04-21 Thread Manyanda Chitimbo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17715005#comment-17715005
 ] 

Manyanda Chitimbo commented on KAFKA-10883:
---

I've opened a PR [https://github.com/apache/kafka-site/pull/504] to fix the 
issue 

> Lien hypertext incorrect
> 
>
> Key: KAFKA-10883
> URL: https://issues.apache.org/jira/browse/KAFKA-10883
> Project: Kafka
>  Issue Type: Bug
>  Components: website
> Environment: Desktop windows 10, chrome / firefox
>Reporter: Jimmy PANNIER
>Priority: Major
> Attachments: 2020_12_22_10_36_31_WhatsApp.png
>
>
> As a visitor of the Kafka presentation website
>  [https://kafka.apache.org/] I don't have access to the "get started" page
>  nor to the community page from the main menu (top right).
> The "get-started" menu item should point to /get-started



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-10883) Lien hypertext incorrect

2023-04-21 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17715008#comment-17715008
 ] 

ASF GitHub Bot commented on KAFKA-10883:


mimaison commented on PR #504:
URL: https://github.com/apache/kafka-site/pull/504#issuecomment-1517838156

   Hi @machi1990, thanks for the PR!
   
   I agree that the behavior of the menu is a bit strange. The issue is that the "get started" page does not seem finished, so I wonder if that's the reason it's currently not reachable.
   
   For example, it says "All you need to know to get up and running with Apache 
Kafka can be found here. **Etc. Etc.**". Also the buttons don't seem to work.
   
   So before enabling it, I think we should properly finish it. WDYT?




> Lien hypertext incorrect
> 
>
> Key: KAFKA-10883
> URL: https://issues.apache.org/jira/browse/KAFKA-10883
> Project: Kafka
>  Issue Type: Bug
>  Components: website
> Environment: Desktop windows 10, chrome / firefox
>Reporter: Jimmy PANNIER
>Priority: Major
> Attachments: 2020_12_22_10_36_31_WhatsApp.png
>
>
> As a visitor of the Kafka presentation website
>  [https://kafka.apache.org/] I don't have access to the "get started" page
>  nor to the community page from the main menu (top right).
> The "get-started" menu item should point to /get-started



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] machi1990 commented on pull request #13624: MINOR: remove unused ProcessorNode#time field and remove unused imports

2023-04-21 Thread via GitHub


machi1990 commented on PR #13624:
URL: https://github.com/apache/kafka/pull/13624#issuecomment-1517844646

   Hi @guozhangwang @ableegoldman would you mind giving this a look when you've 
some time? Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jlprat commented on a diff in pull request #13575: KAFKA-14905: Reduce flakiness in MM2 ForwardingAdmin test due to admin timeouts

2023-04-21 Thread via GitHub


jlprat commented on code in PR #13575:
URL: https://github.com/apache/kafka/pull/13575#discussion_r1173773831


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeForwardingAdminWithLocalMetadata.java:
##
@@ -38,16 +38,13 @@
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 /** Customised ForwardingAdmin for testing only.
  * The class create/alter topics, partitions and ACLs in Kafka then store 
metadata in {@link FakeLocalMetadataStore}.
  * */
 
 public class FakeForwardingAdminWithLocalMetadata extends ForwardingAdmin {
 private static final Logger log = 
LoggerFactory.getLogger(FakeForwardingAdminWithLocalMetadata.class);
-private final long timeout = 1000L;

Review Comment:
   Any reason for removing the timeout completely instead of increasing it to 
some abundantly high number of seconds?
   My time at Akka tells me: never wait forever; always set a timeout, even if 
it's high.
   What do you think about putting something like 15 seconds? This way, if 
something goes wrong and the future never finishes, we won't be hanging there 
forever.
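
   For reference, a minimal sketch of the bounded wait being suggested (the 
15-second value and the createTopics call are illustrative, not the test's 
actual code):

```java
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

public class BoundedAdminWait {
    // Wait at most 15 seconds for the admin call instead of blocking forever.
    public static void createTopicWithTimeout(Admin admin, NewTopic topic)
            throws InterruptedException, ExecutionException, TimeoutException {
        admin.createTopics(Collections.singletonList(topic))
             .all()
             .get(15, TimeUnit.SECONDS);
    }
}
```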



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14722) Make BooleanSerde public

2023-04-21 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17715010#comment-17715010
 ] 

Mickael Maison commented on KAFKA-14722:


Have the docs updates been merged? Or is anyone working on it?

> Make BooleanSerde public
> 
>
> Key: KAFKA-14722
> URL: https://issues.apache.org/jira/browse/KAFKA-14722
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Spacrocket
>Priority: Minor
>  Labels: beginner, kip, newbie
> Fix For: 3.5.0
>
>
> KIP-907: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-907%3A+Add+Boolean+Serde+to+public+interface]
>  
> We introduce a "BooleanSerde" via 
> [https://github.com/apache/kafka/pull/13249] as internal class. We could make 
> it public.
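
For context, a boolean serde is small enough to sketch in a few lines; the 
following is only an approximation of what the internal class does (null 
handling and error cases may differ):
{code:java}
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

public class BooleanSerdeSketch {
    public static Serde<Boolean> booleanSerde() {
        Serializer<Boolean> serializer = (topic, data) ->
                data == null ? null : new byte[] {(byte) (data ? 1 : 0)}; // one byte per value
        Deserializer<Boolean> deserializer = (topic, bytes) ->
                bytes == null ? null : bytes[0] != 0;
        return Serdes.serdeFrom(serializer, deserializer);
    }
}
{code}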



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14927) Dynamic configs not validated when using kafka-configs and --add-config-file

2023-04-21 Thread Justin Daines (Jira)
Justin Daines created KAFKA-14927:
-

 Summary: Dynamic configs not validated when using kafka-configs 
and --add-config-file
 Key: KAFKA-14927
 URL: https://issues.apache.org/jira/browse/KAFKA-14927
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 3.3.2
Reporter: Justin Daines


{{kafka-configs}} should validate dynamic configurations before applying them. 
It is currently possible to send a file with invalid configurations. 

For example, a file containing the following:

 
{code:java}
{
  "routes": {
    "crn:///kafka=*": {
      "management": {
        "allowed": "confluent-audit-log-events_audit",
        "denied": "confluent-audit-log-events-denied"
      },
      "describe": {
        "allowed": "",
        "denied": "confluent-audit-log-events-denied"
      },
      "authentication": {
        "allowed": "confluent-audit-log-events_audit",
        "denied": "confluent-audit-log-events-denied-authn"
      },
      "authorize": {
        "allowed": "confluent-audit-log-events_audit",
        "denied": "confluent-audit-log-events-denied-authz"
      },
      "interbroker": {
        "allowed": "",
        "denied": ""
      }
    },
    "crn:///kafka=*/group=*": {
      "consume": {
        "allowed": "confluent-audit-log-events_audit",
        "denied": "confluent-audit-log-events"
      }
    },
    "crn:///kafka=*/topic=*": {
      "produce": {
        "allowed": "confluent-audit-log-events_audit",
        "denied": "confluent-audit-log-events"
      },
      "consume": {
        "allowed": "confluent-audit-log-events_audit",
        "denied": "confluent-audit-log-events"
      }
    }
  },
  "destinations": {
    "topics": {
      "confluent-audit-log-events": {
        "retention_ms": 777600
      },
      "confluent-audit-log-events-denied": {
        "retention_ms": 777600
      },
      "confluent-audit-log-events-denied-authn": {
        "retention_ms": 777600
      },
      "confluent-audit-log-events-denied-authz": {
        "retention_ms": 777600
      },
      "confluent-audit-log-events_audit": {
        "retention_ms": 777600
      }
    }
  },
  "default_topics": {
    "allowed": "confluent-audit-log-events_audit",
    "denied": "confluent-audit-log-events"
  },
  "excluded_principals": [
    "User:schemaregistryUser",
    "User:ANONYMOUS",
    "User:appSA",
    "User:admin",
    "User:connectAdmin",
    "User:connectorSubmitter",
    "User:connectorSA",
    "User:schemaregistryUser",
    "User:ksqlDBAdmin",
    "User:ksqlDBUser",
    "User:controlCenterAndKsqlDBServer",
    "User:controlcenterAdmin",
    "User:restAdmin",
    "User:appSA",
    "User:clientListen",
    "User:superUser"
  ]
} {code}
 

 
{code:java}
kafka-configs --bootstrap-server $KAFKA_BOOTSTRAP --entity-type brokers 
--entity-default --alter --add-config-file audit-log.json {code}
 

Yields the following dynamic configs:

 
{code:java}
Default configs for brokers in the cluster are:
  "destinations"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"destinations"=null}
  "confluent-audit-log-events-denied-authn"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events-denied-authn"=null}
  "routes"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"routes"=null}
  "User=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"User=null}
  },=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:},=null}
  "excluded_principals"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"excluded_principals"=null}
  "confluent-audit-log-events_audit"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events_audit"=null}
  "authorize"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"authorize"=null}
  "default_topics"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"default_topics"=null}
  "topics"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"topics"=null}
  ]=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:]=null}
  "interbroker"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"interbroker"=null}
  "produce"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"produce"=null}
  "denied"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"denied"=null}
  "confluent-audit-log-events-denied"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events-denied"=null}
  "confluent-audit-log-events"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events"=null}
  "crn=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"crn=null}
  "management"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"management"=null}
  "describe"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"describe"=null}
  "allowed"

[jira] [Updated] (KAFKA-14927) Dynamic configs not validated when using kafka-configs and --add-config-file

2023-04-21 Thread Justin Daines (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justin Daines updated KAFKA-14927:
--
Description: 
{{kafka-configs}} should validate dynamic configurations before applying them. 
It is currently possible to send a file with invalid configurations. 

For example, a file containing the following:
{code:java}
{
  "routes": {
    "crn:///kafka=*": {
      "management": {
        "allowed": "confluent-audit-log-events_audit",
        "denied": "confluent-audit-log-events-denied"
      },
      "describe": {
        "allowed": "",
        "denied": "confluent-audit-log-events-denied"
      },
      "authentication": {
        "allowed": "confluent-audit-log-events_audit",
        "denied": "confluent-audit-log-events-denied-authn"
      },
      "authorize": {
        "allowed": "confluent-audit-log-events_audit",
        "denied": "confluent-audit-log-events-denied-authz"
      },
      "interbroker": {
        "allowed": "",
        "denied": ""
      }
    },
    "crn:///kafka=*/group=*": {
      "consume": {
        "allowed": "confluent-audit-log-events_audit",
        "denied": "confluent-audit-log-events"
      }
    },
    "crn:///kafka=*/topic=*": {
      "produce": {
        "allowed": "confluent-audit-log-events_audit",
        "denied": "confluent-audit-log-events"
      },
      "consume": {
        "allowed": "confluent-audit-log-events_audit",
        "denied": "confluent-audit-log-events"
      }
    }
  },
  "destinations": {
    "topics": {
      "confluent-audit-log-events": {
        "retention_ms": 777600
      },
      "confluent-audit-log-events-denied": {
        "retention_ms": 777600
      },
      "confluent-audit-log-events-denied-authn": {
        "retention_ms": 777600
      },
      "confluent-audit-log-events-denied-authz": {
        "retention_ms": 777600
      },
      "confluent-audit-log-events_audit": {
        "retention_ms": 777600
      }
    }
  },
  "default_topics": {
    "allowed": "confluent-audit-log-events_audit",
    "denied": "confluent-audit-log-events"
  },
  "excluded_principals": [
    "User:schemaregistryUser",
    "User:ANONYMOUS",
    "User:appSA",
    "User:admin",
    "User:connectAdmin",
    "User:connectorSubmitter",
    "User:connectorSA",
    "User:schemaregistryUser",
    "User:ksqlDBAdmin",
    "User:ksqlDBUser",
    "User:controlCenterAndKsqlDBServer",
    "User:controlcenterAdmin",
    "User:restAdmin",
    "User:appSA",
    "User:clientListen",
    "User:superUser"
  ]
} {code}
{code:java}
kafka-configs --bootstrap-server $KAFKA_BOOTSTRAP --entity-type brokers 
--entity-default --alter --add-config-file audit-log.json {code}
Yields the following dynamic configs:
{code:java}
Default configs for brokers in the cluster are:
  "destinations"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"destinations"=null}
  "confluent-audit-log-events-denied-authn"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events-denied-authn"=null}
  "routes"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"routes"=null}
  "User=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"User=null}
  },=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:},=null}
  "excluded_principals"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"excluded_principals"=null}
  "confluent-audit-log-events_audit"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events_audit"=null}
  "authorize"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"authorize"=null}
  "default_topics"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"default_topics"=null}
  "topics"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"topics"=null}
  ]=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:]=null}
  "interbroker"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"interbroker"=null}
  "produce"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"produce"=null}
  "denied"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"denied"=null}
  "confluent-audit-log-events-denied"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events-denied"=null}
  "confluent-audit-log-events"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events"=null}
  "crn=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"crn=null}
  "management"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"management"=null}
  "describe"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"describe"=null}
  "allowed"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"allowed"=null}
  "consume"=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"consume"=null}
  "confluent-audit-log-events-denied-authz"=null sensitiv

[jira] [Created] (KAFKA-14928) Metrics collection contends on lock with log cleaning

2023-04-21 Thread Divij Vaidya (Jira)
Divij Vaidya created KAFKA-14928:


 Summary: Metrics collection contends on lock with log cleaning
 Key: KAFKA-14928
 URL: https://issues.apache.org/jira/browse/KAFKA-14928
 Project: Kafka
  Issue Type: Bug
Reporter: Divij Vaidya
Assignee: Divij Vaidya
 Fix For: 3.6.0


In LogCleanerManager.scala, calculation of a metric requires a lock [1]. This 
same lock is required by core log cleaner functionality such as 
"grabFilthiestCompactedLog". This might lead to a situation where metric 
calculation holding the lock for an extended period of time may affect the core 
functionality of log cleaning.

The outcome of this task is to prevent expensive metric calculation from 
blocking log cleaning/compaction activity.

[1] 
https://github.com/apache/kafka/blob/dd63d88ac3ea7a9a55a6dacf9c5473e939322a55/core/src/main/scala/kafka/log/LogCleanerManager.scala#L102
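
One possible shape of the fix, sketched below for illustration (not the actual 
patch): the cleaner thread refreshes a cached value while it already holds the 
lock, and the metrics gauge only reads the cache, so metric collection never 
waits on the lock.
{code:java}
import java.util.concurrent.atomic.AtomicLong;

public class CachedCleanerGauge {
    // Value refreshed by the cleaner thread; read lock-free by the metrics reporter.
    private final AtomicLong cachedValue = new AtomicLong(0L);

    // Called from the cleaner thread, which already holds the cleaner lock anyway.
    void refresh(long recomputedValue) {
        cachedValue.set(recomputedValue);
    }

    // Called by metrics collection; never touches the cleaner lock.
    public long value() {
        return cachedValue.get();
    }
}
{code}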



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-10883) Lien hypertext incorrect

2023-04-21 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17715035#comment-17715035
 ] 

ASF GitHub Bot commented on KAFKA-10883:


machi1990 commented on PR #504:
URL: https://github.com/apache/kafka-site/pull/504#issuecomment-1517920328

   Hi @mimaison thanks for the review. You are right. I landed on this issue 
when I was trying to understand how the website is generated. I didn't 
appreciate that the other buttons don't work: when I navigated to the 
`/get-started` link and saw the "Introduction" content, I wrongly assumed 
that all worked fine and we could quickly add the link and have a quick win. 
However, it seems that the "get-started" page hasn't been revisited since 
https://github.com/apache/kafka-site/pull/269 and it has always been the case 
that it's not linked in the top navigation menu. I wonder if there are plans 
to rework the content, @scott-confluent any thoughts?
   
   > So before enabling it, I think we should properly finish it. WDYT?
   
   I'd agree, thanks. In this case, I think the JIRA 
https://issues.apache.org/jira/browse/KAFKA-10883 could be edited so that it 
reflects this sentiment, i.e. the page needs to be properly finished and then 
linked in the top navigation menu. WDYT? 




> Lien hypertext incorrect
> 
>
> Key: KAFKA-10883
> URL: https://issues.apache.org/jira/browse/KAFKA-10883
> Project: Kafka
>  Issue Type: Bug
>  Components: website
> Environment: Desktop windows 10, chrome / firefox
>Reporter: Jimmy PANNIER
>Priority: Major
> Attachments: 2020_12_22_10_36_31_WhatsApp.png
>
>
> As a visitor of the Kafka presentation website,
>  [https://kafka.apache.org/] I do not have access to the "get started" page
>  nor to the community page from the main menu (top right).
> The "get-started" menu item should point to /get-started



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] machi1990 commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

2023-04-21 Thread via GitHub


machi1990 commented on code in PR #13623:
URL: https://github.com/apache/kafka/pull/13623#discussion_r1173855980


##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -167,8 +167,20 @@ class LogCleaner(initialConfig: CleanerConfig,
*/
   def shutdown(): Unit = {
 info("Shutting down the log cleaner.")
-cleaners.foreach(_.shutdown())
-cleaners.clear()
+try {
+  cleaners.foreach(_.shutdown())
+  cleaners.clear()
+} finally {
+  remoteMetrics()
+}
+  }
+
+  def remoteMetrics(): Unit = {
+metricsGroup.removeMetric("max-buffer-utilization-percent")
+metricsGroup.removeMetric("cleaner-recopy-percent")
+metricsGroup.removeMetric("max-clean-time-secs")
+metricsGroup.removeMetric("max-compaction-delay-secs")
+metricsGroup.removeMetric("DeadThreadCount")

Review Comment:
   Hi @divijvaidya 
   
   Just for my own understanding since I am a newbie: what happens when a 
metric is removed, concretely, what happens to the already registered value?
   
   I also have a nit suggestion: extract the metric names into constants, since 
they seem to be repeated here and in the metric declarations.
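
   For illustration, the nit could look roughly like the Java sketch below (the 
holder class name is made up; in the Scala LogCleaner these would more likely 
be vals on a companion object):

```java
public final class LogCleanerMetricNames {
    public static final String MAX_BUFFER_UTILIZATION_PERCENT = "max-buffer-utilization-percent";
    public static final String CLEANER_RECOPY_PERCENT = "cleaner-recopy-percent";
    public static final String MAX_CLEAN_TIME_SECS = "max-clean-time-secs";
    public static final String MAX_COMPACTION_DELAY_SECS = "max-compaction-delay-secs";
    public static final String DEAD_THREAD_COUNT = "DeadThreadCount";

    private LogCleanerMetricNames() {
        // Constants holder; referenced both when registering and when removing the metrics.
    }
}
```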



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14652) Improve MM2 logging by adding the flow information to the context (KIP-916)

2023-04-21 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban updated KAFKA-14652:
-
Summary: Improve MM2 logging by adding the flow information to the context 
(KIP-916)  (was: Improve MM2 logging by adding the flow information to the 
context)

> Improve MM2 logging by adding the flow information to the context (KIP-916)
> ---
>
> Key: KAFKA-14652
> URL: https://issues.apache.org/jira/browse/KAFKA-14652
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> MirrorMaker2 runs multiple Connect worker instances in a single process. In 
> Connect, the logging is based on the assumption that Connector names are 
> unique. But in MM2, the same Connector names are being used in each flow 
> (Connect group). This means that there is no way to differentiate between the 
> logs of MirrorSourceConnector in A->B and in B->A.
> This can be improved by adding the flow to the logging context and the names 
> of the Connect framework threads.
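
As an illustration of the direction (not the actual KIP-916 implementation, and 
the MDC key is made up), the flow could be pushed onto the SLF4J MDC around 
connector and task work, so that a pattern layout printing the MDC value makes 
MirrorSourceConnector logs from A->B and B->A distinguishable:
{code:java}
import org.slf4j.MDC;

public class FlowLoggingContext implements AutoCloseable {
    private static final String KEY = "mm2.flow"; // hypothetical MDC key

    public FlowLoggingContext(String sourceCluster, String targetCluster) {
        MDC.put(KEY, sourceCluster + "->" + targetCluster); // e.g. "A->B" vs "B->A"
    }

    @Override
    public void close() {
        MDC.remove(KEY); // restore the previous logging context
    }
}
{code}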



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-10883) Lien hypertext incorrect

2023-04-21 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17715040#comment-17715040
 ] 

ASF GitHub Bot commented on KAFKA-10883:


mimaison commented on PR #504:
URL: https://github.com/apache/kafka-site/pull/504#issuecomment-1517955010

   Yes, feel free to edit the JIRA. I've granted you permissions, so you should 
be able to self-assign tickets if you want.




> Lien hypertext incorrect
> 
>
> Key: KAFKA-10883
> URL: https://issues.apache.org/jira/browse/KAFKA-10883
> Project: Kafka
>  Issue Type: Bug
>  Components: website
> Environment: Desktop windows 10, chrome / firefox
>Reporter: Jimmy PANNIER
>Priority: Major
> Attachments: 2020_12_22_10_36_31_WhatsApp.png
>
>
> As a visitor of the Kafka presentation website,
>  [https://kafka.apache.org/] I do not have access to the "get started" page
>  nor to the community page from the main menu (top right).
> The "get-started" menu item should point to /get-started



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14928) Metrics collection contends on lock with log cleaning

2023-04-21 Thread Alexandre Dupriez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17715041#comment-17715041
 ] 

Alexandre Dupriez commented on KAFKA-14928:
---

Hi Divij, thanks for reporting this. Would you have a reproduction case which 
demonstrates the contention?

> Metrics collection contends on lock with log cleaning
> -
>
> Key: KAFKA-14928
> URL: https://issues.apache.org/jira/browse/KAFKA-14928
> Project: Kafka
>  Issue Type: Bug
>Reporter: Divij Vaidya
>Assignee: Divij Vaidya
>Priority: Major
> Fix For: 3.6.0
>
>
> In LogCleanerManager.scala, calculation of a metric requires a lock [1]. This 
> same lock is required by core log cleaner functionality such as 
> "grabFilthiestCompactedLog". This might lead to a situation where metric 
> calculation holding the lock for an extended period of time may affect the 
> core functionality of log cleaning.
> The outcome of this task is to prevent expensive metric calculation from 
> blocking log cleaning/compaction activity.
> [1] 
> https://github.com/apache/kafka/blob/dd63d88ac3ea7a9a55a6dacf9c5473e939322a55/core/src/main/scala/kafka/log/LogCleanerManager.scala#L102



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14928) Metrics collection contends on lock with log cleaning

2023-04-21 Thread Alexandre Dupriez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17715041#comment-17715041
 ] 

Alexandre Dupriez edited comment on KAFKA-14928 at 4/21/23 2:55 PM:


Hi Divij, thanks for reporting this. Would you have a reproduction case which 
exhibits the contention?


was (Author: adupriez):
Hi Divij, thanks for reporting this. Would you have a reproduction case which 
demonstrates the contention?

> Metrics collection contends on lock with log cleaning
> -
>
> Key: KAFKA-14928
> URL: https://issues.apache.org/jira/browse/KAFKA-14928
> Project: Kafka
>  Issue Type: Bug
>Reporter: Divij Vaidya
>Assignee: Divij Vaidya
>Priority: Major
> Fix For: 3.6.0
>
>
> In LogCleanerManager.scala, calculation of a metric requires a lock [1]. This 
> same lock is required by core log cleaner functionality such as 
> "grabFilthiestCompactedLog". This might lead to a situation where metric 
> calculation holding the lock for an extended period of time may affect the 
> core functionality of log cleaning.
> The outcome of this task is to prevent expensive metric calculation from 
> blocking log cleaning/compaction activity.
> [1] 
> https://github.com/apache/kafka/blob/dd63d88ac3ea7a9a55a6dacf9c5473e939322a55/core/src/main/scala/kafka/log/LogCleanerManager.scala#L102



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-12826) Remove Deprecated Class Serdes (Streams)

2023-04-21 Thread shiqin.lan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17715044#comment-17715044
 ] 

shiqin.lan commented on KAFKA-12826:


Class org.apache.kafka.streams.scala.Serdes was deprecated in version 2.7, 
there are no other class references to it, so it can be removed in the new version. 
Will you let me take on this task?

> Remove Deprecated Class Serdes (Streams)
> 
>
> Key: KAFKA-12826
> URL: https://issues.apache.org/jira/browse/KAFKA-12826
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Josep Prat
>Priority: Blocker
> Fix For: 4.0.0
>
>
> Class org.apache.kafka.streams.scala.Serdes was deprecated in version 2.7
> See KAFKA-10020 and KIP-616



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac merged pull request #13599: KAFKA-14869: Ignore unknown record types for coordinators (KIP-915, P…

2023-04-21 Thread via GitHub


dajac merged PR #13599:
URL: https://github.com/apache/kafka/pull/13599


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-12826) Remove Deprecated Class Serdes (Streams)

2023-04-21 Thread Josep Prat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17715047#comment-17715047
 ] 

Josep Prat commented on KAFKA-12826:


Unfortunately, this task can't be taken just yet. It can only be done once we 
are working on  Kafka 4.0.0. Right now, trunk holds what will become 3.6.0.

So I'm afraid you'll need to wait a bit longer.

> Remove Deprecated Class Serdes (Streams)
> 
>
> Key: KAFKA-12826
> URL: https://issues.apache.org/jira/browse/KAFKA-12826
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Josep Prat
>Priority: Blocker
> Fix For: 4.0.0
>
>
> Class org.apache.kafka.streams.scala.Serdes was deprecated in version 2.7
> See KAFKA-10020 and KIP-616



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on pull request #13603: KAFKA-14869: Bump coordinator value records to flexible versions (KIP…

2023-04-21 Thread via GitHub


dajac commented on PR #13603:
URL: https://github.com/apache/kafka/pull/13603#issuecomment-1517975471

   @jeffkbkim Could you please rebase this one?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-10883) Properly finish the "get-started.html" page and link in in the top navigation menu

2023-04-21 Thread Manyanda Chitimbo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manyanda Chitimbo updated KAFKA-10883:
--
Summary: Properly finish the "get-started.html" page and link in in the top 
navigation menu  (was: Lien hypertext incorrect)

> Properly finish the "get-started.html" page and link in in the top navigation 
> menu  
> 
>
> Key: KAFKA-10883
> URL: https://issues.apache.org/jira/browse/KAFKA-10883
> Project: Kafka
>  Issue Type: Bug
>  Components: website
> Environment: Desktop windows 10, chrome / firefox
>Reporter: Jimmy PANNIER
>Priority: Major
> Attachments: 2020_12_22_10_36_31_WhatsApp.png
>
>
> As a visitor of the Kafka presentation website,
>  [https://kafka.apache.org/] I do not have access to the "get started" page
>  nor to the community page from the main menu (top right).
> The "get-started" menu item should point to /get-started



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-10883) Properly finish the "get-started.html" page and link in in the top navigation menu

2023-04-21 Thread Manyanda Chitimbo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manyanda Chitimbo updated KAFKA-10883:
--
Description: 
The get-started page: [https://kafka.apache.org/get-started] currently contains 

[https://github.com/apache/kafka-site/blob/asf-site/get-started.html]


As a visitor of the Kafka presentation website,
[https://kafka.apache.org/] I do not have access to the "get started" page
nor to the community page from the main menu (top right).

The "get-started" menu item should point to /get-started

  was:
As a visitor of the Kafka presentation website,
 [https://kafka.apache.org/] I do not have access to the "get started" page
 nor to the community page from the main menu (top right).

The "get-started" menu item should point to /get-started


> Properly finish the "get-started.html" page and link in in the top navigation 
> menu  
> 
>
> Key: KAFKA-10883
> URL: https://issues.apache.org/jira/browse/KAFKA-10883
> Project: Kafka
>  Issue Type: Bug
>  Components: website
> Environment: Desktop windows 10, chrome / firefox
>Reporter: Jimmy PANNIER
>Priority: Major
> Attachments: 2020_12_22_10_36_31_WhatsApp.png
>
>
> The get-started page: [https://kafka.apache.org/get-started] currently 
> contains 
> [https://github.com/apache/kafka-site/blob/asf-site/get-started.html]
> As a visitor of the Kafka presentation website,
> [https://kafka.apache.org/] I do not have access to the "get started" page
> nor to the community page from the main menu (top right).
> The "get-started" menu item should point to /get-started



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-10883) Properly finish the "get-started.html" page and link in in the top navigation menu

2023-04-21 Thread Manyanda Chitimbo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manyanda Chitimbo updated KAFKA-10883:
--
Description: 
The get-started page: [https://kafka.apache.org/get-started] currently contains 
non working buttons and it seems to be finished. The idea behind this Jira is 
to properly finish the get started page built from 
[https://github.com/apache/kafka-site/blob/asf-site/get-started.html]
and then to link it onto the top navigation menu in 
[https://github.com/apache/kafka-site/blob/asf-site/includes/_top.htm#L18] just 
like what has been attempted in 

As a visitor of the Kafka presentation website,
[https://kafka.apache.org/] I do not have access to the "get started" page
nor to the community page from the main menu (top right).

The "get-started" menu item should point to /get-started

  was:
The get-started page: [https://kafka.apache.org/get-started] currently contains 

[https://github.com/apache/kafka-site/blob/asf-site/get-started.html]


As a visitor of the Kafka presentation website,
[https://kafka.apache.org/] I do not have access to the "get started" page
nor to the community page from the main menu (top right).

The "get-started" menu item should point to /get-started


> Properly finish the "get-started.html" page and link in in the top navigation 
> menu  
> 
>
> Key: KAFKA-10883
> URL: https://issues.apache.org/jira/browse/KAFKA-10883
> Project: Kafka
>  Issue Type: Bug
>  Components: website
> Environment: Desktop windows 10, chrome / firefox
>Reporter: Jimmy PANNIER
>Priority: Major
> Attachments: 2020_12_22_10_36_31_WhatsApp.png
>
>
> The get-started page: [https://kafka.apache.org/get-started] currently 
> contains non-working buttons and does not seem to be finished. The idea behind 
> this Jira is to properly finish the get started page built from 
> [https://github.com/apache/kafka-site/blob/asf-site/get-started.html]
> and then to link it onto the top navigation menu in 
> [https://github.com/apache/kafka-site/blob/asf-site/includes/_top.htm#L18] 
> just like what has been attempted in 
> As a visitor of the Kafka presentation website,
> [https://kafka.apache.org/] I do not have access to the "get started" page
> nor to the community page from the main menu (top right).
> The "get-started" menu item should point to /get-started



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-10883) Properly finish the "get-started.html" page and link in in the top navigation menu

2023-04-21 Thread Manyanda Chitimbo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manyanda Chitimbo updated KAFKA-10883:
--
Description: 
The get-started page: [https://kafka.apache.org/get-started] currently contains 
non-working buttons and does not seem to be finished. The idea behind this Jira is 
to properly finish the get started page built from 
[https://github.com/apache/kafka-site/blob/asf-site/get-started.html]
and then to link it onto the top navigation menu in 
[https://github.com/apache/kafka-site/blob/asf-site/includes/_top.htm#L18] just 
like what has been attempted in [https://github.com/apache/kafka-site/pull/504]

*Worth noting:* Currently, when the user clicks on the "Get Started" menu item, 
no redirection to this page is performed even though the "/get-started" link is 
there. One reason the link is not wired up could be that the page is unfinished.

  was:
The get-started page: [https://kafka.apache.org/get-started] currently contains 
non-working buttons and does not seem to be finished. The idea behind this Jira is 
to properly finish the get started page built from 
[https://github.com/apache/kafka-site/blob/asf-site/get-started.html]
and then to link it onto the top navigation menu in 
[https://github.com/apache/kafka-site/blob/asf-site/includes/_top.htm#L18] just 
like what has been attempted in 

As a visitor of the Kafka presentation website,
[https://kafka.apache.org/] I do not have access to the "get started" page
nor to the community page from the main menu (top right).

The "get-started" menu item should point to /get-started


> Properly finish the "get-started.html" page and link in in the top navigation 
> menu  
> 
>
> Key: KAFKA-10883
> URL: https://issues.apache.org/jira/browse/KAFKA-10883
> Project: Kafka
>  Issue Type: Bug
>  Components: website
> Environment: Desktop windows 10, chrome / firefox
>Reporter: Jimmy PANNIER
>Priority: Major
> Attachments: 2020_12_22_10_36_31_WhatsApp.png
>
>
> The get-started page: [https://kafka.apache.org/get-started] currently 
> contains non-working buttons and does not seem to be finished. The idea behind 
> this Jira is to properly finish the get started page built from 
> [https://github.com/apache/kafka-site/blob/asf-site/get-started.html]
> and then to link it onto the top navigation menu in 
> [https://github.com/apache/kafka-site/blob/asf-site/includes/_top.htm#L18] 
> just like what has been attempted in 
> [https://github.com/apache/kafka-site/pull/504]
> *Worth noting:* Currently, when the user clicks on the "Get Started" menu 
> item, no redirection to this page is performed even though the "/get-started" 
> link is there. One reason the link is not wired up could be that the page is 
> unfinished.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] rvansa commented on pull request #13619: Initial support for OpenJDK CRaC snapshotting

2023-04-21 Thread via GitHub


rvansa commented on PR #13619:
URL: https://github.com/apache/kafka/pull/13619#issuecomment-1517992031

   @Hangleton You're right, this process should be repeated; for example in the 
JDK itself we flush DNS caches before checkpoint. I was hoping that the code in 
https://github.com/apache/kafka/pull/13619/files#diff-dcc1af531d191de8da1e23ad6d878a3efc463ba4670dbcf2896295a9dacd1c18R658
 would reload the cluster view; is that not the case?
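
   For readers unfamiliar with the CRaC hooks being discussed, the general shape 
is roughly the following sketch (not the PR's code; refreshClusterView() is a 
made-up stand-in for whatever re-initialization the client needs):

```java
import org.crac.Context;
import org.crac.Core;
import org.crac.Resource;

public class ClusterViewCracResource implements Resource {
    public ClusterViewCracResource() {
        Core.getGlobalContext().register(this); // receive checkpoint/restore callbacks
    }

    @Override
    public void beforeCheckpoint(Context<? extends Resource> context) {
        // Drop state that would be stale in the restored image: open connections, cached metadata, etc.
    }

    @Override
    public void afterRestore(Context<? extends Resource> context) {
        refreshClusterView(); // made-up: re-fetch brokers/cluster metadata in the restored process
    }

    private void refreshClusterView() {
        // ...
    }
}
```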


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-10883) Properly finish the "get-started.html" page and link in in the top navigation menu

2023-04-21 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17715054#comment-17715054
 ] 

ASF GitHub Bot commented on KAFKA-10883:


machi1990 closed pull request #504: KAFKA-10883: link to get started in the top 
navigation header
URL: https://github.com/apache/kafka-site/pull/504




> Properly finish the "get-started.html" page and link in in the top navigation 
> menu  
> 
>
> Key: KAFKA-10883
> URL: https://issues.apache.org/jira/browse/KAFKA-10883
> Project: Kafka
>  Issue Type: Bug
>  Components: website
> Environment: Desktop windows 10, chrome / firefox
>Reporter: Jimmy PANNIER
>Priority: Major
> Attachments: 2020_12_22_10_36_31_WhatsApp.png
>
>
> The get-started page: [https://kafka.apache.org/get-started] currently 
> contains non-working buttons and does not seem to be finished. The idea behind 
> this Jira is to properly finish the get started page built from 
> [https://github.com/apache/kafka-site/blob/asf-site/get-started.html]
> and then to link it onto the top navigation menu in 
> [https://github.com/apache/kafka-site/blob/asf-site/includes/_top.htm#L18] 
> just like what has been attempted in 
> [https://github.com/apache/kafka-site/pull/504]
> *Worth noting:* Currently, when the user clicks on the "Get Started" menu 
> item, no redirection to this page is performed even though the "/get-started" 
> link is there. One reason the link is not wired up could be that the page is 
> unfinished.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-10883) Properly finish the "get-started.html" page and link in in the top navigation menu

2023-04-21 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17715053#comment-17715053
 ] 

ASF GitHub Bot commented on KAFKA-10883:


machi1990 commented on PR #504:
URL: https://github.com/apache/kafka-site/pull/504#issuecomment-1517993660

   Thanks @mimaison, I've edited the JIRA to capture what has been discussed 
here. I'll close this PR, as the JIRA also mentions that once the page 
is redesigned/finished off it has to be linked.




> Properly finish the "get-started.html" page and link in in the top navigation 
> menu  
> 
>
> Key: KAFKA-10883
> URL: https://issues.apache.org/jira/browse/KAFKA-10883
> Project: Kafka
>  Issue Type: Bug
>  Components: website
> Environment: Desktop windows 10, chrome / firefox
>Reporter: Jimmy PANNIER
>Priority: Major
> Attachments: 2020_12_22_10_36_31_WhatsApp.png
>
>
> The get-started page: [https://kafka.apache.org/get-started] currently 
> contains non-working buttons and does not seem to be finished. The idea behind 
> this Jira is to properly finish the get started page built from 
> [https://github.com/apache/kafka-site/blob/asf-site/get-started.html]
> and then to link it onto the top navigation menu in 
> [https://github.com/apache/kafka-site/blob/asf-site/includes/_top.htm#L18] 
> just like what has been attempted in 
> [https://github.com/apache/kafka-site/pull/504]
> *Worth noting:* Currently, when the user clicks on the "Get Started" menu 
> item, no redirection to this page is performed even though the "/get-started" 
> link is there. One reason the link is not wired up could be that the page is 
> unfinished.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] ruslankrivoshein commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

2023-04-21 Thread via GitHub


ruslankrivoshein commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1518054816

   Yes, since it's WIP and I have questions, I left the original files in place.
   Well, I'll take a look at `ClusterTestExtensions` and will try to figure out 
more about testing there.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14633) Compression optimization: Use BufferSupplier to allocate the intermediate decompressed buffer

2023-04-21 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-14633:
-
Attachment: flamegraph-pr-heapalloc-after.html
flamegraph-trunk-heapalloc-before.html

> Compression optimization: Use BufferSupplier to allocate the intermediate 
> decompressed buffer
> -
>
> Key: KAFKA-14633
> URL: https://issues.apache.org/jira/browse/KAFKA-14633
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Divij Vaidya
>Assignee: Divij Vaidya
>Priority: Major
> Fix For: 3.6.0
>
> Attachments: flamegraph-pr-heapalloc-after.html, 
> flamegraph-trunk-heapalloc-before.html
>
>
> Use BufferSupplier to allocate the intermediate decompressed buffer.
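
A minimal sketch of the idea (illustrative buffer size and flow, not the actual 
patch): obtain the intermediate buffer from a BufferSupplier pool and return it 
afterwards, instead of allocating a fresh buffer for every batch.
{code:java}
import java.nio.ByteBuffer;

import org.apache.kafka.common.utils.BufferSupplier;

public class PooledDecompressionBuffer {
    public static void main(String[] args) {
        BufferSupplier supplier = BufferSupplier.create();
        ByteBuffer intermediate = supplier.get(16 * 1024); // 16 KiB scratch buffer, size is illustrative
        try {
            // ... decompress the batch into `intermediate` here ...
        } finally {
            supplier.release(intermediate); // hand the buffer back to the pool for reuse
            supplier.close();
        }
    }
}
{code}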



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14720) KIP-906: Tools migration guidelines

2023-04-21 Thread Federico Valeri (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Federico Valeri updated KAFKA-14720:

Fix Version/s: 3.5.0

> KIP-906: Tools migration guidelines
> ---
>
> Key: KAFKA-14720
> URL: https://issues.apache.org/jira/browse/KAFKA-14720
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Federico Valeri
>Assignee: Federico Valeri
>Priority: Major
> Fix For: 3.5.0
>
>
> The tools migration effort is ongoing and being tracked in KAFKA-14525. This 
> is part of a bigger initiative to split the core module into multiple modules 
> (e.g. storage, network, security, tools), which is being tracked in 
> KAFKA-14524.
> The plan is to migrate tools and related classes in a fully compatible way 
> from kafka.tools and kafka.admin packages (core module) to 
> org.apache.kafka.tools package (tools module).
> While kicking off this activity, we identified a number of potential 
> compatibility issues:
> * Missing wrapper: some tools do not have a wrapper script. There are system 
> tests that directly refer to the tool's fully qualified class name (FQCN) and 
> expect the old package name when running on old Kafka releases. They are 
> often used for troubleshooting or automation through the kafka-run-class.sh 
> script which takes the FQCN as input parameter.
> * SPI argument: some tools have arguments for setting a custom SPI 
> implementation to be used in place of the default implementation. Any custom 
> SPI implementation depends on the old package name.
> * Broken tool: some tools do not work on supported releases.
> * Core dependency: some tools require access to non-trivial core classes that 
> should be migrated first.
> See KIP-906 for more information.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-906%3A+Tools+migration+guidelines
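
As a purely illustrative sketch of how an old FQCN can keep working during such 
a migration (class names are made up and this is not necessarily what KIP-906 
prescribes): the legacy class stays on the classpath as a thin deprecated 
delegate, while the implementation lives in the new package.
{code:java}
@Deprecated
public final class LegacyToolShim {
    private LegacyToolShim() { }

    // In a real migration this class would keep the old kafka.tools package name,
    // so kafka-run-class.sh invocations and system tests using the old FQCN still work.
    public static void main(String[] args) throws Exception {
        RelocatedTool.main(args); // delegate to the relocated implementation
    }
}

final class RelocatedTool {
    private RelocatedTool() { }

    // Stand-in for the tool moved to the new package/module.
    static void main(String[] args) {
        System.out.println("relocated tool running with " + args.length + " args");
    }
}
{code}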



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on pull request #13598: KAFKA-14869: Ignore unknown record types for coordinators (KIP-915, P…

2023-04-21 Thread via GitHub


dajac commented on PR #13598:
URL: https://github.com/apache/kafka/pull/13598#issuecomment-1518069951

   I could not get a successful build because there is always a `Timeout 
waiting to lock zinc-1.3.5_2.12.15_8 compiler cache` error. However, the two builds 
combined give us a good signal:
   * 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13598/6/pipeline
   * 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13598/3/pipeline


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac merged pull request #13598: KAFKA-14869: Ignore unknown record types for coordinators (KIP-915, P…

2023-04-21 Thread via GitHub


dajac merged PR #13598:
URL: https://github.com/apache/kafka/pull/13598


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #13602: KAFKA-14869: Bump coordinator value records to flexible versions (KIP…

2023-04-21 Thread via GitHub


dajac commented on PR #13602:
URL: https://github.com/apache/kafka/pull/13602#issuecomment-1518070622

   @jeffkbkim Could you please rebase this one?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13575: KAFKA-14905: Reduce flakiness in MM2 ForwardingAdmin test due to admin timeouts

2023-04-21 Thread via GitHub


gharris1727 commented on code in PR #13575:
URL: https://github.com/apache/kafka/pull/13575#discussion_r1173979163


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeForwardingAdminWithLocalMetadata.java:
##
@@ -38,16 +38,13 @@
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 /** Customised ForwardingAdmin for testing only.
  * The class create/alter topics, partitions and ACLs in Kafka then store 
metadata in {@link FakeLocalMetadataStore}.
  * */
 
 public class FakeForwardingAdminWithLocalMetadata extends ForwardingAdmin {
 private static final Logger log = 
LoggerFactory.getLogger(FakeForwardingAdminWithLocalMetadata.class);
-private final long timeout = 1000L;

Review Comment:
   I didn't just raise the timeout because the MirrorSourceTask that makes 
these calls does not have a timeout that I could predictably meet. The task 
appears to either use `get()` or `whenComplete` on the futures returned by 
these calls.
   
   I think your critique is correct though, and I've rewritten this to use 
whenComplete() to get the results of the futures, instead of get().



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14666) MM2 should translate consumer group offsets behind replication flow

2023-04-21 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-14666:

Affects Version/s: (was: 3.5.0)

> MM2 should translate consumer group offsets behind replication flow
> ---
>
> Key: KAFKA-14666
> URL: https://issues.apache.org/jira/browse/KAFKA-14666
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
> Fix For: 3.5.0
>
>
> MirrorMaker2 includes an offset translation feature which can translate the 
> offsets for an upstream consumer group to a corresponding downstream consumer 
> group. It does this by keeping a topic of offset-syncs to correlate upstream 
> and downstream offsets, and translates any source offsets which are ahead of 
> the replication flow.
> However, if a replication flow is closer to the end of a topic than the 
> consumer group, then the offset translation feature will refuse to translate 
> the offset for correctness reasons. This is because the MirrorCheckpointTask 
> only keeps the latest offset correlation between source and target, it does 
> not have sufficient information to translate older offsets.
> The workarounds for this issue are to:
> 1. Pause the replication flow occasionally to allow the source to get ahead 
> of MM2
> 2. Increase the offset.lag.max to delay offset syncs, increasing the window 
> for translation to happen. With the fix for KAFKA-12468, this will also 
> increase the lag of applications that are ahead of the replication flow, so 
> this is a tradeoff.
> Instead, the MirrorCheckpointTask should provide correct and best-effort 
> translation for consumer groups behind the replication flow by keeping 
> additional state, or re-reading the offset-syncs topic. This should be a 
> substantial improvement for use-cases where applications have a higher 
> latency to commit than the replication flow, or where applications are 
> reading from the earliest offset.
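
To make the limitation concrete, here is an illustrative sketch (not 
MirrorMaker2's actual translation code): with only the latest sync retained, an 
upstream offset behind the sync point simply cannot be translated.
{code:java}
import java.util.OptionalLong;

public final class SingleSyncTranslation {
    // Latest known correlation: upstream offset syncUpstream maps to downstream offset syncDownstream.
    public static OptionalLong translate(long syncUpstream, long syncDownstream, long consumerUpstream) {
        if (consumerUpstream >= syncUpstream) {
            // Everything up to the sync point is known to be replicated, so this is a safe target.
            return OptionalLong.of(syncDownstream);
        }
        // The group is behind the sync point and no older correlation is kept: refuse to translate.
        return OptionalLong.empty();
    }
}
{code}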



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14666) MM2 should translate consumer group offsets behind replication flow

2023-04-21 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-14666:

Fix Version/s: 3.5.0

> MM2 should translate consumer group offsets behind replication flow
> ---
>
> Key: KAFKA-14666
> URL: https://issues.apache.org/jira/browse/KAFKA-14666
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 3.5.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
> Fix For: 3.5.0
>
>
> MirrorMaker2 includes an offset translation feature which can translate the 
> offsets for an upstream consumer group to a corresponding downstream consumer 
> group. It does this by keeping a topic of offset-syncs to correlate upstream 
> and downstream offsets, and translates any source offsets which are ahead of 
> the replication flow.
> However, if a replication flow is closer to the end of a topic than the 
> consumer group, then the offset translation feature will refuse to translate 
> the offset for correctness reasons. This is because the MirrorCheckpointTask 
> only keeps the latest offset correlation between source and target, it does 
> not have sufficient information to translate older offsets.
> The workarounds for this issue are to:
> 1. Pause the replication flow occasionally to allow the source to get ahead 
> of MM2
> 2. Increase the offset.lag.max to delay offset syncs, increasing the window 
> for translation to happen. With the fix for KAFKA-12468, this will also 
> increase the lag of applications that are ahead of the replication flow, so 
> this is a tradeoff.
> Instead, the MirrorCheckpointTask should provide correct and best-effort 
> translation for consumer groups behind the replication flow by keeping 
> additional state, or re-reading the offset-syncs topic. This should be a 
> substantial improvement for use-cases where applications have a higher 
> latency to commit than the replication flow, or where applications are 
> reading from the earliest offset.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication

2023-04-21 Thread via GitHub


gharris1727 commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1173998391


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -139,10 +171,103 @@ public void close() {
 protected void handleRecord(ConsumerRecord record) {
 OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
 TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-offsetSyncs.put(sourceTopicPartition, offsetSync);
+offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> 
createInitialSyncs(offsetSync));
+offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> 
updateExistingSyncs(syncs, offsetSync));
+}
+
+private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync 
offsetSync) {
+// Make a copy of the array before mutating it, so that readers do not 
see inconsistent data
+// TODO: batch updates so that this copy can be performed less often 
for high-volume sync topics.
+OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+updateSyncArray(mutableSyncs, offsetSync);
+if (log.isTraceEnabled()) {
+StringBuilder stateString = new StringBuilder();
+stateString.append("[");
+for (int i = 0; i < Long.SIZE; i++) {
+if (i != 0) {
+stateString.append(",");
+}
+if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != 
mutableSyncs[i - 1]) {
+// Print only if the sync is interesting, a series of 
repeated syncs will appear as ,
+stateString.append(mutableSyncs[i].upstreamOffset());
+stateString.append(":");
+stateString.append(mutableSyncs[i].downstreamOffset());
+}
+}
+stateString.append("]");
+log.trace("New sync {} applied, new state is {}", offsetSync, 
stateString);
+}
+return mutableSyncs;
+}
+
+private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+clearSyncArray(syncs, firstSync);
+return syncs;
+}
+
+private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+for (int i = 0; i < Long.SIZE; i++) {
+syncs[i] = offsetSync;
+}
+}
+
+private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+long upstreamOffset = offsetSync.upstreamOffset();
+// Old offsets are invalid, so overwrite them all.
+if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {

Review Comment:
   @urbandan 
   
   > Monotonicity is an optimization - it tries to minimize re-processing after 
a failover. (Please correct me if I'm wrong, but I don't really see any other 
user stories behind it.)
   
   Yes this is correct, but I think it misses some nuance. The overall feature 
(offset translation, checkpoints, syncing offsets to the consumer group) is an 
optimization: users could just replicate their data, and start reading the 
replica from the beginning. And relevant to your earlier simplicity argument: 
it would be simpler to not offer offset translation, but would require users to 
re-deliver more data.
   
   In the same way, monotonicity is an optimization to not rewind checkpoints 
when we have already translated better ones: We could choose to not offer 
monotonicity for the simplicity but (with this specific in-memory store 
implementation) that means that if you are N offsets behind the replication 
flow, your offset could fall backward by another N offsets, doubling the data 
re-delivery. 
   
   I would also consider the user-story of someone monitoring their cluster, 
and noticing that despite consumers and replication moving forward, the 
checkpoints topic is moving _backwards_. Though no error has occurred, the 
result of the offset translation is getting worse. To someone unfamiliar with 
the internal workings of this algorithm, it looks like we're just generating 
arbitrary offsets. Additionally, it causes a significant difference in semantics 
between using the raw checkpoints topic and synced consumer offsets, which I 
think is more likely to lead to confusion.
   
   > Being able to checkpoint old offsets after a restart is a feature, and 
probably a good one. If cluster restarts/rebalances are frequent enough, and 
some consumers are lagging behind consistently, they might never get their 
checkpoints translated, ever.
   
   I completely agree. I think that offset translation _must_ be extended to 
offer translation of very old offsets, and the solution in this PR is just part 
of that overall solution. The limitation you pointed out is a very important 
one, as it's the core limitation that this PR is trying to solve. Before this 
PR, consumers lagging behind the latest of

[GitHub] [kafka] gharris1727 commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication

2023-04-21 Thread via GitHub


gharris1727 commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1173998391


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -139,10 +171,103 @@ public void close() {
 protected void handleRecord(ConsumerRecord record) {
 OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
 TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-offsetSyncs.put(sourceTopicPartition, offsetSync);
+offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> 
createInitialSyncs(offsetSync));
+offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> 
updateExistingSyncs(syncs, offsetSync));
+}
+
+private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync 
offsetSync) {
+// Make a copy of the array before mutating it, so that readers do not 
see inconsistent data
+// TODO: batch updates so that this copy can be performed less often 
for high-volume sync topics.
+OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+updateSyncArray(mutableSyncs, offsetSync);
+if (log.isTraceEnabled()) {
+StringBuilder stateString = new StringBuilder();
+stateString.append("[");
+for (int i = 0; i < Long.SIZE; i++) {
+if (i != 0) {
+stateString.append(",");
+}
+if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != 
mutableSyncs[i - 1]) {
+// Print only if the sync is interesting, a series of 
repeated syncs will appear as ,
+stateString.append(mutableSyncs[i].upstreamOffset());
+stateString.append(":");
+stateString.append(mutableSyncs[i].downstreamOffset());
+}
+}
+stateString.append("]");
+log.trace("New sync {} applied, new state is {}", offsetSync, 
stateString);
+}
+return mutableSyncs;
+}
+
+private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+clearSyncArray(syncs, firstSync);
+return syncs;
+}
+
+private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+for (int i = 0; i < Long.SIZE; i++) {
+syncs[i] = offsetSync;
+}
+}
+
+private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+long upstreamOffset = offsetSync.upstreamOffset();
+// Old offsets are invalid, so overwrite them all.
+if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {

Review Comment:
   @urbandan 
   
   > Monotonicity is an optimization - it tries to minimize re-processing after 
a failover. (Please correct me if I'm wrong, but I don't really see any other 
user stories behind it.)
   
   Yes this is correct, but I think it misses some nuance. The overall feature 
(offset translation, checkpoints, syncing offsets to the consumer group) is an 
optimization: users could just replicate their data, and start reading the 
replica from the beginning. And relevant to your earlier simplicity argument: 
it would be simpler to not offer offset translation, but would require users to 
re-deliver more data.
   
   In the same way, monotonicity is an optimization to not rewind checkpoints 
when we have already translated better ones: We could choose to not offer 
monotonicity for the simplicity but (with this specific in-memory store 
implementation) that means that if you are N offsets behind the replication 
flow, your offset could fall backward by another N offsets, doubling the data 
re-delivery. 
   
   I would also consider the user-story of someone monitoring their cluster, 
and noticing that despite consumers and replication moving forward, the 
checkpoints topic is moving _backwards_. Though no error has occurred, the 
result of the offset translation is getting worse. To someone unfamiliar with 
the internal workings of this algorithm, it looks like we're just generating 
arbitrary offsets. Actually we were already doing that before KAFKA-13659, and 
someone went through the effort to figure out why their offsets went backwards 
each time the connector restarted, and opened a ticket that multiple people 
voted on. I think you can make the argument that monotonicity is an 
optimization that we can drop for convenience, but that doesn't change the fact 
that users are expecting it without an explicit contract.
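
   To make the guarantee being argued for concrete, here is a minimal sketch of a 
per-group/per-partition monotonic guard (illustrative names only, not the actual 
MirrorCheckpointTask code):

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Sketch only: keep emitted checkpoints monotonic by remembering the last
   // downstream offset published for each group/partition key.
   public class MonotonicCheckpointGuard {
       private final Map<String, Long> lastEmittedDownstream = new HashMap<>();

       /** Returns true if this translated downstream offset may be emitted,
        *  i.e. it does not rewind a checkpoint that was already published. */
       public boolean shouldEmit(String groupAndPartition, long translatedDownstream) {
           long previous = lastEmittedDownstream.getOrDefault(groupAndPartition, -1L);
           if (translatedDownstream < previous) {
               return false; // dropping the stale translation avoids moving the synced group backwards
           }
           lastEmittedDownstream.put(groupAndPartition, translatedDownstream);
           return true;
       }
   }
   ```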
   
   > Being able to checkpoint old offsets after a restart is a feature, and 
probably a good one. If cluster restarts/rebalances are frequent enough, and 
some consumers are lagging behind consistently, they might never get their 
checkpoints translated, ever.
   
   I completely agree. I think that offset translation _must_ be extended to 
offer translation of very old offsets, and the solution in this PR is just part of 
that overall solution.

[GitHub] [kafka] szalapski commented on pull request #5876: KAFKA-7509: Avoid passing most non-applicable properties to producer, consumer, and admin client

2023-04-21 Thread via GitHub


szalapski commented on PR #5876:
URL: https://github.com/apache/kafka/pull/5876#issuecomment-1518174551

   @rhauch Any idea how to move this along?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jlprat commented on a diff in pull request #13575: KAFKA-14905: Reduce flakiness in MM2 ForwardingAdmin test due to admin timeouts

2023-04-21 Thread via GitHub


jlprat commented on code in PR #13575:
URL: https://github.com/apache/kafka/pull/13575#discussion_r1174097520


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeForwardingAdminWithLocalMetadata.java:
##
@@ -38,16 +38,13 @@
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 /** Customised ForwardingAdmin for testing only.
  * The class create/alter topics, partitions and ACLs in Kafka then store 
metadata in {@link FakeLocalMetadataStore}.
  * */
 
 public class FakeForwardingAdminWithLocalMetadata extends ForwardingAdmin {
 private static final Logger log = 
LoggerFactory.getLogger(FakeForwardingAdminWithLocalMetadata.class);
-private final long timeout = 1000L;

Review Comment:
   I like the `whenComplete` constructs, I think they are more elegant as well.
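
   For readers following the thread, the pattern amounts to roughly this sketch 
against the public Admin API (the println calls are stand-ins for the test's local 
metadata store update, not its actual code):

   ```java
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.NewTopic;

   import java.util.Collections;

   // Sketch only: react to topic creation asynchronously via whenComplete
   // instead of blocking with get(timeout, ...).
   public class CreateTopicThenRecord {
       public static void createAndTrack(Admin admin, NewTopic topic) {
           admin.createTopics(Collections.singleton(topic))
                   .all()
                   .whenComplete((ignored, error) -> {
                       if (error == null) {
                           System.out.println("created topic " + topic.name());
                       } else {
                           System.err.println("failed to create " + topic.name() + ": " + error);
                       }
                   });
       }
   }
   ```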



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #13575: KAFKA-14905: Reduce flakiness in MM2 ForwardingAdmin test due to admin timeouts

2023-04-21 Thread via GitHub


gharris1727 commented on PR #13575:
URL: https://github.com/apache/kafka/pull/13575#issuecomment-1518264915

   Hmm, some build failures from `rat` like in #13315, but I don't think this PR 
is related. And the JDK17 build failed due to streams upgrade tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14722) Make BooleanSerde public

2023-04-21 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17715157#comment-17715157
 ] 

Matthias J. Sax commented on KAFKA-14722:
-

I did a PR: [https://github.com/apache/kafka/pull/13577] – Just merged it.
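
For reference, a minimal usage sketch, assuming the public factory is exposed as 
Serdes.Boolean() in line with the existing Serdes.X() naming:

```java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

// Assumes KIP-907 exposes the serde as Serdes.Boolean(), mirroring
// Serdes.String(), Serdes.Long(), etc.
public class BooleanSerdeExample {
    public static void main(String[] args) {
        Serde<Boolean> serde = Serdes.Boolean();
        byte[] bytes = serde.serializer().serialize("flags-topic", Boolean.TRUE);
        Boolean value = serde.deserializer().deserialize("flags-topic", bytes);
        System.out.println(value); // prints: true
    }
}
```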

> Make BooleanSerde public
> 
>
> Key: KAFKA-14722
> URL: https://issues.apache.org/jira/browse/KAFKA-14722
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Spacrocket
>Priority: Minor
>  Labels: beginner, kip, newbie
> Fix For: 3.5.0
>
>
> KIP-907: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-907%3A+Add+Boolean+Serde+to+public+interface]
>  
> We introduce a "BooleanSerde" via 
> [https://github.com/apache/kafka/pull/13249] as internal class. We could make 
> it public.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mjsax merged pull request #13577: MINOR: updated KS release notes for 3.5

2023-04-21 Thread via GitHub


mjsax merged PR #13577:
URL: https://github.com/apache/kafka/pull/13577


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14876) Public documentation for new Kafka Connect offset management REST APIs

2023-04-21 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17715158#comment-17715158
 ] 

Greg Harris commented on KAFKA-14876:
-

I think the stop API is relevant to document, even without the alter/reset API.

> Public documentation for new Kafka Connect offset management REST APIs
> --
>
> Key: KAFKA-14876
> URL: https://issues.apache.org/jira/browse/KAFKA-14876
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.5.0
>
>
> Add public documentation for the 3 new Kafka Connect offset management REST 
> APIs being introduced in 
> [KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
>  * *GET* /connectors/\{connector}/offsets
>  * *PATCH* /connectors/\{connector}/offsets
>  * *DELETE* /connectors/\{connector}/offsets)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mjsax commented on pull request #13577: MINOR: updated KS release notes for 3.5

2023-04-21 Thread via GitHub


mjsax commented on PR #13577:
URL: https://github.com/apache/kafka/pull/13577#issuecomment-1518267444

   Merged to `trunk` and cherry-picked to `3.5`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jlprat commented on pull request #13575: KAFKA-14905: Reduce flakiness in MM2 ForwardingAdmin test due to admin timeouts

2023-04-21 Thread via GitHub


jlprat commented on PR #13575:
URL: https://github.com/apache/kafka/pull/13575#issuecomment-1518270904

   Yes, I was looking at the `rat` ones as well, but I fail to see how they 
would be caused in any way by the changes you made.
   I see `MirrorConnectorsWithCustomForwardingAdminIntegrationTest` is no longer 
failing, so I'll merge this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jlprat merged pull request #13575: KAFKA-14905: Reduce flakiness in MM2 ForwardingAdmin test due to admin timeouts

2023-04-21 Thread via GitHub


jlprat merged PR #13575:
URL: https://github.com/apache/kafka/pull/13575


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #13551: MINOR: Allow tagged fields with version subset of flexible version range

2023-04-21 Thread via GitHub


hachikuji commented on PR #13551:
URL: https://github.com/apache/kafka/pull/13551#issuecomment-1518292028

   @cmccabe I think that case is already covered here: 
https://github.com/apache/kafka/blob/trunk/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java#L425.
 The case that is not covered is when the tagged range is a proper subset of 
the flexible version range.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14905) Failing tests in MM2 ForwardingAdmin test since KIP-894

2023-04-21 Thread Josep Prat (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josep Prat updated KAFKA-14905:
---
Fix Version/s: 3.4.1
   3.6.0

> Failing tests in MM2 ForwardingAdmin test since KIP-894
> ---
>
> Key: KAFKA-14905
> URL: https://issues.apache.org/jira/browse/KAFKA-14905
> Project: Kafka
>  Issue Type: Test
>  Components: mirrormaker
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
>  Labels: flaky-test
> Fix For: 3.5.0, 3.4.1, 3.6.0
>
>
> There are three tests which are consistently failing in 
> MirrorConnectorsWithCustomForwardingAdminIntegrationTest since the merge of 
> KIP-894 in KAFKA-14420:
>  * testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin()
>  * testCreatePartitionsUseProvidedForwardingAdmin()
>  * testSyncTopicConfigUseProvidedForwardingAdmin()
> {noformat}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 6. 
> Topic: primary.test-topic-1 didn't get created in the FakeLocalMetadataStore 
> ==> expected:  but was: 
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>   at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>   at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>   at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211)
>   at 
> app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337)
>   at 
> app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>   at 
> app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334)
>   at 
> app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318)
>   at 
> app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:308)
>   at 
> app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicToPersistInFakeLocalMetadataStore(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:324)
>   at 
> app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:214)
> (similar)
>   at 
> app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicToPersistInFakeLocalMetadataStore(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:324)
>   at 
> app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testCreatePartitionsUseProvidedForwardingAdmin(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:243)
> (similar)
>   at 
> app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicToPersistInFakeLocalMetadataStore(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:324)
>   at 
> app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testSyncTopicConfigUseProvidedForwardingAdmin(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:280){noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hachikuji commented on a diff in pull request #13490: KAFKA-14875: Implement wakeup

2023-04-21 Thread via GitHub


hachikuji commented on code in PR #13490:
URL: https://github.com/apache/kafka/pull/13490#discussion_r1169316955


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -250,6 +257,7 @@ public void commitAsync(OffsetCommitCallback callback) {
 
 @Override
 public void commitAsync(Map offsets, 
OffsetCommitCallback callback) {
+maybeWakeup();

Review Comment:
   Why are we adding this? I don't think wakeup is expected to be thrown from 
async APIs?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupableFuture.java:
##
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.errors.WakeupException;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Can be interrupted by calling {@link #wakeup()}.
+ */
+public class WakeupableFuture extends CompletableFuture {

Review Comment:
   This class feels like overkill. The direct call to `completeExceptionally` 
seems clear already.
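
   For readers of the thread, the suggestion boils down to something like this 
sketch (names illustrative, not the PR's code):

   ```java
   import org.apache.kafka.common.errors.WakeupException;

   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.atomic.AtomicReference;

   // Sketch only: no WakeupableFuture subclass; wakeup() simply completes the
   // in-flight future exceptionally, which the blocked caller sees as a
   // WakeupException.
   public class WakeupSketch {
       private final AtomicReference<CompletableFuture<?>> pendingOperation = new AtomicReference<>();

       public <T> CompletableFuture<T> track(CompletableFuture<T> future) {
           pendingOperation.set(future); // remember the operation a caller may block on
           return future;
       }

       public void wakeup() {
           CompletableFuture<?> future = pendingOperation.getAndSet(null);
           if (future != null) {
               future.completeExceptionally(new WakeupException());
           }
       }
   }
   ```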



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14929) Flaky KafkaStatusBackingStoreFormatTest#putTopicStateRetriableFailure

2023-04-21 Thread Greg Harris (Jira)
Greg Harris created KAFKA-14929:
---

 Summary: Flaky 
KafkaStatusBackingStoreFormatTest#putTopicStateRetriableFailure
 Key: KAFKA-14929
 URL: https://issues.apache.org/jira/browse/KAFKA-14929
 Project: Kafka
  Issue Type: Test
  Components: KafkaConnect
Reporter: Greg Harris
 Fix For: 3.5.0


This test recently started flaky-failing with the following stack trace:
{noformat}
org.mockito.exceptions.verification.TooFewActualInvocations: 
kafkaBasedLog.send(, , );Wanted 2 times:-> at 
org.apache.kafka.connect.util.KafkaBasedLog.send(KafkaBasedLog.java:376)But was 
1 time:-> at 
org.apache.kafka.connect.storage.KafkaStatusBackingStore.sendTopicStatus(KafkaStatusBackingStore.java:315)
   at 
app//org.apache.kafka.connect.util.KafkaBasedLog.send(KafkaBasedLog.java:376)   
 at 
app//org.apache.kafka.connect.storage.KafkaStatusBackingStoreFormatTest.putTopicStateRetriableFailure(KafkaStatusBackingStoreFormatTest.java:219)
at 
java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
 Method) at 
java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at 
java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at 
java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566){noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] philipnee commented on a diff in pull request #13490: KAFKA-14875: Implement wakeup

2023-04-21 Thread via GitHub


philipnee commented on code in PR #13490:
URL: https://github.com/apache/kafka/pull/13490#discussion_r1174174040


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -250,6 +257,7 @@ public void commitAsync(OffsetCommitCallback callback) {
 
 @Override
 public void commitAsync(Map offsets, 
OffsetCommitCallback callback) {
+maybeWakeup();

Review Comment:
   I think you are right, it is not expected.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14929) Flaky KafkaStatusBackingStoreFormatTest#putTopicStateRetriableFailure

2023-04-21 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-14929:

Description: 
This test recently started flaky-failing with the following stack trace:
{noformat}
org.mockito.exceptions.verification.TooFewActualInvocations: 
kafkaBasedLog.send(, , );
Wanted 2 times:->
 at org.apache.kafka.connect.util.KafkaBasedLog.send(KafkaBasedLog.java:376)
But was 1 time:->
 at 
org.apache.kafka.connect.storage.KafkaStatusBackingStore.sendTopicStatus(KafkaStatusBackingStore.java:315)
at 
app//org.apache.kafka.connect.util.KafkaBasedLog.send(KafkaBasedLog.java:376)
at 
app//org.apache.kafka.connect.storage.KafkaStatusBackingStoreFormatTest.putTopicStateRetriableFailure(KafkaStatusBackingStoreFormatTest.java:219)
at 
java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
 Method)
at 
java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566)
...{noformat}

  was:
This test recently started flaky-failing with the following stack trace:
{noformat}
org.mockito.exceptions.verification.TooFewActualInvocations: 
kafkaBasedLog.send(, , );Wanted 2 times:-> at 
org.apache.kafka.connect.util.KafkaBasedLog.send(KafkaBasedLog.java:376)But was 
1 time:-> at 
org.apache.kafka.connect.storage.KafkaStatusBackingStore.sendTopicStatus(KafkaStatusBackingStore.java:315)
   at 
app//org.apache.kafka.connect.util.KafkaBasedLog.send(KafkaBasedLog.java:376)   
 at 
app//org.apache.kafka.connect.storage.KafkaStatusBackingStoreFormatTest.putTopicStateRetriableFailure(KafkaStatusBackingStoreFormatTest.java:219)
at 
java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
 Method) at 
java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at 
java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at 
java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566){noformat}


> Flaky KafkaStatusBackingStoreFormatTest#putTopicStateRetriableFailure
> -
>
> Key: KAFKA-14929
> URL: https://issues.apache.org/jira/browse/KAFKA-14929
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect
>Reporter: Greg Harris
>Priority: Major
> Fix For: 3.5.0
>
>
> This test recently started flaky-failing with the following stack trace:
> {noformat}
> org.mockito.exceptions.verification.TooFewActualInvocations: 
> kafkaBasedLog.send(, , );
> Wanted 2 times:->
>  at org.apache.kafka.connect.util.KafkaBasedLog.send(KafkaBasedLog.java:376)
> But was 1 time:->
>  at 
> org.apache.kafka.connect.storage.KafkaStatusBackingStore.sendTopicStatus(KafkaStatusBackingStore.java:315)
>   at 
> app//org.apache.kafka.connect.util.KafkaBasedLog.send(KafkaBasedLog.java:376)
>   at 
> app//org.apache.kafka.connect.storage.KafkaStatusBackingStoreFormatTest.putTopicStateRetriableFailure(KafkaStatusBackingStoreFormatTest.java:219)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>  Method)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566)
> ...{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14929) Flaky KafkaStatusBackingStoreFormatTest#putTopicStateRetriableFailure

2023-04-21 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-14929:

Labels: flaky-test  (was: )

> Flaky KafkaStatusBackingStoreFormatTest#putTopicStateRetriableFailure
> -
>
> Key: KAFKA-14929
> URL: https://issues.apache.org/jira/browse/KAFKA-14929
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect
>Reporter: Greg Harris
>Priority: Major
>  Labels: flaky-test
> Fix For: 3.5.0
>
>
> This test recently started flaky-failing with the following stack trace:
> {noformat}
> org.mockito.exceptions.verification.TooFewActualInvocations: 
> kafkaBasedLog.send(, , );
> Wanted 2 times:->
>  at org.apache.kafka.connect.util.KafkaBasedLog.send(KafkaBasedLog.java:376)
> But was 1 time:->
>  at 
> org.apache.kafka.connect.storage.KafkaStatusBackingStore.sendTopicStatus(KafkaStatusBackingStore.java:315)
>   at 
> app//org.apache.kafka.connect.util.KafkaBasedLog.send(KafkaBasedLog.java:376)
>   at 
> app//org.apache.kafka.connect.storage.KafkaStatusBackingStoreFormatTest.putTopicStateRetriableFailure(KafkaStatusBackingStoreFormatTest.java:219)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>  Method)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566)
> ...{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] gharris1727 commented on pull request #13557: KAFKA-14902: KafkaStatusBackingStore retries on a dedicated backgroun…

2023-04-21 Thread via GitHub


gharris1727 commented on PR #13557:
URL: https://github.com/apache/kafka/pull/13557#issuecomment-1518351859

   Looks like this might have caused some flaky failures on trunk: 
https://issues.apache.org/jira/browse/KAFKA-14929


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-8115) Flaky Test CoordinatorTest#testTaskRequestWithOldStartMsGetsUpdated

2023-04-21 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17715177#comment-17715177
 ] 

Greg Harris commented on KAFKA-8115:


We merged an attempted fix for this, but it appears that timeouts are still 
appearing, causing the test to flake out. I'll need to investigate further.

> Flaky Test CoordinatorTest#testTaskRequestWithOldStartMsGetsUpdated
> ---
>
> Key: KAFKA-8115
> URL: https://issues.apache.org/jira/browse/KAFKA-8115
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Greg Harris
>Priority: Critical
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3254/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/testTaskRequestWithOldStartMsGetsUpdated/]
> {quote}org.junit.runners.model.TestTimedOutException: test timed out after 
> 12 milliseconds at java.base@11.0.1/jdk.internal.misc.Unsafe.park(Native 
> Method) at 
> java.base@11.0.1/java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
>  at 
> java.base@11.0.1/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
>  at 
> java.base@11.0.1/java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1454)
>  at 
> java.base@11.0.1/java.util.concurrent.Executors$DelegatedExecutorService.awaitTermination(Executors.java:709)
>  at 
> app//org.apache.kafka.trogdor.rest.JsonRestServer.waitForShutdown(JsonRestServer.java:157)
>  at app//org.apache.kafka.trogdor.agent.Agent.waitForShutdown(Agent.java:123) 
> at 
> app//org.apache.kafka.trogdor.common.MiniTrogdorCluster.close(MiniTrogdorCluster.java:285)
>  at 
> app//org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated(CoordinatorTest.java:596)
>  at 
> java.base@11.0.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base@11.0.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> java.base@11.0.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base@11.0.1/java.lang.reflect.Method.invoke(Method.java:566) at 
> app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
>  at 
> app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
>  at java.base@11.0.1/java.util.concurrent.FutureTask.run(FutureTask.java:264) 
> at java.base@11.0.1/java.lang.Thread.run(Thread.java:834){quote}
> STDOUT
> {quote}[2019-03-15 09:23:41,364] INFO Creating MiniTrogdorCluster with 
> agents: node02 and coordinator: node01 
> (org.apache.kafka.trogdor.common.MiniTrogdorCluster:135) [2019-03-15 
> 09:23:41,595] INFO Logging initialized @13340ms to 
> org.eclipse.jetty.util.log.Slf4jLog (org.eclipse.jetty.util.log:193) 
> [2019-03-15 09:23:41,752] INFO Starting REST server 
> (org.apache.kafka.trogdor.rest.JsonRestServer:89) [2019-03-15 09:23:41,912] 
> INFO Registered resource 
> org.apache.kafka.trogdor.agent.AgentRestResource@3fa38ceb 
> (org.apache.kafka.trogdor.rest.JsonRestServer:94) [2019-03-15 09:23:42,178] 
> INFO jetty-9.4.14.v20181114; built: 2018-11-14T21:20:31.478Z; git: 
> c4550056e785fb5665914545889f21dc136ad9e6; jvm 11.0.1+13-LTS 
> (org.eclipse.jetty.server.Server:370) [2019-03-15 09:23:42,360] INFO 
> DefaultSessionIdManager workerName=node0 
> (org.eclipse.jetty.server.session:365) [2019-03-15 09:23:42,362] INFO No 
> SessionScavenger set, using defaults (org.eclipse.jetty.server.session:370) 
> [2019-03-15 09:23:42,370] INFO node0 Scavenging every 66ms 
> (org.eclipse.jetty.server.session:149) [2019-03-15 09:23:44,412] INFO Started 
> o.e.j.s.ServletContextHandler@335a5293\{/,null,AVAILABLE} 
> (org.eclipse.jetty.server.handler.ContextHandler:855) [2019-03-15 
> 09:23:44,473] INFO Started 
> ServerConnector@79a93bf1\{HTTP/1.1,[http/1.1]}{0.0.0.0:33477} 
> (org.eclipse.jetty.server.AbstractConnector:292) [2019-03-15 09:23:44,474] 
> INFO Started @16219ms (org.eclipse.jetty.server.Server:407) [2019-03-15 
> 09:23:44,475] INFO REST server listening at [http://127.0.1.1:33477/] 
> (org.apache.kafka.trogdor.rest.JsonRestServer:123) [2019-03-

[GitHub] [kafka] bmscomp opened a new pull request, #13625: MINOR: Upgrade gradle wrapper to the last stable version 8.1.1

2023-04-21 Thread via GitHub


bmscomp opened a new pull request, #13625:
URL: https://github.com/apache/kafka/pull/13625

   Upgrade the Gradle wrapper to the latest stable version, 8.1.1, and update the 
value of the `distributionSha256Sum` property.
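
   For reference, the change lives in `gradle/wrapper/gradle-wrapper.properties` and 
looks roughly like this (the checksum is a placeholder here; the real value is the 
SHA-256 published by gradle.org for the 8.1.1 distribution):

   ```properties
   distributionBase=GRADLE_USER_HOME
   distributionPath=wrapper/dists
   distributionUrl=https\://services.gradle.org/distributions/gradle-8.1.1-bin.zip
   distributionSha256Sum=<SHA-256 of the 8.1.1 distribution from gradle.org>
   zipStoreBase=GRADLE_USER_HOME
   zipStorePath=wrapper/dists
   ```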
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14892) Harmonize package names in storage module

2023-04-21 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17715193#comment-17715193
 ] 

Jun Rao commented on KAFKA-14892:
-

Sounds good to me. Including storage in the package name probably helps avoid 
the split package issue.

> Harmonize package names in storage module
> -
>
> Key: KAFKA-14892
> URL: https://issues.apache.org/jira/browse/KAFKA-14892
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Priority: Major
>
> We currently have:
>  # org.apache.kafka.server.log.remote.storage: public api in storage-api 
> module
>  # org.apache.kafka.server.log.remote: private api in storage module
>  # org.apache.kafka.storage.internals.log: private api in storage module
> A way to make this consistent could be:
>  # org.apache.kafka.storage.* or org.apache.kafka.storage.api.*: public api 
> in storage-api module
>  # org.apache.kafka.storage.internals.log.remote: private api in storage 
> module
>  # org.apache.kafka.storage.internals.log: private api in storage module 
> (stays the same)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14925) The website shouldn't load external resources

2023-04-21 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17715204#comment-17715204
 ] 

ASF GitHub Bot commented on KAFKA-14925:


atu-sharm opened a new pull request, #506:
URL: https://github.com/apache/kafka-site/pull/506

   Adding one JS file that is recommended to be hosted along with the website, so 
that we don't have a dependency on external resources.




> The website shouldn't load external resources
> -
>
> Key: KAFKA-14925
> URL: https://issues.apache.org/jira/browse/KAFKA-14925
> Project: Kafka
>  Issue Type: Improvement
>  Components: website
>Reporter: Mickael Maison
>Assignee: Atul Sharma
>Priority: Major
>
> In includes/_header.htm, we load a resource from fontawesome.com



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14925) The website shouldn't load external resources

2023-04-21 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17715205#comment-17715205
 ] 

ASF GitHub Bot commented on KAFKA-14925:


atu-sharm commented on PR #506:
URL: https://github.com/apache/kafka-site/pull/506#issuecomment-1518434152

   @mimaison can you review please




> The website shouldn't load external resources
> -
>
> Key: KAFKA-14925
> URL: https://issues.apache.org/jira/browse/KAFKA-14925
> Project: Kafka
>  Issue Type: Improvement
>  Components: website
>Reporter: Mickael Maison
>Assignee: Atul Sharma
>Priority: Major
>
> In includes/_header.htm, we load a resource from fontawesome.com



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] artemlivshits commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-04-21 Thread via GitHub


artemlivshits commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1174283192


##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -123,73 +129,94 @@ class ZkProducerIdManager(brokerId: Int,
 }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
 this synchronized {
   // grab a new block of producerIds if this block has been exhausted
   if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-allocateNewProducerIdBlock()
+try {
+  allocateNewProducerIdBlock()
+} catch {
+  case t: Throwable =>
+return Failure(t)
+}
 nextProducerId = currentProducerIdBlock.firstProducerId
   }
   nextProducerId += 1
-  nextProducerId - 1
+  Success(nextProducerId - 1)
+}
+  }
+
+  override def hasValidBlock: Boolean = {
+this synchronized {
+  !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
 }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will 
immediately fail requests
+ * for producers to retry if it does not have an available producer id and is 
waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+   time: Time,
brokerEpochSupplier: () => Long,
-   controllerChannel: BrokerToControllerChannelManager,
-   maxWaitMs: Int) extends ProducerIdManager with 
Logging {
+   controllerChannel: 
BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new 
ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new 
AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new 
AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val shouldBackoff = new AtomicBoolean(false)
 
-  override def generateProducerId(): Long = {
-this synchronized {
-  if (nextProducerId == -1L) {
-// Send an initial request to get the first block
-maybeRequestNextBlock()
-nextProducerId = 0L
-  } else {
-nextProducerId += 1
-
-// Check if we need to fetch the next block
-if (nextProducerId >= (currentProducerIdBlock.firstProducerId + 
currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-  maybeRequestNextBlock()
-}
-  }
+  override def hasValidBlock: Boolean = {
+nextProducerIdBlock.get != null
+  }
 
-  // If we've exhausted the current block, grab the next block (waiting if 
necessary)
-  if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
-if (block == null) {
-  // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT 
since older clients treat the error as fatal
-  // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-  throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out 
waiting for next producer ID block")
-} else {
-  block match {
-case Success(nextBlock) =>
-  currentProducerIdBlock = nextBlock
-  nextProducerId = currentProducerIdBlock.firstProducerId
-case Failure(t) => throw t
+  override def generateProducerId(): Try[Long] = {
+var result: Try[Long] = null
+while (result == null) {

Review Comment:
   I actually wonder if we need to loop at all, seems like we only need to loop 
when we hit the branch
   
   ```
  currentProducerIdBlock.set(block)
  requestInFlight.set(false)
   ```
   I think we could do this instead:
   ```
  result = Success(block.claimNextId().asScala.get)
  currentProducerIdBlock.set(block)
  requestInFlight.set(false)
   ```
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14920) Address timeouts and out of order sequences

2023-04-21 Thread Artem Livshits (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17715224#comment-17715224
 ] 

Artem Livshits commented on KAFKA-14920:


I wonder if we could just add producer state before checking for the transaction so 
that the second (and subsequent) batches would fail as well.

> Address timeouts and out of order sequences
> ---
>
> Key: KAFKA-14920
> URL: https://issues.apache.org/jira/browse/KAFKA-14920
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
>
> KAFKA-14844 showed the destructive nature of a timeout on the first produce 
> request for a topic partition (ie one that has no state in psm)
> Since we currently don't validate the first sequence (we will in part 2 of 
> kip-890), any transient error on the first produce can lead to out of order 
> sequences that never recover.
> Originally, KAFKA-14561 relied on the producer's retry mechanism for these 
> transient issues, but until that is fixed, we may need to retry from in the 
> AddPartitionsManager instead. We addressed the concurrent transactions, but 
> there are other errors like coordinator loading that we could run into and 
> see increased out of order issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on pull request #13571: KAFKA-14907:Add the traffic metric of the partition dimension in BrokerTopicStats

2023-04-21 Thread via GitHub


dajac commented on PR #13571:
URL: https://github.com/apache/kafka/pull/13571#issuecomment-1518511048

   > Does this PR need a KIP process? Anyone take a look?
   
   Metrics are part of the public API. Changing or adding metrics requires a KIP.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14920) Address timeouts and out of order sequences

2023-04-21 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17715227#comment-17715227
 ] 

Justine Olshan commented on KAFKA-14920:


I originally considered this, but we can't do it because we append the relevant 
information when the records are appended to the log. If we do it earlier, we 
break this logic. (Logic that is also persisted to disk and reloaded.) The 
information is stored in the records themselves, and I don't know if there is a 
great way to handle this. How do we know when the record is actually appended 
vs. verifying? Unless we add another in-memory state?

This is why we added a state machine in KAFKA-14904.

> Address timeouts and out of order sequences
> ---
>
> Key: KAFKA-14920
> URL: https://issues.apache.org/jira/browse/KAFKA-14920
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
>
> KAFKA-14844 showed the destructive nature of a timeout on the first produce 
> request for a topic partition (ie one that has no state in psm)
> Since we currently don't validate the first sequence (we will in part 2 of 
> kip-890), any transient error on the first produce can lead to out of order 
> sequences that never recover.
> Originally, KAFKA-14561 relied on the producer's retry mechanism for these 
> transient issues, but until that is fixed, we may need to retry from in the 
> AddPartitionsManager instead. We addressed the concurrent transactions, but 
> there are other errors like coordinator loading that we could run into and 
> see increased out of order issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)