[GitHub] [kafka] dengziming commented on pull request #12112: MINOR: Fix flaky testDescribeUnderReplicatedPartitions

2022-05-05 Thread GitBox


dengziming commented on PR #12112:
URL: https://github.com/apache/kafka/pull/12112#issuecomment-1119266873

   @showuon I have updated it. Although @divijvaidya provided a good suggestion, we have an even more generic solution for this kind of flakiness. [See details here](https://github.com/apache/kafka/pull/12108#discussion_r865373299)





[GitHub] [kafka] dengziming commented on a diff in pull request #12112: MINOR: Fix flaky testDescribeUnderReplicatedPartitions

2022-05-05 Thread GitBox


dengziming commented on code in PR #12112:
URL: https://github.com/apache/kafka/pull/12112#discussion_r866494535


##
core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala:
##
@@ -586,11 +586,14 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
 try {
   killBroker(0)
   val aliveServers = brokers.filterNot(_.config.brokerId == 0)
-  TestUtils.waitForPartitionMetadata(aliveServers, testTopicName, 0)
+  TestUtils.waitUntilTrue(

Review Comment:
   @divijvaidya I really appreciate your suggestion; however, I need to clarify a few things here: 
   1. We should use `TestUtils.waitForBrokersOutOfIsr`, not `TestUtils.waitForBrokersInIsr`.
   2. `TestUtils.waitForBrokersInIsr` needs to send an RPC, whereas we can read the in-memory MetadataCache directly, which is faster and more reliable (see the sketch below). Maybe we can improve `TestUtils.waitForBrokersInIsr` in the future.
   3. Most importantly, in another PR we found a more generic approach for KRaft mode: wait until the brokers have caught up to the controller metadata topic's end offset, instead of waiting for something really specific.
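
   A minimal Java sketch of this polling pattern (a hypothetical helper in the spirit of Scala's `TestUtils.waitUntilTrue`, not the actual test-utils code):
   
   ```java
   import java.util.function.BooleanSupplier;
   
   public final class WaitUtil {
       // Polls `condition` until it holds or `timeoutMs` elapses. Checking an
       // in-memory view (e.g. a broker's MetadataCache) avoids extra RPCs and
       // keeps retrying until the cluster converges, instead of asserting once.
       public static void waitUntilTrue(BooleanSupplier condition,
                                        String failureMessage,
                                        long timeoutMs,
                                        long pollIntervalMs) throws InterruptedException {
           final long deadline = System.currentTimeMillis() + timeoutMs;
           while (!condition.getAsBoolean()) {
               if (System.currentTimeMillis() > deadline)
                   throw new AssertionError(failureMessage);
               Thread.sleep(pollIntervalMs);
           }
       }
   }
   ```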






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12104: KAFKA-13746: Attempt to fix flaky test by waiting to fetch 2 topics f…

2022-05-05 Thread GitBox


vamossagar12 commented on code in PR #12104:
URL: https://github.com/apache/kafka/pull/12104#discussion_r866489041


##
core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala:
##
@@ -733,12 +733,18 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
   killBroker(0)
   val aliveServers = brokers.filterNot(_.config.brokerId == 0)
   TestUtils.waitForPartitionMetadata(aliveServers, underMinIsrTopic, 0)
-  val output = TestUtils.grabConsoleOutput(
-    topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions"))))
+  var output = ""
+  TestUtils.waitUntilTrue(
+    () => {
+      output = TestUtils.grabConsoleOutput(
+        topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions"))))

Review Comment:
   @dengziming , looks like the latest build failed with a timeout:
   ```
   org.opentest4j.AssertionFailedError: Timeout waiting for partition metadata propagating to brokers for underMinIsrTopic topic
   ```
   Sure, I will take a look at that.
   
   






[GitHub] [kafka] vamossagar12 commented on pull request #12104: KAFKA-13746: Attempt to fix flaky test by waiting to fetch 2 topics f…

2022-05-05 Thread GitBox


vamossagar12 commented on PR #12104:
URL: https://github.com/apache/kafka/pull/12104#issuecomment-1119258638

   @guozhangwang , thanks for pointing it out. I will take a look at this and make another commit, which should re-trigger the build.





[GitHub] [kafka] ahuang98 closed pull request #12123: KAFKA-13830 Introduce metadata.version for KRaft

2022-05-05 Thread GitBox


ahuang98 closed pull request #12123: KAFKA-13830 Introduce metadata.version for KRaft
URL: https://github.com/apache/kafka/pull/12123





[GitHub] [kafka] showuon commented on pull request #12112: MINOR: Fix flaky testDescribeUnderReplicatedPartitions

2022-05-05 Thread GitBox


showuon commented on PR #12112:
URL: https://github.com/apache/kafka/pull/12112#issuecomment-1119226457

   @dengziming , it's fine; we can always improve the tests. I agree your fix is better. Let's address @divijvaidya 's comment. Thanks.





[jira] [Resolved] (KAFKA-13854) Refactor ApiVersion to MetadataVersion

2022-05-05 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13854?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming resolved KAFKA-13854.

Resolution: Fixed

> Refactor ApiVersion to MetadataVersion
> --
>
> Key: KAFKA-13854
> URL: https://issues.apache.org/jira/browse/KAFKA-13854
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Arthur
>Assignee: Alyssa Huang
>Priority: Major
>
> In KRaft, we will have a value for {{metadata.version}} corresponding to each 
> IBP. In order to keep this association and make it obvious for developers, we 
> will consolidate the IBP and metadata version into a new MetadataVersion 
> enum. This new enum will replace the existing ApiVersion trait. 
> For IBPs that precede the first KRaft preview version (AK 3.0), we will use a 
> value of -1 for the metadata.version. 





[jira] [Created] (KAFKA-13880) DefaultStreamPartitioner may get "stuck" to one partition for unkeyed messages

2022-05-05 Thread Artem Livshits (Jira)
Artem Livshits created KAFKA-13880:
--

 Summary: DefaultStreamPartitioner may get "stuck" to one partition 
for unkeyed messages
 Key: KAFKA-13880
 URL: https://issues.apache.org/jira/browse/KAFKA-13880
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.4.0
Reporter: Artem Livshits


While working on KIP-794, I noticed that DefaultStreamPartitioner does not call 
.onNewBatch. The "sticky" DefaultStreamPartitioner introduced as a result of 
https://issues.apache.org/jira/browse/KAFKA-8601 requires an .onNewBatch call in 
order to switch to a new partition for unkeyed messages; just calling 
.partition returns the same "sticky" partition chosen during the first 
call to .partition. The partition doesn't change even if the partition leader 
is unavailable.

Ideally, for unkeyed messages the DefaultStreamPartitioner should take 
advantage of the new built-in partitioning logic introduced in 
https://github.com/apache/kafka/pull/12049. Perhaps it could return a null 
partition for unkeyed messages, so that KafkaProducer could run its built-in 
partitioning logic.
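
For illustration, a minimal sketch of that idea (a hypothetical class, not the actual Streams code): a StreamPartitioner that returns null for unkeyed records so that KafkaProducer's built-in partitioning takes over.

{code:java}
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.StreamPartitioner;

// Hypothetical sketch: decline to choose a partition for unkeyed records.
public class NullForUnkeyedPartitioner<K, V> implements StreamPartitioner<K, V> {
    private final Serializer<K> keySerializer;

    public NullForUnkeyedPartitioner(final Serializer<K> keySerializer) {
        this.keySerializer = keySerializer;
    }

    @Override
    public Integer partition(final String topic, final K key, final V value, final int numPartitions) {
        final byte[] keyBytes = keySerializer.serialize(topic, key);
        if (keyBytes == null || keyBytes.length == 0) {
            return null; // no choice made; the producer's built-in logic decides
        }
        // Keyed records keep the usual hashing behavior.
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
{code}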





[GitHub] [kafka] artemlivshits commented on a diff in pull request #12049: KAFKA-10888: Sticky partition leads to uneven produce msg

2022-05-05 Thread GitBox


artemlivshits commented on code in PR #12049:
URL: https://github.com/apache/kafka/pull/12049#discussion_r866402725


##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -1403,25 +1452,54 @@ public boolean isDone() {
 }
 
 /**
- * A callback called when producer request is complete. It in turn calls user-supplied callback (if given) and
- * notifies producer interceptors about the request completion.
+ * Callbacks that are called by the RecordAccumulator append functions:
+ *  - user callback
+ *  - interceptor callbacks
+ *  - partition callback
  */
-private static class InterceptorCallback<K, V> implements Callback {
+private class AppendCallbacks<K, V> implements RecordAccumulator.AppendCallbacks {
     private final Callback userCallback;
     private final ProducerInterceptors<K, V> interceptors;
-    private final TopicPartition tp;
+    private final ProducerRecord<K, V> record;
+    protected int partition = RecordMetadata.UNKNOWN_PARTITION;
 
-    private InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V> interceptors, TopicPartition tp) {
+    private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
         this.userCallback = userCallback;
         this.interceptors = interceptors;
-        this.tp = tp;
+        this.record = record;
     }
 
+    @Override
     public void onCompletion(RecordMetadata metadata, Exception exception) {
-        metadata = metadata != null ? metadata : new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1);
+        if (metadata == null) {
+            metadata = new RecordMetadata(topicPartition(), -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1);
+        }
         this.interceptors.onAcknowledgement(metadata, exception);
         if (this.userCallback != null)
             this.userCallback.onCompletion(metadata, exception);
     }
+
+    @Override
+    public void setPartition(int partition) {
+        assert partition != RecordMetadata.UNKNOWN_PARTITION;
+        this.partition = partition;
+
+        if (log.isTraceEnabled()) {
+            // Log the message here, because we don't know the partition before that.
+            log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition);
+        }
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+    public TopicPartition topicPartition() {
+        if (record == null)

Review Comment:
   This is an existing test: 
https://github.com/apache/kafka/blob/trunk/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala#L1041
   It passes in a null record and expects the error to go through the interceptor's 
callback, but topicPartition() is called to supply an argument to that callback 
(which can handle a null topicPartition), and it was throwing an NPE.
   I personally would just declare record as non-null and treat it as a bug in 
the application (undefined behavior), but as defined currently, it's a tested 
run-time error condition with some expected behavior.
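
   For reference, a sketch of the null-tolerant accessor under discussion (reconstructed for illustration; the PR's actual code may differ):
   
   ```java
   public TopicPartition topicPartition() {
       if (record == null)
           return null; // tested error path: interceptors still get a callback
       if (partition != RecordMetadata.UNKNOWN_PARTITION)
           return new TopicPartition(record.topic(), partition);
       // Fall back to the partition set on the record, if any.
       return new TopicPartition(record.topic(),
           record.partition() != null ? record.partition() : RecordMetadata.UNKNOWN_PARTITION);
   }
   ```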






[GitHub] [kafka] YiDing-Duke commented on a diff in pull request #9874: KAFKA-7340: Migrate clients module to JUnit 5

2022-05-05 Thread GitBox


YiDing-Duke commented on code in PR #9874:
URL: https://github.com/apache/kafka/pull/9874#discussion_r866399744


##
clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorFailureDelayTest.java:
##
@@ -228,12 +213,8 @@ private NioEchoServer createEchoServer(SecurityProtocol 
securityProtocol) throws
 }
 
 private NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol) throws Exception {
-    if (failedAuthenticationDelayMs != -1)

Review Comment:
   It's not dead code; it sets the delay at the Selector level.






[GitHub] [kafka] rittikaadhikari commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access

2022-05-05 Thread GitBox


rittikaadhikari commented on code in PR #12005:
URL: https://github.com/apache/kafka/pull/12005#discussion_r866395929


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -51,6 +51,7 @@ import scala.math._
  */
 abstract class AbstractFetcherThread(name: String,
  clientId: String,
+ val leader: LeaderEndPoint,

Review Comment:
   We could end up having something like the following: 
   
   ```
   trait LeaderEndPoint {
     def brokerEndPoint(): BrokerEndPoint
     ...
   }
   
   class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint, ...) extends LeaderEndPoint {
     def brokerEndPoint(): BrokerEndPoint = sourceBroker
   }
   
   class RemoteLeaderEndPoint(endpoint: BlockingSend, ...) extends LeaderEndPoint {
     def brokerEndPoint(): BrokerEndPoint = endpoint.sourceBroker
   }
   ```
   






[GitHub] [kafka] junrao commented on a diff in pull request #12049: KAFKA-10888: Sticky partition leads to uneven produce msg

2022-05-05 Thread GitBox


junrao commented on code in PR #12049:
URL: https://github.com/apache/kafka/pull/12049#discussion_r866393904


##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -1403,25 +1452,54 @@ public boolean isDone() {
 }
 
 /**
- * A callback called when producer request is complete. It in turn calls user-supplied callback (if given) and
- * notifies producer interceptors about the request completion.
+ * Callbacks that are called by the RecordAccumulator append functions:
+ *  - user callback
+ *  - interceptor callbacks
+ *  - partition callback
  */
-private static class InterceptorCallback<K, V> implements Callback {
+private class AppendCallbacks<K, V> implements RecordAccumulator.AppendCallbacks {
     private final Callback userCallback;
     private final ProducerInterceptors<K, V> interceptors;
-    private final TopicPartition tp;
+    private final ProducerRecord<K, V> record;
+    protected int partition = RecordMetadata.UNKNOWN_PARTITION;
 
-    private InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V> interceptors, TopicPartition tp) {
+    private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
         this.userCallback = userCallback;
         this.interceptors = interceptors;
-        this.tp = tp;
+        this.record = record;
     }
 
+    @Override
     public void onCompletion(RecordMetadata metadata, Exception exception) {
-        metadata = metadata != null ? metadata : new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1);
+        if (metadata == null) {
+            metadata = new RecordMetadata(topicPartition(), -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1);
+        }
         this.interceptors.onAcknowledgement(metadata, exception);
         if (this.userCallback != null)
             this.userCallback.onCompletion(metadata, exception);
     }
+
+    @Override
+    public void setPartition(int partition) {
+        assert partition != RecordMetadata.UNKNOWN_PARTITION;
+        this.partition = partition;
+
+        if (log.isTraceEnabled()) {
+            // Log the message here, because we don't know the partition before that.
+            log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition);
+        }
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+    public TopicPartition topicPartition() {
+        if (record == null)

Review Comment:
   Hmm, by returning a null, we are breaking the contract that 
RecordMetadata.topic() and RecordMetadata.partition() are never null in the 
producer callback and interceptor. Is that a new or an existing test?






[jira] [Assigned] (KAFKA-13879) Exponential backoff for reconnect does not work

2022-05-05 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-13879:
-

Assignee: Chern Yih Cheah

> Exponential backoff for reconnect does not work
> ---
>
> Key: KAFKA-13879
> URL: https://issues.apache.org/jira/browse/KAFKA-13879
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 2.7.0
>Reporter: Chern Yih Cheah
>Assignee: Chern Yih Cheah
>Priority: Minor
>
> When a client connects to an SSL listener using the PLAINTEXT security protocol, 
> the client considers the channel setup complete once the TCP connection is set 
> up (in reality the channel setup is not complete yet). The client then issues an 
> API versions request, and when doing so it resets the reconnection exponential 
> backoff. Since the broker expects an SSL handshake, the client's API versions 
> request causes the connection to disconnect, and the reconnect happens without 
> exponential backoff since it has been reset.
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java#L249
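
For illustration, a minimal self-contained Java sketch (not Kafka's actual ClusterConnectionStates code) of why resetting the failure counter before the channel is truly ready defeats exponential backoff:

{code:java}
// Minimal sketch, not Kafka's implementation. The reconnect delay doubles per
// consecutive failure; calling resetOnSuccess() too early (e.g. right after
// the TCP connect, before the handshake/authentication finishes) pins the
// delay at the initial value, so a misconfigured client reconnects in a
// tight loop.
public class ReconnectBackoff {
    private static final long INITIAL_MS = 50;
    private static final long MAX_MS = 1000;
    private int failures = 0;

    public long nextDelayMs() {
        final long delay = Math.min(MAX_MS, INITIAL_MS << Math.min(failures, 30));
        failures++;
        return delay;
    }

    // Correct usage: reset only once the channel is fully ready
    // (handshake and authentication done), not merely TCP-connected.
    public void resetOnSuccess() {
        failures = 0;
    }
}
{code}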





[GitHub] [kafka] junrao commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access

2022-05-05 Thread GitBox


junrao commented on code in PR #12005:
URL: https://github.com/apache/kafka/pull/12005#discussion_r866389159


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -51,6 +51,7 @@ import scala.math._
  */
 abstract class AbstractFetcherThread(name: String,
  clientId: String,
+ val leader: LeaderEndPoint,

Review Comment:
   We could. Alternatively, if we add brokerEndpoint() to LeaderEndPoint, we 
could implement it in RemoteLeaderEndPoint by getting it from the 
ReplicaFetcherBlockingSend inside RemoteLeaderEndPoint.






[GitHub] [kafka] artemlivshits commented on a diff in pull request #12049: KAFKA-10888: Sticky partition leads to uneven produce msg

2022-05-05 Thread GitBox


artemlivshits commented on code in PR #12049:
URL: https://github.com/apache/kafka/pull/12049#discussion_r866386035


##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -1403,25 +1452,54 @@ public boolean isDone() {
 }
 
 /**
- * A callback called when producer request is complete. It in turn calls user-supplied callback (if given) and
- * notifies producer interceptors about the request completion.
+ * Callbacks that are called by the RecordAccumulator append functions:
+ *  - user callback
+ *  - interceptor callbacks
+ *  - partition callback
  */
-private static class InterceptorCallback<K, V> implements Callback {
+private class AppendCallbacks<K, V> implements RecordAccumulator.AppendCallbacks {
     private final Callback userCallback;
     private final ProducerInterceptors<K, V> interceptors;
-    private final TopicPartition tp;
+    private final ProducerRecord<K, V> record;
+    protected int partition = RecordMetadata.UNKNOWN_PARTITION;
 
-    private InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V> interceptors, TopicPartition tp) {
+    private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
         this.userCallback = userCallback;
         this.interceptors = interceptors;
-        this.tp = tp;
+        this.record = record;
     }
 
+    @Override
     public void onCompletion(RecordMetadata metadata, Exception exception) {
-        metadata = metadata != null ? metadata : new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1);
+        if (metadata == null) {
+            metadata = new RecordMetadata(topicPartition(), -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1);
+        }
         this.interceptors.onAcknowledgement(metadata, exception);
         if (this.userCallback != null)
             this.userCallback.onCompletion(metadata, exception);
     }
+
+    @Override
+    public void setPartition(int partition) {
+        assert partition != RecordMetadata.UNKNOWN_PARTITION;
+        this.partition = partition;
+
+        if (log.isTraceEnabled()) {
+            // Log the message here, because we don't know the partition before that.
+            log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition);
+        }
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+    public TopicPartition topicPartition() {
+        if (record == null)

Review Comment:
   I've added it because there is a test that passes a null and expects some 
error handling.  From that perspective, adding an assert would contradict the 
expectation that there is defined and tested error handling (in my mind, an 
assert means the behavior is undefined and all bets are off).






[GitHub] [kafka] guozhangwang closed pull request #11917: KAFKA-13785: [8/N][emit final] time-ordered session store

2022-05-05 Thread GitBox


guozhangwang closed pull request #11917: KAFKA-13785: [8/N][emit final] time-ordered session store
URL: https://github.com/apache/kafka/pull/11917





[GitHub] [kafka] guozhangwang merged pull request #12127: KAFKA-13785: [8/N][emit final] time-ordered session store

2022-05-05 Thread GitBox


guozhangwang merged PR #12127:
URL: https://github.com/apache/kafka/pull/12127





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12128: KAFKA-10199: Implement adding active tasks to the state updater

2022-05-05 Thread GitBox


guozhangwang commented on code in PR #12128:
URL: https://github.com/apache/kafka/pull/12128#discussion_r866368735


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.Task.State;
+import org.slf4j.Logger;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+public class DefaultStateUpdater implements StateUpdater {
+
+    private final static String BUG_ERROR_MESSAGE = "This indicates a bug. " +
+        "Please report at https://issues.apache.org/jira/projects/KAFKA/issues or to the dev-mailing list (https://kafka.apache.org/contact).";
+
+    private class StateUpdaterThread extends Thread {
+
+        private final ChangelogReader changelogReader;
+        private final AtomicBoolean isRunning = new AtomicBoolean(true);
+        private final java.util.function.Consumer<Set<TopicPartition>> offsetResetter;
+        private final Map<TaskId, Task> updatingTasks = new HashMap<>();
+        private final Logger log;
+
+        public StateUpdaterThread(final String name,
+                                  final ChangelogReader changelogReader,
+                                  final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
+            super(name);
+            this.changelogReader = changelogReader;
+            this.offsetResetter = offsetResetter;
+
+            final String logPrefix = String.format("%s ", name);
+            final LogContext logContext = new LogContext(logPrefix);
+            log = logContext.logger(DefaultStateUpdater.class);
+        }
+
+        public Collection<Task> getAllUpdatingTasks() {
+            return updatingTasks.values();
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (isRunning.get()) {
+                    try {
+                        performActionsOnTasks();
+                        restoreTasks();
+                        waitIfAllChangelogsCompletelyRead();
+                    } catch (final InterruptedException interruptedException) {
+                        return;
+                    }
+                }
+            } catch (final RuntimeException anyOtherException) {
+                log.error("An unexpected error occurred within the state updater thread: " + anyOtherException);
+                final ExceptionAndTasks exceptionAndTasks = new ExceptionAndTasks(new HashSet<>(updatingTasks.values()), anyOtherException);
+                updatingTasks.clear();
+                failedTasks.add(exceptionAndTasks);
+                isRunning.set(false);
+            } finally {
+                clear();
+            }
+        }
+
+        private void performActionsOnTasks() throws InterruptedException {
+            tasksAndActionsLock.lock();
+            try {
+                for (final TaskAndAction taskAndAction : getTasksAndActions()) {
+                    final Task task = taskAndAction.task;
+                    final Action action = taskAndAction.action;
+                    switch (action) {
+                        case ADD:
+                            addTask(task);
+                            break;

[GitHub] [kafka] guozhangwang merged pull request #12128: KAFKA-10199: Implement adding active tasks to the state updater

2022-05-05 Thread GitBox


guozhangwang merged PR #12128:
URL: https://github.com/apache/kafka/pull/12128





[GitHub] [kafka] rittikaadhikari commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access

2022-05-05 Thread GitBox


rittikaadhikari commented on code in PR #12005:
URL: https://github.com/apache/kafka/pull/12005#discussion_r866371951


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -51,6 +51,7 @@ import scala.math._
  */
 abstract class AbstractFetcherThread(name: String,
  clientId: String,
+ val leader: LeaderEndPoint,

Review Comment:
   Yes, that does make sense. It would probably be best to pass a BrokerEndPoint 
into LeaderEndPoint and construct a ReplicaFetcherBlockingSend within the 
RemoteLeaderEndPoint. However, ReplicaFetcherBlockingSend requires several 
other parameters (like metrics, time, fetcherId, etc.), which may make the 
constructor for RemoteLeaderEndPoint look a bit cluttered.






[GitHub] [kafka] junrao commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access

2022-05-05 Thread GitBox


junrao commented on code in PR #12005:
URL: https://github.com/apache/kafka/pull/12005#discussion_r866357178


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -51,6 +51,7 @@ import scala.math._
  */
 abstract class AbstractFetcherThread(name: String,
  clientId: String,
+ val leader: LeaderEndPoint,

Review Comment:
   Yes, we need to pass brokerEndPoint to LocalLeaderEndPoint. Intuitively, it 
seems to make sense for LeaderEndPoint to own brokerEndPoint instead of 
passing two endpoints around.






[GitHub] [kafka] frosiere commented on pull request #12125: KAFKA-13864: provide the construct interceptor for KafkaProducer and KafkaConsumer

2022-05-05 Thread GitBox


frosiere commented on PR #12125:
URL: https://github.com/apache/kafka/pull/12125#issuecomment-1119022631

   Tests and checks are failing.





[jira] [Commented] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor

2022-05-05 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17532518#comment-17532518
 ] 

François Rosière commented on KAFKA-13864:
--

Thanks for the PR. I added some comments.

To make progress on this issue, we are still missing a vote or an approval so 
that the change can be merged once it has been fully reviewed...

> Change the visibility of a KafkaProducer and KafkaConsumer constructor
> --
>
> Key: KAFKA-13864
> URL: https://issues.apache.org/jira/browse/KAFKA-13864
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 3.1.0
>Reporter: François Rosière
>Assignee: lqjacklee
>Priority: Major
>  Labels: needs-kip
>
> To allow implementing Spring-managed interceptors for producers and consumers,
> https://github.com/spring-projects/spring-kafka/issues/2244
> a new constructor should be added in KafkaProducer:
> {code:java}
> public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, 
> Serializer<V> valueSerializer){code}
> The visibility of one constructor of KafkaConsumer should also move from 
> default to public:
> {code:java}
> public KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, 
> Deserializer<V> valueDeserializer) {code}
> See the current implementation: 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671
> This issue is quite blocking, so any other alternative or proposal would be 
> more than welcome.
> Kafka Streams is not concerned by this issue, as the KafkaStreams object 
> already exposes a constructor taking a StreamsConfig object.
> Thanks for considering this issue.
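
For illustration, a hypothetical usage sketch of the proposed constructors (these signatures are the proposal in this ticket, not an existing public Kafka API):

{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ConfigConstructorExample {
    public static void main(String[] args) {
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", StringSerializer.class.getName());
        producerProps.put("value.serializer", StringSerializer.class.getName());

        // Proposed: pass a (possibly subclassed) ProducerConfig directly, e.g. one
        // whose getConfiguredInstances() returns Spring-managed interceptors.
        KafkaProducer<String, String> producer = new KafkaProducer<>(
            new ProducerConfig(producerProps), new StringSerializer(), new StringSerializer());
        producer.close();

        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "example-group");
        consumerProps.put("key.deserializer", StringDeserializer.class.getName());
        consumerProps.put("value.deserializer", StringDeserializer.class.getName());

        // Proposed: make the ConsumerConfig-based constructor public.
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
            new ConsumerConfig(consumerProps), new StringDeserializer(), new StringDeserializer());
        consumer.close();
    }
}
{code}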





[GitHub] [kafka] frosiere commented on a diff in pull request #12125: KAFKA-13864: provide the construct interceptor for KafkaProducer and KafkaConsumer

2022-05-05 Thread GitBox


frosiere commented on code in PR #12125:
URL: https://github.com/apache/kafka/pull/12125#discussion_r866287593


##
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##
@@ -663,12 +663,38 @@ public KafkaConsumer(Properties properties,
 public KafkaConsumer(Map configs,
  Deserializer keyDeserializer,
  Deserializer valueDeserializer) {
+this(configs, keyDeserializer, valueDeserializer, null);
+}
+
+/**
+ * A consumer is instantiated by providing a set of key-value pairs as configuration, and a key and a value {@link Deserializer}.
+ * <p>
+ * Valid configuration strings are documented at {@link ConsumerConfig}.
+ * <p>
+ * Note: after creating a {@code KafkaConsumer} you must always {@link #close()} it to avoid resource leaks.
+ *
+ * @param configs The consumer configs
+ * @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method
+ *            won't be called in the consumer when the deserializer is passed in directly.
+ * @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method
+ *            won't be called in the consumer when the deserializer is passed in directly.
+ * @param interceptors The list interceptors for consumer that implements {$link ConsumerInterceptor}.
+ */
+public KafkaConsumer(Map<String, Object> configs,

Review Comment:
   To avoid touching the existing constructors and to offer more flexibility, I 
would have created a new constructor taking a ProducerConfig, which would allow 
overriding ProducerConfig#getConfiguredInstances to complete the list of 
interceptors with managed instances, or any other type of instance such as the 
MetricsReporter or the partitioner. See the Spring issue for a concrete example: 
https://github.com/spring-projects/spring-kafka/issues/2244 
   This approach can work as well.



##
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##
@@ -663,12 +663,38 @@ public KafkaConsumer(Properties properties,
 public KafkaConsumer(Map configs,
  Deserializer keyDeserializer,
  Deserializer valueDeserializer) {
+this(configs, keyDeserializer, valueDeserializer, null);
+}
+
+/**
+ * A consumer is instantiated by providing a set of key-value pairs as configuration, and a key and a value {@link Deserializer}.

Review Comment:
   The new parameter is not mentioned.



##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -290,6 +290,27 @@ public KafkaProducer(Map configs, 
Serializer keySerializer, S
 keySerializer, valueSerializer, null, null, null, Time.SYSTEM);
 }
 
+/**
+ * A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value {@link Serializer}.

Review Comment:
   Same issue with the Javadoc of this constructor



##
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##
@@ -663,12 +663,38 @@ public KafkaConsumer(Properties properties,
 public KafkaConsumer(Map<String, Object> configs,
                      Deserializer<K> keyDeserializer,
                      Deserializer<V> valueDeserializer) {
+    this(configs, keyDeserializer, valueDeserializer, null);
+}
+
+/**
+ * A consumer is instantiated by providing a set of key-value pairs as configuration, and a key and a value {@link Deserializer}.
+ * <p>
+ * Valid configuration strings are documented at {@link ConsumerConfig}.
+ * <p>
+ * Note: after creating a {@code KafkaConsumer} you must always {@link #close()} it to avoid resource leaks.
+ *
+ * @param configs The consumer configs
+ * @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method
+ *            won't be called in the consumer when the deserializer is passed in directly.
+ * @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method
+ *            won't be called in the consumer when the deserializer is passed in directly.
+ * @param interceptors The list interceptors for consumer that implements {$link ConsumerInterceptor}.

Review Comment:
   The Javadoc is not correct: it should say "The list of interceptors...", and 
`$link` should be replaced by `@link`.






[GitHub] [kafka] cadonna opened a new pull request, #12128: KAFKA-10199: Implement adding active tasks to the state updater

2022-05-05 Thread GitBox


cadonna opened a new pull request, #12128:
URL: https://github.com/apache/kafka/pull/12128

   This PR adds the default implementation of the state updater.
   So far, it only supports adding active tasks to the state updater.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-13877) Flaky RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags

2022-05-05 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17532480#comment-17532480
 ] 

Guozhang Wang commented on KAFKA-13877:
---

Thanks [~lkokhreidze]!

> Flaky 
> RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags
> 
>
> Key: KAFKA-13877
> URL: https://issues.apache.org/jira/browse/KAFKA-13877
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Assignee: Levani Kokhreidze
>Priority: Major
>  Labels: newbie
>
> The following test fails on local testbeds about once per 10-15 runs:
> {code}
> java.lang.AssertionError
>   at org.junit.Assert.fail(Assert.java:87)
>   at org.junit.Assert.assertTrue(Assert.java:42)
>   at org.junit.Assert.assertTrue(Assert.java:53)
>   at 
> org.apache.kafka.streams.integration.RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags(RackAwarenessIntegrationTest.java:192)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at 
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:53)
>   at 
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>   at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> {code}





[GitHub] [kafka] rittikaadhikari commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access

2022-05-05 Thread GitBox


rittikaadhikari commented on code in PR #12005:
URL: https://github.com/apache/kafka/pull/12005#discussion_r866208728


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -51,6 +51,7 @@ import scala.math._
  */
 abstract class AbstractFetcherThread(name: String,
  clientId: String,
+ val leader: LeaderEndPoint,

Review Comment:
   However, I don't think LocalLeaderEndPoint has any params with access to the 
brokerEndPoint, so we would need to pass it in as a param to LocalLeaderEndPoint.






[GitHub] [kafka] rittikaadhikari commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access

2022-05-05 Thread GitBox


rittikaadhikari commented on code in PR #12005:
URL: https://github.com/apache/kafka/pull/12005#discussion_r866206202


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -51,6 +51,7 @@ import scala.math._
  */
 abstract class AbstractFetcherThread(name: String,
  clientId: String,
+ val leader: LeaderEndPoint,

Review Comment:
   Ahhh I see what you mean. Potentially, we could create a brokerEndPoint 
method in LeaderEndPoint to access that object? 






[jira] [Created] (KAFKA-13879) Exponential backoff for reconnect does not work

2022-05-05 Thread Chern Yih Cheah (Jira)
Chern Yih Cheah created KAFKA-13879:
---

 Summary: Exponential backoff for reconnect does not work
 Key: KAFKA-13879
 URL: https://issues.apache.org/jira/browse/KAFKA-13879
 Project: Kafka
  Issue Type: Bug
  Components: network
Affects Versions: 2.7.0
Reporter: Chern Yih Cheah


When a client connects to an SSL listener using the PLAINTEXT security protocol, 
the client considers the channel setup complete once the TCP connection is set 
up (in reality the channel setup is not complete yet). The client then issues an 
API versions request, and when doing so it resets the reconnection exponential 
backoff. Since the broker expects an SSL handshake, the client's API versions 
request causes the connection to disconnect, and the reconnect happens without 
exponential backoff since it has been reset.

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java#L249
  





[GitHub] [kafka] junrao commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access

2022-05-05 Thread GitBox


junrao commented on code in PR #12005:
URL: https://github.com/apache/kafka/pull/12005#discussion_r866200194


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -51,6 +51,7 @@ import scala.math._
  */
 abstract class AbstractFetcherThread(name: String,
  clientId: String,
+ val leader: LeaderEndPoint,

Review Comment:
   Well, RemoteLeaderEndPoint takes a ReplicaFetcherBlockingSend, which takes a 
BrokerEndPoint, right?






[GitHub] [kafka] rittikaadhikari commented on pull request #12005: KAFKA-13803: Refactor Leader API Access

2022-05-05 Thread GitBox


rittikaadhikari commented on PR #12005:
URL: https://github.com/apache/kafka/pull/12005#issuecomment-1118915061

   @junrao: Thank you for the comments. Whenever you get a chance, this PR is 
ready for review again!





[GitHub] [kafka] junrao commented on a diff in pull request #12049: KAFKA-10888: Sticky partition leads to uneven produce msg

2022-05-05 Thread GitBox


junrao commented on code in PR #12049:
URL: https://github.com/apache/kafka/pull/12049#discussion_r866194199


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java:
##
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+/**
+ * Built-in default partitioner.  Note, that this is just a utility class that is used directly from
+ * RecordAccumulator, it does not implement the Partitioner interface.
+ *
+ * The class keeps track of various bookkeeping information required for adaptive sticky partitioning
+ * (described in detail in KIP-794).  There is one partitioner object per topic.
+ */
+public class BuiltInPartitioner {
+    private final Logger log;
+    private final String topic;
+    private final int stickyBatchSize;
+
+    private volatile PartitionLoadStats partitionLoadStats = null;
+    private final AtomicReference<StickyPartitionInfo> stickyPartitionInfo = new AtomicReference<>();
+
+    // Visible and used for testing only.
+    static volatile public Supplier<Integer> mockRandom = null;
+
+    /**
+     * BuiltInPartitioner constructor.
+     *
+     * @param topic The topic
+     * @param stickyBatchSize How much to produce to partition before switch
+     */
+    public BuiltInPartitioner(LogContext logContext, String topic, int stickyBatchSize) {
+        this.log = logContext.logger(BuiltInPartitioner.class);
+        this.topic = topic;
+        this.stickyBatchSize = stickyBatchSize;
+    }
+
+    /**
+     * Calculate the next partition for the topic based on the partition load stats.
+     */
+    private int nextPartition(Cluster cluster) {
+        int random = mockRandom != null ? mockRandom.get() : Utils.toPositive(ThreadLocalRandom.current().nextInt());
+
+        // Cache volatile variable in local variable.
+        PartitionLoadStats partitionLoadStats = this.partitionLoadStats;
+        int partition;
+
+        if (partitionLoadStats == null) {
+            // We don't have stats to do adaptive partitioning (or it's disabled), just switch to the next
+            // partition based on uniform distribution.
+            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
+            if (availablePartitions.size() > 0) {
+                partition = availablePartitions.get(random % availablePartitions.size()).partition();
+            } else {
+                // We don't have available partitions, just pick one among all partitions.
+                List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
+                partition = random % partitions.size();
+            }
+        } else {
+            // Calculate next partition based on load distribution.
+            // Note that partitions without leader are excluded from the partitionLoadStats.
+            assert partitionLoadStats.length > 0;
+
+            int[] cumulativeFrequencyTable = partitionLoadStats.cumulativeFrequencyTable;
+            int weightedRandom = random % cumulativeFrequencyTable[partitionLoadStats.length - 1];
+
+            // By construction, the cumulative frequency table is sorted, so we can use binary
+            // search to find the desired index.
+            int searchResult = Arrays.binarySearch(cumulativeFrequencyTable, 0, partitionLoadStats.length, weightedRandom);
+
+            // binarySearch results the index of the found element, or -(insertion_point) - 1
+            // (where insertion_point is the index of the first element greater than the key).
+            // We need to get the index of the first value that is strictly greater, which
+            // would be the insertion point, except if we found the element that's equal to
+            //
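
   For context, a small self-contained Java sketch (illustrative, not the PR's code) of weighted selection via a cumulative frequency table and `Arrays.binarySearch`, as the comments above describe:
   
   ```java
   import java.util.Arrays;
   import java.util.concurrent.ThreadLocalRandom;
   
   public class WeightedChoice {
       // Picks an index with probability proportional to weights[i].
       static int pick(int[] weights) {
           int[] cft = new int[weights.length];
           int sum = 0;
           for (int i = 0; i < weights.length; i++) {
               sum += weights[i];
               cft[i] = sum; // sorted by construction
           }
           int r = ThreadLocalRandom.current().nextInt(sum);
           int idx = Arrays.binarySearch(cft, r);
           // binarySearch returns the match index, or -(insertionPoint) - 1.
           // We want the first entry strictly greater than r, so on an exact
           // match we step to the next index.
           return idx >= 0 ? idx + 1 : -idx - 1;
       }
   
       public static void main(String[] args) {
           int[] counts = new int[3];
           for (int i = 0; i < 100_000; i++)
               counts[pick(new int[]{1, 2, 7})]++;
           System.out.println(Arrays.toString(counts)); // roughly 10% / 20% / 70%
       }
   }
   ```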

[jira] [Assigned] (KAFKA-13877) Flaky RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags

2022-05-05 Thread Levani Kokhreidze (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Levani Kokhreidze reassigned KAFKA-13877:
-

Assignee: Levani Kokhreidze

> Flaky 
> RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags
> 
>
> Key: KAFKA-13877
> URL: https://issues.apache.org/jira/browse/KAFKA-13877
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Assignee: Levani Kokhreidze
>Priority: Major
>  Labels: newbie
>
> The following test fails on local testbeds about once per 10-15 runs:
> {code}
> java.lang.AssertionError
>   at org.junit.Assert.fail(Assert.java:87)
>   at org.junit.Assert.assertTrue(Assert.java:42)
>   at org.junit.Assert.assertTrue(Assert.java:53)
>   at 
> org.apache.kafka.streams.integration.RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags(RackAwarenessIntegrationTest.java:192)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at 
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:53)
>   at 
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>   at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> {code}





[jira] [Commented] (KAFKA-13877) Flaky RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags

2022-05-05 Thread Levani Kokhreidze (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17532474#comment-17532474
 ] 

Levani Kokhreidze commented on KAFKA-13877:
---

I will take this on. 

> Flaky 
> RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags
> 
>
> Key: KAFKA-13877
> URL: https://issues.apache.org/jira/browse/KAFKA-13877
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Assignee: Levani Kokhreidze
>Priority: Major
>  Labels: newbie
>
> The following test fails on local testbeds about once per 10-15 runs:
> {code}
> java.lang.AssertionError
>   at org.junit.Assert.fail(Assert.java:87)
>   at org.junit.Assert.assertTrue(Assert.java:42)
>   at org.junit.Assert.assertTrue(Assert.java:53)
>   at 
> org.apache.kafka.streams.integration.RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags(RackAwarenessIntegrationTest.java:192)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at 
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:53)
>   at 
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>   at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] junrao commented on a diff in pull request #12049: KAFKA-10888: Sticky partition leads to uneven produce msg

2022-05-05 Thread GitBox


junrao commented on code in PR #12049:
URL: https://github.com/apache/kafka/pull/12049#discussion_r866191785


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java:
##
@@ -25,12 +23,15 @@
 
 private final Cluster cluster;
 private final Serializer<K> keySerializer;
-private final DefaultPartitioner defaultPartitioner;
 
+@SuppressWarnings("deprecation")
+private final 
org.apache.kafka.clients.producer.internals.DefaultPartitioner 
defaultPartitioner;
+
+@SuppressWarnings("deprecation")
 public DefaultStreamPartitioner(final Serializer<K> keySerializer, final 
Cluster cluster) {
 this.cluster = cluster;
 this.keySerializer = keySerializer;
-this.defaultPartitioner = new DefaultPartitioner();
+this.defaultPartitioner = new 
org.apache.kafka.clients.producer.internals.DefaultPartitioner();

Review Comment:
   @artemlivshits : Actually, you are right. The producer will only call 
onNewBatch() for the DefaultPartitioner instance it creates internally. So, it 
won't have an effect on the partitioner instance created in DefaultStreamPartitioner.
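
   As a minimal analog (not the producer's actual code; the class and method 
names here are made up): a sticky partitioner only rotates when onNewBatch() is 
invoked, and the producer invokes it only on the partitioner instance it owns, 
so a standalone instance like the one in DefaultStreamPartitioner keeps 
returning the same partition for unkeyed records.

   ```java
   import java.util.concurrent.ThreadLocalRandom;

   class StickyPartitionerSketch {
       private int stickyPartition = -1;

       int partition(int numPartitions) {
           // Same partition every time until onNewBatch() rotates it.
           if (stickyPartition < 0)
               stickyPartition = ThreadLocalRandom.current().nextInt(numPartitions);
           return stickyPartition;
       }

       void onNewBatch(int numPartitions) {
           // Never called on an instance the producer did not create itself.
           stickyPartition = ThreadLocalRandom.current().nextInt(numPartitions);
       }
   }
   ```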



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #12049: KAFKA-10888: Sticky partition leads to uneven produce msg

2022-05-05 Thread GitBox


junrao commented on code in PR #12049:
URL: https://github.com/apache/kafka/pull/12049#discussion_r866187547


##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -1403,25 +1452,54 @@ public boolean isDone() {
 }
 
 /**
- * A callback called when producer request is complete. It in turn calls 
user-supplied callback (if given) and
- * notifies producer interceptors about the request completion.
+ * Callbacks that are called by the RecordAccumulator append functions:
+ *  - user callback
+ *  - interceptor callbacks
+ *  - partition callback
  */
-private static class InterceptorCallback<K, V> implements Callback {
+private class AppendCallbacks<K, V> implements 
RecordAccumulator.AppendCallbacks {
 private final Callback userCallback;
 private final ProducerInterceptors<K, V> interceptors;
-private final TopicPartition tp;
+private final ProducerRecord<K, V> record;
+protected int partition = RecordMetadata.UNKNOWN_PARTITION;
 
-private InterceptorCallback(Callback userCallback, 
ProducerInterceptors<K, V> interceptors, TopicPartition tp) {
+private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
 this.userCallback = userCallback;
 this.interceptors = interceptors;
-this.tp = tp;
+this.record = record;
 }
 
+@Override
 public void onCompletion(RecordMetadata metadata, Exception exception) 
{
-metadata = metadata != null ? metadata : new RecordMetadata(tp, 
-1, -1, RecordBatch.NO_TIMESTAMP, -1, -1);
+if (metadata == null) {
+metadata = new RecordMetadata(topicPartition(), -1, -1, 
RecordBatch.NO_TIMESTAMP, -1, -1);
+}
 this.interceptors.onAcknowledgement(metadata, exception);
 if (this.userCallback != null)
 this.userCallback.onCompletion(metadata, exception);
 }
+
+@Override
+public void setPartition(int partition) {
+assert partition != RecordMetadata.UNKNOWN_PARTITION;
+this.partition = partition;
+
+if (log.isTraceEnabled()) {
+// Log the message here, because we don't know the partition 
before that.
+log.trace("Attempting to append record {} with callback {} to 
topic {} partition {}", record, userCallback, record.topic(), partition);
+}
+}
+
+public int getPartition() {
+return partition;
+}
+
+public TopicPartition topicPartition() {
+if (record == null)

Review Comment:
   We have a bunch of code in send() that depends on record being not null. 
Perhaps it's better to assert non-null record early in send()?
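
   A sketch of the fail-fast check being suggested (the shape is assumed, not 
the actual KafkaProducer code): validating the record once, up front, means 
later code such as AppendCallbacks.topicPartition() never has to reason about 
a null record.

   ```java
   import java.util.Objects;

   final class SendSketch {
       static <T> void send(T record) {
           // Fail fast with a clear message instead of a late NPE deep in callbacks.
           Objects.requireNonNull(record, "ProducerRecord must not be null");
           // ... serialization, partitioning, and accumulator append would follow
       }

       public static void main(String[] args) {
           send("a-record"); // proceeds normally
           send(null);       // throws NullPointerException immediately
       }
   }
   ```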



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #12104: KAFKA-13746: Attempt to fix flaky test by waiting to fetch 2 topics f…

2022-05-05 Thread GitBox


guozhangwang commented on PR #12104:
URL: https://github.com/apache/kafka/pull/12104#issuecomment-1118887691

   Thanks @vamossagar12 , the failed tests still contain 
[testDescribeUnderMinIsrPartitionsMixed(String).quorum=zk](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12104/4/testReport/junit/kafka.admin/TopicCommandIntegrationTest/Build___JDK_17_and_Scala_2_13___testDescribeUnderMinIsrPartitionsMixed_String__quorum_zk_2), 
which seems relevant. Do you want to take a look at them before I re-trigger 
the build?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key

2022-05-05 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17532427#comment-17532427
 ] 

Matthias J. Sax commented on KAFKA-8769:


Just a "random" comment, based on some other discussion I just had: if we do 
track stream time per key, it could be an issue if a key "goes away" – 
stream-time for this key would stop to advance, and thus we could not close the 
corresponding window (for windowed aggregations and left/outer stream-stream 
join).
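
A tiny sketch of what per-key stream time would look like (illustrative only, 
not Streams code; the names are made up): each key's stream time is the max 
timestamp seen for that key, so a key that stops producing is frozen forever.

{code}
import java.util.HashMap;
import java.util.Map;

final class PerKeyStreamTime {
    // Highest timestamp observed so far, tracked per key instead of per partition.
    private final Map<String, Long> streamTime = new HashMap<>();

    long observe(String key, long timestamp) {
        return streamTime.merge(key, timestamp, Math::max);
    }

    public static void main(String[] args) {
        PerKeyStreamTime t = new PerKeyStreamTime();
        t.observe("A", 10L);
        t.observe("B", 5L);
        t.observe("A", 20L); // A advances to 20; if B never produces again,
                             // its stream time stays at 5, so a grace period
                             // measured against it can never elapse.
        System.out.println(t.streamTime); // {A=20, B=5}
    }
}
{code}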

> Consider computing stream time independently per key
> 
>
> Key: KAFKA-8769
> URL: https://issues.apache.org/jira/browse/KAFKA-8769
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-discussion, needs-kip
>
> Currently, Streams uses a concept of "stream time", which is computed as the 
> highest timestamp observed by stateful operators, per partition. This concept 
> of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological 
> order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IOT 
> applications, it's common for sensors to save up quite a bit of data and then 
> dump it all at once into the topic. See 
> https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
>  for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a 
> time into the topic. This results in a pattern in which, when reading a 
> single partition, the operators observe a lot of consecutive records for one 
> key that increase in timestamp for 24 hours, then a bunch of consecutive 
> records for another key that are also increasing in timestamp over the same 
> 24 hour period. With our current stream-time definition, this means that the 
> partition's stream time increases while reading the first key's data, but 
> then stays paused while reading the second key's data, since the second batch 
> of records all have timestamps in the "past".
> E.g:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required 
> to set the grace period to the max expected time skew, for example 24 hours, 
> or Streams will just drop the second key's data (since it is late). But, this 
> means that if they want to use Suppression for "final results", they have to 
> wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents 
> a logically independent sequence of events. Tracking by partition is simply 
> convenient, but typically not logically meaningful. That is, the partitions 
> are just physically independent sequences of events, so it's convenient to 
> track stream time at this granularity. It would be just as correct, and more 
> useful for IOT-like use cases, to track time independently for each key.
> However, before considering this change, we need to solve the 
> testing/low-traffic problem. This is the opposite issue, where a partition 
> doesn't get enough traffic to advance stream time and results remain "stuck" 
> in the suppression buffers. We can provide some mechanism to force the 
> advancement of time across all partitions, for use in testing when you want 
> to flush out all results, or in production when some topic is low volume. We 
> shouldn't consider tracking time _more_ granularly until this problem is 
> solved, since it would just make the low-traffic problem worse.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-13878) Connect deadlock in WorkerConnector on desired state change

2022-05-05 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-13878:

Component/s: KafkaConnect

> Connect deadlock in WorkerConnector on desired state change
> ---
>
> Key: KAFKA-13878
> URL: https://issues.apache.org/jira/browse/KAFKA-13878
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Greg Harris
>Priority: Major
>
> We've experienced multiple instances of deadlocks where the connector thread 
> is blocked indefinitely with the following stacktrace:
> {noformat}
> "connector-thread-myconnector" #2059323 prio=5 os_prio=0 cpu=123.43ms 
> elapsed=147352.41s tid=0x7f5588160de0 nid=0x18be in Object.wait()  
> [0x7f550e3fe000]
>    java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(java.base@16.0.2/Native Method)
>     - waiting on 
>     at java.lang.Object.wait(java.base@16.0.2/Object.java:320)
>     at 
> org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:149)
>     - locked <0x0005d2253818> (a 
> org.apache.kafka.connect.runtime.WorkerConnector)
>     at 
> org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:118)
>     at 
> java.util.concurrent.Executors$RunnableAdapter.call(java.base@16.0.2/Executors.java:515)
>     at 
> java.util.concurrent.FutureTask.run(java.base@16.0.2/FutureTask.java:264)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@16.0.2/ThreadPoolExecutor.java:1130)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@16.0.2/ThreadPoolExecutor.java:630)
>     at java.lang.Thread.run(java.base@16.0.2/Thread.java:831){noformat}
> This appears to be caused by a race condition where a notify() is never 
> delivered to a task, causing the connector thread to wait indefinitely for a 
> notify() call that never comes.
> This causes the connector to be unable to process further state changes, or 
> generate task configurations.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-13878) Connect deadlock in WorkerConnector on desired state change

2022-05-05 Thread Greg Harris (Jira)
Greg Harris created KAFKA-13878:
---

 Summary: Connect deadlock in WorkerConnector on desired state 
change
 Key: KAFKA-13878
 URL: https://issues.apache.org/jira/browse/KAFKA-13878
 Project: Kafka
  Issue Type: Bug
Reporter: Greg Harris


We've experienced multiple instances of deadlocks where the connector thread is 
blocked indefinitely with the following stacktrace:
{noformat}
"connector-thread-myconnector" #2059323 prio=5 os_prio=0 cpu=123.43ms 
elapsed=147352.41s tid=0x7f5588160de0 nid=0x18be in Object.wait()  
[0x7f550e3fe000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(java.base@16.0.2/Native Method)
    - waiting on 
    at java.lang.Object.wait(java.base@16.0.2/Object.java:320)
    at 
org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:149)
    - locked <0x0005d2253818> (a 
org.apache.kafka.connect.runtime.WorkerConnector)
    at 
org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:118)
    at 
java.util.concurrent.Executors$RunnableAdapter.call(java.base@16.0.2/Executors.java:515)
    at java.util.concurrent.FutureTask.run(java.base@16.0.2/FutureTask.java:264)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@16.0.2/ThreadPoolExecutor.java:1130)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@16.0.2/ThreadPoolExecutor.java:630)
    at java.lang.Thread.run(java.base@16.0.2/Thread.java:831){noformat}
This appears to be caused by a race condition where a notify() is never 
delivered to a task, causing the connector thread to wait indefinitely for a 
notify() call that never comes.

This causes the connector to be unable to process further state changes, or 
generate task configurations.
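
A minimal, self-contained sketch of this class of race (illustrative; not the 
actual WorkerConnector code): the condition is checked outside the monitor, so 
a notify() fired in the gap is lost and wait() then blocks forever, matching 
the stacktrace above.

{code}
final class LostNotifySketch {
    private final Object lock = new Object();
    private volatile boolean done = false;

    // BUG: the flag is checked before entering the monitor and never
    // re-checked under it, so a notify() delivered in the gap is lost.
    void awaitBuggy() throws InterruptedException {
        if (!done) {
            synchronized (lock) {
                lock.wait(); // blocks forever if complete() already ran
            }
        }
    }

    void complete() {
        synchronized (lock) {
            done = true;
            lock.notifyAll(); // a no-op if awaitBuggy() has not reached wait() yet
        }
    }

    // Fix: check and wait atomically, in a loop, under the same monitor.
    void awaitFixed() throws InterruptedException {
        synchronized (lock) {
            while (!done)
                lock.wait();
        }
    }
}
{code}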



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-13877) Flaky RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags

2022-05-05 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13877:
--
Labels: newbie  (was: )

> Flaky 
> RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags
> 
>
> Key: KAFKA-13877
> URL: https://issues.apache.org/jira/browse/KAFKA-13877
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie
>
> The following test fails on local testbeds about once per 10-15 runs:
> {code}
> java.lang.AssertionError
>   at org.junit.Assert.fail(Assert.java:87)
>   at org.junit.Assert.assertTrue(Assert.java:42)
>   at org.junit.Assert.assertTrue(Assert.java:53)
>   at 
> org.apache.kafka.streams.integration.RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags(RackAwarenessIntegrationTest.java:192)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at 
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:53)
>   at 
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>   at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-13877) Flaky RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags

2022-05-05 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13877:
-

 Summary: Flaky 
RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags
 Key: KAFKA-13877
 URL: https://issues.apache.org/jira/browse/KAFKA-13877
 Project: Kafka
  Issue Type: Bug
  Components: streams, unit tests
Reporter: Guozhang Wang


The following test fails on local testbeds about once per 10-15 runs:

{code}
java.lang.AssertionError
at org.junit.Assert.fail(Assert.java:87)
at org.junit.Assert.assertTrue(Assert.java:42)
at org.junit.Assert.assertTrue(Assert.java:53)
at 
org.apache.kafka.streams.integration.RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags(RackAwarenessIntegrationTest.java:192)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:53)
at 
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)


{code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] junrao commented on a diff in pull request #12049: KAFKA-10888: Sticky partition leads to uneven produce msg

2022-05-05 Thread GitBox


junrao commented on code in PR #12049:
URL: https://github.com/apache/kafka/pull/12049#discussion_r866131407


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java:
##
@@ -25,12 +23,15 @@
 
 private final Cluster cluster;
 private final Serializer<K> keySerializer;
-private final DefaultPartitioner defaultPartitioner;
 
+@SuppressWarnings("deprecation")
+private final 
org.apache.kafka.clients.producer.internals.DefaultPartitioner 
defaultPartitioner;
+
+@SuppressWarnings("deprecation")
 public DefaultStreamPartitioner(final Serializer<K> keySerializer, final 
Cluster cluster) {
 this.cluster = cluster;
 this.keySerializer = keySerializer;
-this.defaultPartitioner = new DefaultPartitioner();
+this.defaultPartitioner = new 
org.apache.kafka.clients.producer.internals.DefaultPartitioner();

Review Comment:
   Hmm, onNewBatch() in DefaultPartitioner is not called by the application 
code. It's called inside Sender. 
   
   Converting DefaultStreamPartitioner to the new built-in partitioner is a bit 
tricky since kstreams calls the partitioner explicitly and then passes in the 
partition during send. We need to change kstreams code so that it lets the 
producer determine the partition. This can be done in a separate PR.  cc 
@guozhangwang 
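
   For illustration, the two call shapes being contrasted (the class name is 
hypothetical, but the ProducerRecord constructors are the real client API): 
pinning the partition in the record bypasses the producer's partitioner, while 
leaving it unset lets the producer decide at send time.

   ```java
   import org.apache.kafka.clients.producer.ProducerRecord;

   final class PartitionChoiceSketch {
       // Today: Streams computes the partition itself and pins it in the record,
       // so the producer's built-in partitioner is never consulted.
       static ProducerRecord<String, String> pinnedByCaller(String topic, int partition,
                                                            String key, String value) {
           return new ProducerRecord<>(topic, partition, key, value);
       }

       // Proposed direction: leave the partition unset and let the producer's
       // partitioner choose when the record is sent.
       static ProducerRecord<String, String> decidedByProducer(String topic,
                                                               String key, String value) {
           return new ProducerRecord<>(topic, key, value);
       }
   }
   ```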



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #12127: KAFKA-13785: [7/N][Emit final] emit final for sliding window

2022-05-05 Thread GitBox


guozhangwang commented on PR #12127:
URL: https://github.com/apache/kafka/pull/12127#issuecomment-1118838068

   cc @lihaosky @mjsax @vvcephei 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #11917: KAFKA-13785: [8/N][emit final] time-ordered session store

2022-05-05 Thread GitBox


guozhangwang commented on PR #11917:
URL: https://github.com/apache/kafka/pull/11917#issuecomment-1118837108

   The copy PR is here: https://github.com/apache/kafka/pull/12127


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #12037: KAFKA-13785: [7/N][Emit final] emit final for sliding window

2022-05-05 Thread GitBox


guozhangwang commented on PR #12037:
URL: https://github.com/apache/kafka/pull/12037#issuecomment-1118836834

   https://github.com/apache/kafka/pull/12127


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang opened a new pull request, #12127: KAFKA-13785: [7/N][Emit final] emit final for sliding window

2022-05-05 Thread GitBox


guozhangwang opened a new pull request, #12127:
URL: https://github.com/apache/kafka/pull/12127

   This is a copy PR of #11917. The major diffs are:
   
   1) Avoid extra byte array allocation for fixed upper/lower range 
serialization.
   2) Rename some class names to be more consistent.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #11917: KAFKA-13785: [8/N][emit final] time-ordered session store

2022-05-05 Thread GitBox


guozhangwang commented on code in PR #11917:
URL: https://github.com/apache/kafka/pull/11917#discussion_r865488275


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java:
##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import 
org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import 
org.apache.kafka.streams.state.internals.PrefixedSessionKeySchemas.KeyFirstSessionKeySchema;
+import 
org.apache.kafka.streams.state.internals.PrefixedSessionKeySchemas.TimeFirstSessionKeySchema;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+
+/**
+ * A RocksDB backed time-ordered segmented bytes store for session key schema.
+ */
+public class RocksDBTimeOrderedSessionSegmentedBytesStore extends 
AbstractRocksDBTimeOrderedSegmentedBytesStore {
+
+private class SessionKeySchemaIndexToBaseStoreIterator  extends 
IndexToBaseStoreIterator {
+SessionKeySchemaIndexToBaseStoreIterator(final KeyValueIterator<Bytes, byte[]> indexIterator) {
+super(indexIterator);
+}
+
+@Override
+protected Bytes getBaseKey(final Bytes indexKey) {
+final Window window = 
KeyFirstSessionKeySchema.extractWindow(indexKey.get());
+final byte[] key = 
KeyFirstSessionKeySchema.extractKeyBytes(indexKey.get());
+
+return TimeFirstSessionKeySchema.toBinary(Bytes.wrap(key), 
window.start(), window.end());
+}
+}
+
+RocksDBTimeOrderedSessionSegmentedBytesStore(final String name,
+ final String metricsScope,
+ final long retention,
+ final long segmentInterval,
+ final boolean withIndex) {
+super(name, metricsScope, retention, segmentInterval, new 
TimeFirstSessionKeySchema(),

Review Comment:
   We can discuss at a later time in another PR: for session stores, since we now 
use the time-first schema as the base store, we probably would not need to use the 
variable upper/lower range for it any more.



##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSegmentedBytesStore.java:
##
@@ -32,89 +30,19 @@
 import 
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteBatch;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
- * RocksDB store backed by two SegmentedBytesStores which can optimize scan by 
time as well as window
- * lookup for a specific key.
- *
- * Schema for first SegmentedBytesStore (base store) is as below:
- * Key schema: | timestamp + recordkey |
- * Value schema: | value |. Value here is determined by caller.
- *
- * Schema for second SegmentedBytesStore (index store) is as below:
- * Key schema: | record + timestamp |
- * Value schema: ||
- *
- * Operations:
- * Put: 1. Put to index store. 2. Put to base store.
- * Delete: 1. Delete from base store. 2. Delete from index store.
- * Since we need to update two stores, failure can happen in the middle. We 
put in index store first
- * to make sure if a failure happens in second step and the view is 
inconsistent, we can't get the
- * value for the key. We delete from base store first to make sure if a 
failure happens in second step
- * and the view is inconsistent, we can't get the value for the key.
- *
- * Note:
- * Index store can be optional if we can construct the timestamp in base 
store instead of looking

[GitHub] [kafka] rittikaadhikari commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access

2022-05-05 Thread GitBox


rittikaadhikari commented on code in PR #12005:
URL: https://github.com/apache/kafka/pull/12005#discussion_r866120522


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -51,6 +51,7 @@ import scala.math._
  */
 abstract class AbstractFetcherThread(name: String,
  clientId: String,
+ val leader: LeaderEndPoint,

Review Comment:
   iiuc, the BrokerEndPoint isn't passed into the LeaderEndPoint (not Remote or 
Local)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access

2022-05-05 Thread GitBox


junrao commented on code in PR #12005:
URL: https://github.com/apache/kafka/pull/12005#discussion_r865419210


##
core/src/main/scala/kafka/server/LeaderEndPoint.scala:
##
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.requests.FetchRequest
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
+import org.apache.kafka.common.message.{FetchResponseData, 
OffsetForLeaderEpochRequestData}
+
+import scala.collection.Map
+
+/**
+ * The LeaderEndPoint acts as an abstraction which serves all fetches from the 
leader
+ * for the fetcher threads.
+ */
+trait LeaderEndPoint {
+
+  type FetchData = FetchResponseData.PartitionData
+  type EpochData = OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
+
+  /**
+   * A boolean specifying if truncation when fetching from the leader is 
supported
+   */
+  def isTruncationOnFetchSupported: Boolean
+
+  /**
+   * Initiate closing access to fetches from leader.
+   */
+  def initiateClose(): Unit
+
+  /**
+   * Closes access to fetches from leader.
+   */
+  def close(): Unit
+
+  /**
+   * Given a fetchRequest, carries out the expected request and returns
+   * the results from fetching from the leader.
+   *
+   * @param fetchRequest The fetch request we want to carry out
+   *
+   * @return A map of topic partition -> fetch data
+   */
+  def fetch(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData]
+
+  /**
+   * Fetches the log start offset of the given topic partition, at a specific
+   * leader epoch, from the leader.
+   *
+   * @param topicPartition The topic partition that we want to fetch from
+   * @param currentLeaderEpoch An int representing the current leader epoch 
that we want to fetch from
+   *
+   * @return A long representing the earliest offset in the leader's topic 
partition.
+   */
+  def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: 
Int): Long
+
+  /**
+   * Fetches the log end offset of the given topic partition, at a specific
+   * leader epoch, from the leader.
+   *
+   * @param topicPartition The topic partition that we want to fetch from
+   * @param currentLeaderEpoch An int representing the current leader epoch 
that we want to fetch from

Review Comment:
   An int representing the current leader epoch that we want to fetch from => 
current leader epoch of the requestor



##
core/src/main/scala/kafka/server/LeaderEndPoint.scala:
##
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.requests.FetchRequest
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
+import org.apache.kafka.common.message.{FetchResponseData, 
OffsetForLeaderEpochRequestData}
+
+import scala.collection.Map
+
+/**
+ * The LeaderEndPoint acts as an abstraction which serves all fetches from the 
leader
+ * for the fetcher threads.
+ */
+trait LeaderEndPoint {
+
+  type FetchData = FetchResponseData.PartitionData
+  type EpochData = OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
+
+  /**
+   * A boolean 

[GitHub] [kafka] mumrah commented on pull request #12050: KAFKA-13830 MetadataVersion integration for KRaft controller

2022-05-05 Thread GitBox


mumrah commented on PR #12050:
URL: https://github.com/apache/kafka/pull/12050#issuecomment-1118712139

   @dengziming @cmccabe I've updated this PR with the changes from #12072. I've 
also modified the bootstrap mechanism to use a file with the KRaft snapshot 
format rather than a property in meta.properties. This is a slight departure 
from the KIP, but using a snapshot will open up the ability for us to bootstrap 
other records on first startup. If we agree that this is a good approach, I can 
update the KIP and discussion thread.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-13876) Mirrormaker-2 consumer settings not working

2022-05-05 Thread Christian Bates (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christian Bates updated KAFKA-13876:

Description: 
I am starting a mirrormaker connect cluster using ./bin/connect-mirror-maker.sh 
and a properties file.

I followed the guide at 
[https://github.com/apache/kafka/tree/trunk/connect/mirror] to attempt to 
configure the consumer properties, however, no matter what I set, the timeout 
for the consumer remains fixed at the default, confirmed both by kafka 
outputting its config in the log and by timing how long between disconnect 
messages I get.  e.g. I set:

{{CLOUD_EU.consumer.request.timeout.ms=12}}

In the properties that I start MM-2 with and it is ignored.  

 

based on various guides I have found while looking at this, I have also tried

{{CLOUD_EU.request.timeout.ms=12}}
{{CLOUD_EU.cluster.consumer.request.timeout.ms=12}}
{{CLOUD_EU.consumer.override.request.timeout.ms=12}}
{{CLOUD_EU.cluster.consumer.override.request.timeout.ms=12}}

None of which have worked.  

How can I change the consumer request.timeout setting?

 

Context:

I am attempting to use mirrormaker 2 to replicate data between AWS Managed 
Kafkas (MSK) in 2 different AWS regions - one in eu-west-1 (CLOUD_EU) and the 
other in us-west-2 (CLOUD_NA), both running Kafka 2.6.1.  For testing I am 
currently trying just to replicate topics 1 way, from EU -> NA.

This works fine for topics with small messages on them, but one of my topics has 
binary messages up to 20MB in size.  When I try to replicate that topic I get 
an error every 30 seconds 

{{[2022-04-21 13:47:05,268] INFO [Consumer clientId=consumer-29, groupId=null] 
Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 2: {}. 
(org.apache.kafka.clients.FetchSessionHandler:481)}}
{{org.apache.kafka.common.errors.DisconnectException}}

 

When logging in DEBUG to get more information we get

{{[2022-04-21 13:47:05,267] DEBUG [Consumer clientId=consumer-29, groupId=null] 
Disconnecting from node 2 due to request timeout. 
(org.apache.kafka.clients.NetworkClient:784)}}
{{[2022-04-21 13:47:05,268] DEBUG [Consumer clientId=consumer-29, groupId=null] 
Cancelled request with header RequestHeader(apiKey=FETCH, apiVersion=11, 
clientId=consumer-29, correlationId=35) due to node 2 being disconnected 
(org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient:593)}}

It gets stuck in a loop constantly disconnecting with request timeout every 30s 
and then trying again.

Looking at this, I suspected that the problem is the `request.timeout.ms` was 
on the default (30s) and timing out trying to read the topic with many large 
messages, hence I was trying to override the consumer settings to have a longer 
timeout.

 

The 2 specified versions are the ones I have explicitly tested against.  

 

Properties file I am using to start the cluster:

 

{{clusters = CLOUD_EU, CLOUD_NA}}

{{CLOUD_EU.bootstrap.servers = kafka.eu-west-1.amazonaws.com:9092}}
{{CLOUD_NA.bootstrap.servers = kafka.us-west-2.amazonaws.com:9092}}

{{CLOUD_EU->CLOUD_NA.enabled = true}}
{{CLOUD_EU->CLOUD_NA.topics = METRICS_ATTACHMENTS_OVERSIZE_EU}}
{{CLOUD_NA->CLOUD_EU.enabled = false}}

{{replication.factor=3}}
{{tasks.max = 1}}

{{checkpoints.topic.replication.factor=3}}
{{heartbeats.topic.replication.factor=3}}
{{offset-syncs.topic.replication.factor=3}}
{{offset.storage.replication.factor=3}}
{{status.storage.replication.factor=3}}
{{config.storage.replication.factor=3}}

{{CLOUD_EU.consumer.request.timeout.ms=12}}
{{CLOUD_EU.consumer.session.timeout.ms=15}}

  was:
I am starting a mirrormaker connect cluster using ./bin/connect-mirror-maker.sh 
and a properties file.

I followed the guide at 
[https://github.com/apache/kafka/tree/trunk/connect/mirror] to attempt to 
configure the consumer properties, however, no matter what I set, the timeout 
for the consumer remains fixed at the default, confirmed both by kafka 
outputting its config in the log and by timing how long between disconnect 
messages I get.  e.g. I set:

{{CLOUD_EU.consumer.request.timeout.ms=12}}

In the properties that I start MM-2 with and it is ignored.  

 

based on various guides I have found while looking at this, I have also tried

{{CLOUD_EU.request.timeout.ms=12}}
{{CLOUD_EU.cluster.consumer.request.timeout.ms=12}}
{{CLOUD_EU.consumer.override.request.timeout.ms=12}}
{{CLOUD_EU.cluster.consumer.override.request.timeout.ms=12}}

None of which have worked.  

How can I change the consumer request.timeout setting?

 

Context:

I am attempting to use mirrormaker 2 to replicate data between AWS Managed 
Kafkas (MSK) in 2 different AWS regions - one in eu-west-1 (CLOUD_EU) and the 
other in us-west-2 (CLOUD_NA), both running Kafka 2.6.1.  For testing I am 
currently trying just to replicate topics 1 way, from EU -> NA.

This works fine for topics with small 

[jira] [Updated] (KAFKA-13876) Mirrormaker-2 consumer settings not working

2022-05-05 Thread Christian Bates (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christian Bates updated KAFKA-13876:

Description: 
I am starting a mirrormaker connect cluster using ./bin/connect-mirror-maker.sh 
and a properties file.

I followed the guide at 
[https://github.com/apache/kafka/tree/trunk/connect/mirror] to attempt to 
configure the consumer properties, however, no matter what I set, the timeout 
for the consumer remains fixed at the default, confirmed both by kafka 
outputting its config in the log and by timing how long between disconnect 
messages I get.  e.g. I set:

{{CLOUD_EU.consumer.request.timeout.ms=12}}

In the properties that I start MM-2 with and it is ignored.  

 

based on various guides I have found while looking at this, I have also tried

{{CLOUD_EU.request.timeout.ms=12}}
{{CLOUD_EU.cluster.consumer.request.timeout.ms=12}}
{{CLOUD_EU.consumer.override.request.timeout.ms=12}}
{{CLOUD_EU.cluster.consumer.override.request.timeout.ms=12}}

None of which have worked.  

How can I change the consumer request.timeout setting?

 

Context:

I am attempting to use mirrormaker 2 to replicate data between AWS Managed 
Kafkas (MSK) in 2 different AWS regions - one in eu-west-1 (CLOUD_EU) and the 
other in us-west-2 (CLOUD_NA), both running Kafka 2.6.1.  For testing I am 
currently trying just to replicate topics 1 way, from EU -> NA.

This works fine for topics with small messages on them, but one of my topics has 
binary messages up to 20MB in size.  When I try to replicate that topic I get 
an error every 30 seconds 

{{[2022-04-21 13:47:05,268] INFO [Consumer clientId=consumer-29, groupId=null] 
Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 2: {}. 
(org.apache.kafka.clients.FetchSessionHandler:481)}}
{{org.apache.kafka.common.errors.DisconnectException}}

 

When logging in DEBUG to get more information we get

{{[2022-04-21 13:47:05,267] DEBUG [Consumer clientId=consumer-29, groupId=null] 
Disconnecting from node 2 due to request timeout. 
(org.apache.kafka.clients.NetworkClient:784)}}
{{[2022-04-21 13:47:05,268] DEBUG [Consumer clientId=consumer-29, groupId=null] 
Cancelled request with header RequestHeader(apiKey=FETCH, apiVersion=11, 
clientId=consumer-29, correlationId=35) due to node 2 being disconnected 
(org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient:593)}}

It gets stuck in a loop constantly disconnecting with request timeout every 30s 
and then trying again.

Looking at this, I suspected that the problem is the `request.timeout.ms` was 
on the default (30s) and timing out trying to read the topic with many large 
messages, hence I was trying to override the consumer settings to have a longer 
timeout.

 

The 2 specified versions are the ones I have explicitly tested against.  

 

Properties file I am using to start the cluster:

 

{{clusters = CLOUD_EU, CLOUD_NA}}

{{CLOUD_EU.bootstrap.servers = kafka.eu-west-1.amazonaws.com:9092}}
{{CLOUD_NA.bootstrap.servers = kafka.us-west-2.amazonaws.com:9092}}

{{CLOUD_EU->CLOUD_NA.enabled = true}}
{{CLOUD_EU->CLOUD_NA.topics = METRICS_ATTACHMENTS_OVERSIZE_EU}}
{{CLOUD_NA->CLOUD_EU.enabled = false}}

{{replication.factor=3}}
{{{}tasks.max = 1{}}}{{{}checkpoints.topic.replication.factor=3{}}}
{{heartbeats.topic.replication.factor=3}}
{{offset-syncs.topic.replication.factor=3}}
{{offset.storage.replication.factor=3}}
{{status.storage.replication.factor=3}}
{{config.storage.replication.factor=3}}

{{CLOUD_EU.consumer.request.timeout.ms=12}}
{{CLOUD_EU.consumer.session.timeout.ms=15}}

  was:
I am starting a mirrormaker connect cluster using ./bin/connect-mirror-maker.sh 
and a properties file.

I followed the guide at 
[https://github.com/apache/kafka/tree/trunk/connect/mirror] to attempt to 
configure the consumer properties, however, no matter what I set, the timeout 
for the consumer remains fixed at the default, confirmed both by kafka 
outputting its config in the log and by timing how long between disconnect 
messages I get.  e.g. I set:

{{CLOUD_EU.consumer.request.timeout.ms=12}}

In the properties that I start MM-2 with and it is ignored.  

 

based on various guides I have found while looking at this, I have also tried

{{CLOUD_EU.request.timeout.ms=12}}
{{CLOUD_EU.cluster.consumer.request.timeout.ms=12}}
{{CLOUD_EU.consumer.override.request.timeout.ms=12}}
{{CLOUD_EU.cluster.consumer.override.request.timeout.ms=12}}

None of which have worked.  

How can I change the consumer request.timeout setting?

 

Context:

I am attempting to use mirrormaker 2 to replicate data between AWS Managed 
Kafkas (MSK) in 2 different AWS regions - one in eu-west-1 (CLOUD_EU) and the 
other in us-west-2 (CLOUD_NA), both running Kafka 2.6.1.  For testing I am 
currently trying just to replicate topics 1 way, from EU -> NA.

This works fine for topics with small 

[jira] [Updated] (KAFKA-13876) Mirrormaker-2 consumer settings not working

2022-05-05 Thread Christian Bates (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christian Bates updated KAFKA-13876:

Description: 
I am starting a mirrormaker connect cluster using ./bin/connect-mirror-maker.sh 
and a properties file.

I followed the guide at 
[https://github.com/apache/kafka/tree/trunk/connect/mirror] to attempt to 
configure the consumer properties, however, no matter what I set, the timeout 
for the consumer remains fixed at the default, confirmed both by kafka 
outputting its config in the log and by timing how long between disconnect 
messages I get.  e.g. I set:

{{CLOUD_EU.consumer.request.timeout.ms=12}}

In the properties that I start MM-2 with and it is ignored.  

 

based on various guides I have found while looking at this, I have also tried

{{CLOUD_EU.request.timeout.ms=12}}
{{CLOUD_EU.cluster.consumer.request.timeout.ms=12}}
{{CLOUD_EU.consumer.override.request.timeout.ms=12}}
{{CLOUD_EU.cluster.consumer.override.request.timeout.ms=12}}

None of which have worked.  

How can I change the consumer request.timeout setting?

 

Context:

I am attempting to use mirrormaker 2 to replicate data between AWS Managed 
Kafkas (MSK) in 2 different AWS regions - one in eu-west-1 (CLOUD_EU) and the 
other in us-west-2 (CLOUD_NA), both running Kafka 2.6.1.  For testing I am 
currently trying just to replicate topics 1 way, from EU -> NA.

This works fine for topics with small messages on them, but one of my topics has 
binary messages up to 20MB in size.  When I try to replicate that topic I get 
an error every 30 seconds 

{{[2022-04-21 13:47:05,268] INFO [Consumer clientId=consumer-29, groupId=null] 
Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 2: {}. 
(org.apache.kafka.clients.FetchSessionHandler:481)}}
{{org.apache.kafka.common.errors.DisconnectException}}

 

When logging in DEBUG to get more information we get

{{[2022-04-21 13:47:05,267] DEBUG [Consumer clientId=consumer-29, groupId=null] 
Disconnecting from node 2 due to request timeout. 
(org.apache.kafka.clients.NetworkClient:784)}}
{{[2022-04-21 13:47:05,268] DEBUG [Consumer clientId=consumer-29, groupId=null] 
Cancelled request with header RequestHeader(apiKey=FETCH, apiVersion=11, 
clientId=consumer-29, correlationId=35) due to node 2 being disconnected 
(org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient:593)}}

It gets stuck in a loop constantly disconnecting with request timeout every 30s 
and then trying again.

Looking at this, I suspected that the problem is the `request.timeout.ms` was 
on the default (30s) and timing out trying to read the topic with many large 
messages, hence I was trying to override the consumer settings to have a longer 
timeout.

 

Properties file I am using to start the cluster:

 

clusters = CLOUD_EU, CLOUD_NA

CLOUD_EU.bootstrap.servers = kafka.eu-west-1.amazonaws.com:9092
CLOUD_NA.bootstrap.servers = kafka.us-west-2.amazonaws.com:9092


CLOUD_EU->CLOUD_NA.enabled = true
CLOUD_EU->CLOUD_NA.topics = METRICS_ATTACHMENTS_OVERSIZE_EU
CLOUD_NA->CLOUD_EU.enabled = false

replication.factor=3
tasks.max = 1


checkpoints.topic.replication.factor=3
heartbeats.topic.replication.factor=3
offset-syncs.topic.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
config.storage.replication.factor=3

CLOUD_EU.consumer.request.timeout.ms=12
CLOUD_EU.consumer.session.timeout.ms=15

  was:
I am starting a mirrormaker connect cluster using ./bin/connect-mirror-maker.sh 
and a properties file.

I followed the guide at 
[https://github.com/apache/kafka/tree/trunk/connect/mirror] to attempt to 
configure the consumer properties, however, no matter what I set, the timeout 
for the consumer remains fixed at the default, confirmed both by kafka 
outputting its config in the log and by timing how long between disconnect 
messages I get.  e.g. I set:

{{CLOUD_EU.consumer.request.timeout.ms=12}}

In the properties that I start MM-2 with and it is ignored.  

 

based on various guides I have found while looking at this, I have also tried

{{CLOUD_EU.request.timeout.ms=12}}
{{CLOUD_EU.cluster.consumer.request.timeout.ms=12}}
{{CLOUD_EU.consumer.override.request.timeout.ms=12}}
{{CLOUD_EU.cluster.consumer.override.request.timeout.ms=12}}

None of which have worked.  

How can I change the consumer request.timeout setting?

 

Context:

I am attempting to use mirrormaker 2 to replicate data between AWS Managed 
Kafkas (MSK) in 2 different AWS regions - one in eu-west-1 (CLOUD_EU) and the 
other in us-west-2 (CLOUD_NA), both running Kafka 2.6.1.  For testing I am 
currently trying just to replicate topics 1 way, from EU -> NA.

This works fine for topics with small messages on them, but one of my topics has 
binary messages up to 20MB in size.  When I try to replicate that topic I get 
an error every 30 

[jira] [Updated] (KAFKA-13876) Mirrormaker-2 consumer settings not working

2022-05-05 Thread Christian Bates (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christian Bates updated KAFKA-13876:

Description: 
I am starting a mirrormaker connect cluster using ./bin/connect-mirror-maker.sh 
and a properties file.

I followed the guide at 
[https://github.com/apache/kafka/tree/trunk/connect/mirror] to attempt to 
configure the consumer properties, however, no matter what I set, the timeout 
for the consumer remains fixed at the default, confirmed both by kafka 
outputting its config in the log and by timing how long between disconnect 
messages I get.  e.g. I set:

{{CLOUD_EU.consumer.request.timeout.ms=12}}

In the properties that I start MM-2 with and it is ignored.  

 

based on various guides I have found while looking at this, I have also tried

{{CLOUD_EU.request.timeout.ms=12}}
{{CLOUD_EU.cluster.consumer.request.timeout.ms=12}}
{{CLOUD_EU.consumer.override.request.timeout.ms=12}}
{{CLOUD_EU.cluster.consumer.override.request.timeout.ms=12}}

None of which have worked.  

How can I change the consumer request.timeout setting?

 

Context:

I am attempting to use mirrormaker 2 to replicate data between AWS Managed 
Kafkas (MSK) in 2 different AWS regions - one in eu-west-1 (CLOUD_EU) and the 
other in us-west-2 (CLOUD_NA), both running Kafka 2.6.1.  For testing I am 
currently trying just to replicate topics 1 way, from EU -> NA.

This works fine for topics with small messages on them, but one of my topics has 
binary messages up to 20MB in size.  When I try to replicate that topic I get 
an error every 30 seconds 

{{[2022-04-21 13:47:05,268] INFO [Consumer clientId=consumer-29, groupId=null] 
Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 2: {}. 
(org.apache.kafka.clients.FetchSessionHandler:481)}}
{{org.apache.kafka.common.errors.DisconnectException}}

 

When logging in DEBUG to get more information we get

{{[2022-04-21 13:47:05,267] DEBUG [Consumer clientId=consumer-29, groupId=null] 
Disconnecting from node 2 due to request timeout. 
(org.apache.kafka.clients.NetworkClient:784)}}
{{[2022-04-21 13:47:05,268] DEBUG [Consumer clientId=consumer-29, groupId=null] 
Cancelled request with header RequestHeader(apiKey=FETCH, apiVersion=11, 
clientId=consumer-29, correlationId=35) due to node 2 being disconnected 
(org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient:593)}}

It gets stuck in a loop constantly disconnecting with request timeout every 30s 
and then trying again.

Looking at this, I suspected that the problem is the `request.timeout.ms` was 
on the default (30s) and timing out trying to read the topic with many large 
messages, hence I was trying to override the consumer settings to have a longer 
timeout.

 

Properties file I am using to start the cluster:

# specify any number of cluster aliases
clusters = CLOUD_EU, CLOUD_NA

# connection information for each cluster
CLOUD_EU.bootstrap.servers = kafka.eu-west-1.amazonaws.com:9092
CLOUD_NA.bootstrap.servers = kafka.us-west-2.amazonaws.com:9092

# enable and configure individual replication flows
CLOUD_EU->CLOUD_NA.enabled = true
CLOUD_EU->CLOUD_NA.topics = METRICS_ATTACHMENTS_OVERSIZE_EU
CLOUD_NA->CLOUD_EU.enabled = false

replication.factor=3
tasks.max = 1

# Internal Topic Settings
checkpoints.topic.replication.factor=3
heartbeats.topic.replication.factor=3
offset-syncs.topic.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
config.storage.replication.factor=3

# Kafka Settings

# CLOUD_EU cluster overrides
CLOUD_EU.consumer.request.timeout.ms=12
CLOUD_EU.consumer.session.timeout.ms=15
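
To illustrate the behaviour I expected (this is not MirrorMaker's actual 
implementation, just a sketch of the per-cluster prefix convention from the 
guide): keys prefixed with `<cluster>.consumer.` should reach that cluster's 
consumer with the prefix stripped.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class PrefixResolutionSketch {
    // Collect "<cluster>.consumer.*" keys and strip the prefix.
    static Map<String, String> consumerConfigs(Properties props, String cluster) {
        String prefix = cluster + ".consumer.";
        Map<String, String> out = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                out.put(name.substring(prefix.length()), props.getProperty(name));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("CLOUD_EU.consumer.request.timeout.ms", "12");
        // Expected to yield {request.timeout.ms=12} for the CLOUD_EU consumer.
        System.out.println(consumerConfigs(props, "CLOUD_EU"));
    }
}
{code}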


[jira] [Created] (KAFKA-13876) Mirrormaker-2 consumer settings not working

2022-05-05 Thread Christian Bates (Jira)
Christian Bates created KAFKA-13876:
---

 Summary: Mirrormaker-2 consumer settings not working
 Key: KAFKA-13876
 URL: https://issues.apache.org/jira/browse/KAFKA-13876
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 2.6.1, 2.5.0
Reporter: Christian Bates


I am starting a mirrormaker connect cluster using ./bin/connect-mirror-maker.sh 
and a properties file.

I followed the guide at 
https://github.com/apache/kafka/tree/trunk/connect/mirror to attempt to 
configure the consumer properties; however, no matter what I set, the timeout 
for the consumer remains fixed at the default. This is confirmed both by Kafka 
logging its config and by timing the interval between the disconnect messages 
I get. For example, I set:

{{CLOUD_EU.consumer.request.timeout.ms=12}}

in the properties file that I start MM-2 with, and it is ignored.

 

Based on various guides I have found while looking into this, I have also tried:

{{CLOUD_EU.request.timeout.ms=12}}
{{CLOUD_EU.cluster.consumer.request.timeout.ms=12}}
{{CLOUD_EU.consumer.override.request.timeout.ms=12}}
{{CLOUD_EU.cluster.consumer.override.request.timeout.ms=12}}

None of these have worked.

How can I change the consumer request.timeout setting?

 

Context:

I am attempting to use MirrorMaker 2 to replicate data between AWS Managed 
Kafka (MSK) clusters in two different AWS regions: one in eu-west-1 (CLOUD_EU) 
and the other in us-west-2 (CLOUD_NA), both running Kafka 2.6.1. For testing I 
am currently trying to replicate topics one way, from EU -> NA.

This works fine for topics with small messages, but one of my topics has 
binary messages up to 20MB in size. When I try to replicate that topic I get 
an error every 30 seconds:

{{[2022-04-21 13:47:05,268] INFO [Consumer clientId=consumer-29, groupId=null] 
Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 2: {}. 
(org.apache.kafka.clients.FetchSessionHandler:481)}}
{{org.apache.kafka.common.errors.DisconnectException}}

 

When logging in DEBUG to get more information we get

{{[2022-04-21 13:47:05,267] DEBUG [Consumer clientId=consumer-29, groupId=null] 
Disconnecting from node 2 due to request timeout. 
(org.apache.kafka.clients.NetworkClient:784)}}
{{[2022-04-21 13:47:05,268] DEBUG [Consumer clientId=consumer-29, groupId=null] 
Cancelled request with header RequestHeader(apiKey=FETCH, apiVersion=11, 
clientId=consumer-29, correlationId=35) due to node 2 being disconnected 
(org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient:593)}}

It gets stuck in a loop, disconnecting with a request timeout every 30s and 
then retrying.

Looking at this, I suspected that the problem was that `request.timeout.ms` 
was at its default (30s) and timing out while reading the topic with many 
large messages; hence I was trying to override the consumer settings to use a 
longer timeout.

 

Properties file I am using to start the cluster:

 

# specify any number of cluster aliases
clusters = CLOUD_EU, CLOUD_NA

# connection information for each cluster
CLOUD_EU.bootstrap.servers = kafka.eu-west-1.amazonaws.com:9092
CLOUD_NA.bootstrap.servers = kafka.us-west-2.amazonaws.com:9092

# enable and configure individual replication flows
CLOUD_EU->CLOUD_NA.enabled = true
CLOUD_EU->CLOUD_NA.topics = METRICS_ATTACHMENTS_OVERSIZE_EU
CLOUD_NA->CLOUD_EU.enabled = false

replication.factor=3
tasks.max = 1

# Internal Topic Settings
checkpoints.topic.replication.factor=3
heartbeats.topic.replication.factor=3
offset-syncs.topic.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
config.storage.replication.factor=3

# Kafka Settings

# CLOUD_EU cluster overrides
CLOUD_EU.consumer.request.timeout.ms=12
CLOUD_EU.consumer.session.timeout.ms=15



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] dengziming commented on a diff in pull request #12108: KAFKA-13862: Support Append/Subtract multiple config values in KRaft mode

2022-05-05 Thread GitBox


dengziming commented on code in PR #12108:
URL: https://github.com/apache/kafka/pull/12108#discussion_r865580075


##
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##
@@ -1751,13 +1777,32 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   new AlterConfigOp(new ConfigEntry(LogConfig.CleanupPolicyProp, 
LogConfig.Compact + "," + LogConfig.Delete), AlterConfigOp.OpType.SUBTRACT)
 ).asJavaCollection
 
-alterResult = client.incrementalAlterConfigs(Map(
+alterConfigs = Map(
   topic1Resource -> topic1AlterConfigs,
   topic2Resource -> topic2AlterConfigs
-).asJava)
+)
+alterResult = client.incrementalAlterConfigs(alterConfigs.asJava)
 assertEquals(Set(topic1Resource, topic2Resource).asJava, 
alterResult.values.keySet)
 alterResult.all.get
 
+if (isKRaftTest()) {

Review Comment:
   Yeah, nice suggestion; waiting for the metadata offset is simpler and more 
generic. We can make use of this util function in other places, since many of 
them currently rely on similarly specific assertions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-13871) The documentation for the configuration item QUORUM_FETCH_TIMEOUT of the RaftConfig class is incorrect

2022-05-05 Thread yingquan he (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yingquan he reassigned KAFKA-13871:
---

Assignee: (was: yingquan he)

> The documentation for the configuration item QUORUM_FETCH_TIMEOUT of the 
> RaftConfig class is incorrect
> --
>
> Key: KAFKA-13871
> URL: https://issues.apache.org/jira/browse/KAFKA-13871
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Reporter: yingquan he
>Priority: Minor
>
> The wording of the field QUORUM_FETCH_TIMEOUT_MS_DOC is incorrect: `a 
> election` should be changed to `an election`.
>  
> {code:java}
> public static final String QUORUM_FETCH_TIMEOUT_MS_DOC = "Maximum time 
> without a successful fetch from " +
> "the current leader before becoming a candidate and triggering a election 
> for voters; Maximum time without " +
> "receiving fetch from a majority of the quorum before asking around to 
> see if there's a new epoch for leader"; {code}
>  
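
The corrected string would presumably read (only the article changes):

{code:java}
public static final String QUORUM_FETCH_TIMEOUT_MS_DOC = "Maximum time without a successful fetch from " +
    "the current leader before becoming a candidate and triggering an election for voters; Maximum time without " +
    "receiving fetch from a majority of the quorum before asking around to see if there's a new epoch for leader";
{code}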



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] pan3793 opened a new pull request, #12126: KAFKA-8713 KIP-581: Add new conf serialize.accept.optional.null in connect-json

2022-05-05 Thread GitBox


pan3793 opened a new pull request, #12126:
URL: https://github.com/apache/kafka/pull/12126

   See details on KAFKA-8713 and KIP-581
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor

2022-05-05 Thread lqjacklee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lqjacklee updated KAFKA-13864:
--
Attachment: (was: interceptor_constructor_client.patch)

> Change the visibility of a KafkaProducer and KafkaConsumer constructor
> --
>
> Key: KAFKA-13864
> URL: https://issues.apache.org/jira/browse/KAFKA-13864
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 3.1.0
>Reporter: François Rosière
>Assignee: lqjacklee
>Priority: Major
>  Labels: needs-kip
>
> To allow implementing Spring-managed interceptors for producers and consumers,
> [https://github.com/spring-projects/spring-kafka/issues/2244]
> a new constructor should be added in KafkaProducer
> {code:java}
> public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, 
> Serializer<V> valueSerializer){code}
> the visibility of one constructor of KafkaConsumer should also move from 
> default to public.
> {code:java}
> public KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, 
> Deserializer<V> valueDeserializer) {code}
> see the current implementation 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671
> This issue is quite blocking, so any other alternative or proposal would be 
> more than welcome.
> Kafka Streams is not affected by this issue, as the KafkaStreams object 
> already exposes a constructor taking a StreamsConfig object.
> Thanks for considering this issue.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor

2022-05-05 Thread lqjacklee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17532216#comment-17532216
 ] 

lqjacklee commented on KAFKA-13864:
---

[~frosiere] [~cadonna] please help review the code, thanks.

 

 

https://github.com/apache/kafka/pull/12125/files
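
For illustration, the constructors proposed in this ticket would allow wiring 
a pre-built config object and container-managed serializers directly. A 
minimal sketch, assuming the proposed (not yet released) KafkaProducer 
constructor and a made-up bootstrap address:

{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProposedConstructorSketch {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // A Spring container would typically build and manage these beans.
        ProducerConfig config = new ProducerConfig(props);
        StringSerializer serializer = new StringSerializer();

        // Proposed public constructor from this ticket (hypothetical today).
        try (KafkaProducer<String, String> producer =
                 new KafkaProducer<>(config, serializer, serializer)) {
            // producer.send(...) as usual
        }
    }
}
{code}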

> Change the visibility of a KafkaProducer and KafkaConsumer constructor
> --
>
> Key: KAFKA-13864
> URL: https://issues.apache.org/jira/browse/KAFKA-13864
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 3.1.0
>Reporter: François Rosière
>Assignee: lqjacklee
>Priority: Major
>  Labels: needs-kip
>
> To allow implementing Spring-managed interceptors for producers and consumers,
> [https://github.com/spring-projects/spring-kafka/issues/2244]
> a new constructor should be added in KafkaProducer
> {code:java}
> public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, 
> Serializer<V> valueSerializer){code}
> the visibility of one constructor of KafkaConsumer should also move from 
> default to public.
> {code:java}
> public KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, 
> Deserializer<V> valueDeserializer) {code}
> see the current implementation 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671
> This issue is quite blocking, so any other alternative or proposal would be 
> more than welcome.
> Kafka Streams is not affected by this issue, as the KafkaStreams object 
> already exposes a constructor taking a StreamsConfig object.
> Thanks for considering this issue.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] lqjack opened a new pull request, #12125: provide the construct interceptor for KafkaProducer and KafkaConsumer

2022-05-05 Thread GitBox


lqjack opened a new pull request, #12125:
URL: https://github.com/apache/kafka/pull/12125

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] konL opened a new pull request, #12124: rename isDone to hasComplete

2022-05-05 Thread GitBox


konL opened a new pull request, #12124:
URL: https://github.com/apache/kafka/pull/12124

   I used deep learning to discover that the method `isDone` changed in 
4af50bb8600c37ee2e3597fba9a54a29cef94afa.
   The code snippets before and after the change are as follows:
   ```
   public boolean isDone() {
       return isDone;
   }
   ```
   
   ```
   public boolean isDone() {
       return result.get() != INCOMPLETE_SENTINEL;
   }
   ```
   We thought we could rename `isDone` to `hasComplete` to give it a more 
specific meaning.
   
   We are analysing the reasons behind identifier renamings and would like 
advice from more experienced people, so we would appreciate your feedback from 
one of the following perspectives:
   1. `(merge)` You accept this renaming opportunity and agree with the 
recommended name.
   2. `(agree and not recommend)` You accept the renaming but disagree with 
the recommended name: does the recommended name mean the same thing, or is it 
simply not suitable?
   3. `(not agree and useful)` Would our suggested renaming opportunity be 
useful for your other refactoring activities?
   4. `(not agree and not fix)` You do not think our suggested renaming 
opportunity is appropriate. Is this because the identifier does not need to 
be renamed, or is it some other issue?
   We look forward to your feedback!
   
   Thank you!
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13872) Partitions are truncated when leader is replaced

2022-05-05 Thread Jack Vanlightly (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17532122#comment-17532122
 ] 

Jack Vanlightly commented on KAFKA-13872:
-

I presume you would need to perform a broker decommissioning process to remove 
that broker from the cluster before adding a new empty broker with the same id?

Is there documentation for how to decommission a dead broker safely?

> Partitions are truncated when leader is replaced
> 
>
> Key: KAFKA-13872
> URL: https://issues.apache.org/jira/browse/KAFKA-13872
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.7.2
>Reporter: Francois Visconte
>Priority: Major
> Attachments: extract-2022-05-04T15_50_34.110Z.csv
>
>
> Sample setup:
>  * a topic with one partition and RF=3
>  * a producer using acks=1
>  * min.insync.replicas to 1
>  * 3 brokers 1,2,3
>  * Preferred leader of the partition is brokerId 0
>  
> Steps to reproduce the issue
>  * Producer keeps producing to the partition, leader is brokerId=0
>  * At some point, replicas 1 and 2 fall behind and are removed from the ISR
>  * The leader broker 0 has a hardware failure
>  * The partition transitions to offline
>  * This leader is replaced with a new broker with an empty disk and the same 
> broker id 0
>  * The partition transitions from offline to online with leader 0, ISR = 0
>  * Followers see that the leader's offset is 0 and decide to truncate their 
> partitions to 0, ISR=0,1,2
>  * At this point all the topic data has been removed and the partition size 
> drops to 0 on all replicas
> Attached are some of the relevant logs; I can provide more if necessary.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] dengziming commented on pull request #12109: KAFKA-13863: Prevent null config value when create topic in KRaft mode

2022-05-05 Thread GitBox


dengziming commented on PR #12109:
URL: https://github.com/apache/kafka/pull/12109#issuecomment-1118264999

   @cmccabe, Yeah, it's unnecessary to change `ReplicationControlManager`; 
however, there is still a difference: in KRaft mode a null config value is 
treated as removing that config, so creating a topic with a null config value 
acts the same as creating it without that config at all.
   
   Do you think this is a reasonable change, or is it an incompatible bug?
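   
   For reference, a minimal sketch of the case in question (topic name and 
bootstrap address are made up):
   
   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.NewTopic;
   
   public class NullConfigValueRepro {
       public static void main(String[] args) throws Exception {
           Map<String, Object> props = new HashMap<>();
           props.put("bootstrap.servers", "localhost:9092");
           try (Admin admin = Admin.create(props)) {
               NewTopic topic = new NewTopic("demo-topic", 1, (short) 1);
               Map<String, String> configs = new HashMap<>();
               configs.put("retention.ms", null); // the null value under discussion
               topic.configs(configs);
               // In KRaft mode this behaves as if "retention.ms" was never set.
               admin.createTopics(Collections.singleton(topic)).all().get();
           }
       }
   }
   ```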


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13872) Partitions are truncated when leader is replaced

2022-05-05 Thread Francois Visconte (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17532092#comment-17532092
 ] 

Francois Visconte commented on KAFKA-13872:
---

I was expecting the partition to stay offline until I decided to set 
unclean.leader.election.enable to true and one of the lagging replicas took 
ownership.

> Partitions are truncated when leader is replaced
> 
>
> Key: KAFKA-13872
> URL: https://issues.apache.org/jira/browse/KAFKA-13872
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.7.2
>Reporter: Francois Visconte
>Priority: Major
> Attachments: extract-2022-05-04T15_50_34.110Z.csv
>
>
> Sample setup:
>  * a topic with one partition and RF=3
>  * a producer using acks=1
>  * min.insync.replicas to 1
>  * 3 brokers 1,2,3
>  * Preferred leader of the partition is brokerId 0
>  
> Steps to reproduce the issue
>  * Producer keeps producing to the partition, leader is brokerId=0
>  * At some point, replicas 1 and 2 fall behind and are removed from the ISR
>  * The leader broker 0 has a hardware failure
>  * The partition transitions to offline
>  * This leader is replaced with a new broker with an empty disk and the same 
> broker id 0
>  * The partition transitions from offline to online with leader 0, ISR = 0
>  * Followers see that the leader's offset is 0 and decide to truncate their 
> partitions to 0, ISR=0,1,2
>  * At this point all the topic data has been removed and the partition size 
> drops to 0 on all replicas
> Attached are some of the relevant logs; I can provide more if necessary.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] dengziming commented on pull request #12108: KAFKA-13862: Support Append/Subtract multiple config values in KRaft mode

2022-05-05 Thread GitBox


dengziming commented on PR #12108:
URL: https://github.com/apache/kafka/pull/12108#issuecomment-1118210953

   Thank you for your suggestions @hachikuji @akhileshchg, PTAL again when you 
have time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dengziming commented on a diff in pull request #12108: KAFKA-13862: Support Append/Subtract multiple config values in KRaft mode

2022-05-05 Thread GitBox


dengziming commented on code in PR #12108:
URL: https://github.com/apache/kafka/pull/12108#discussion_r865581359


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -215,13 +215,17 @@ private void 
incrementalAlterConfigResource(ConfigResource configResource,
 }
 List newValueParts = getParts(newValue, key, 
configResource);
 if (opType == APPEND) {
-if (!newValueParts.contains(opValue)) {
-newValueParts.add(opValue);
+for (String value: opValue.split(",")) {
+if (!newValueParts.contains(value)) {

Review Comment:
   Nice catch; I found a small bug in `ConfigAdminManager` while working on 
this integration test.
   
   As for the variable naming, I think both are OK here; I went with 
`oldValueList` as the better of the two.
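   
   For reference, a self-contained sketch of the APPEND semantics after this 
change (class and method names here are illustrative, not the controller's 
actual code):
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   
   public class AppendConfigSketch {
       // Append each comma-separated part of the requested value,
       // skipping parts that are already present.
       static List<String> append(List<String> oldValueList, String opValue) {
           List<String> merged = new ArrayList<>(oldValueList);
           for (String value : opValue.split(",")) {
               if (!merged.contains(value)) {
                   merged.add(value);
               }
           }
           return merged;
       }
   
       public static void main(String[] args) {
           // cleanup.policy: "compact" APPEND "compact,delete" -> [compact, delete]
           System.out.println(append(Arrays.asList("compact"), "compact,delete"));
       }
   }
   ```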



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dengziming commented on a diff in pull request #12108: KAFKA-13862: Support Append/Subtract multiple config values in KRaft mode

2022-05-05 Thread GitBox


dengziming commented on code in PR #12108:
URL: https://github.com/apache/kafka/pull/12108#discussion_r865580075


##
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##
@@ -1751,13 +1777,32 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   new AlterConfigOp(new ConfigEntry(LogConfig.CleanupPolicyProp, 
LogConfig.Compact + "," + LogConfig.Delete), AlterConfigOp.OpType.SUBTRACT)
 ).asJavaCollection
 
-alterResult = client.incrementalAlterConfigs(Map(
+alterConfigs = Map(
   topic1Resource -> topic1AlterConfigs,
   topic2Resource -> topic2AlterConfigs
-).asJava)
+)
+alterResult = client.incrementalAlterConfigs(alterConfigs.asJava)
 assertEquals(Set(topic1Resource, topic2Resource).asJava, 
alterResult.values.keySet)
 alterResult.all.get
 
+if (isKRaftTest()) {

Review Comment:
   Yeah, nice suggestion; waiting for the metadata offset is simpler and more 
generic. I will submit a separate PR to make this a util function, since many 
other places currently rely on similarly specific assertions.
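   
   A rough sketch of the shape such a util could take (names and offset 
accessors are hypothetical; the real helper would read the controller and 
broker metadata offsets from the test harness):
   
   ```java
   import java.util.List;
   import java.util.function.LongSupplier;
   
   public class MetadataCatchUpSketch {
       // Poll until every broker's applied metadata offset reaches the
       // controller's current end offset, or fail after the timeout.
       static void waitForMetadataCatchUp(LongSupplier controllerEndOffset,
                                          List<LongSupplier> brokerOffsets,
                                          long timeoutMs) throws InterruptedException {
           long deadline = System.currentTimeMillis() + timeoutMs;
           while (true) {
               long target = controllerEndOffset.getAsLong();
               boolean caughtUp = brokerOffsets.stream()
                   .allMatch(b -> b.getAsLong() >= target);
               if (caughtUp) {
                   return;
               }
               if (System.currentTimeMillis() > deadline) {
                   throw new AssertionError(
                       "Brokers did not catch up to metadata offset " + target);
               }
               Thread.sleep(50);
           }
       }
   }
   ```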



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org