[GitHub] [kafka] dengziming commented on pull request #12112: MINOR: Fix flaky testDescribeUnderReplicatedPartitions
dengziming commented on PR #12112: URL: https://github.com/apache/kafka/pull/12112#issuecomment-1119266873 @showuon I have updated it. Although @divijvaidya provided a good suggestion, we still have a more generic solution for this kind of flakiness. [see details here](https://github.com/apache/kafka/pull/12108#discussion_r865373299) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming commented on a diff in pull request #12112: MINOR: Fix flaky testDescribeUnderReplicatedPartitions
dengziming commented on code in PR #12112: URL: https://github.com/apache/kafka/pull/12112#discussion_r866494535 ## core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala: ## @@ -586,11 +586,14 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi try { killBroker(0) val aliveServers = brokers.filterNot(_.config.brokerId == 0) - TestUtils.waitForPartitionMetadata(aliveServers, testTopicName, 0) + TestUtils.waitUntilTrue( Review Comment: @divijvaidya I really appreciate your suggestion; however, I need to clarify a few things here: 1. We should use `TestUtils.waitForBrokersOutOfIsr`, not `TestUtils.waitForBrokersInIsr`. 2. `TestUtils.waitForBrokersInIsr` needs to send an RPC, whereas we can read the MetadataCache directly from memory, which is faster and more reliable; maybe we can improve `TestUtils.waitForBrokersInIsr` in the future. 3. Most importantly, in another PR we found a more generic approach for KRaft mode: wait until the brokers have caught up to the controller's metadata topic end offset, instead of waiting for something very specific.
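Point 3 above proposes a generic wait: rather than polling for one specific condition (such as a broker dropping out of the ISR), wait until every broker has caught up to the controller's metadata log end offset. The following is a minimal sketch of that idea in plain Java, not Kafka's actual TestUtils code: `MetadataCatchUp`, `waitForBrokersCaughtUp`, and the `LongSupplier` offset sources are hypothetical stand-ins for reading the controller's end offset and each broker's applied metadata offset.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

/** Hypothetical helper, not Kafka's TestUtils: wait for brokers to catch up to the controller. */
final class MetadataCatchUp {

    /** Polls {@code condition} every {@code pauseMs} until it holds or {@code timeoutMs} elapses. */
    static boolean waitUntilTrue(BooleanSupplier condition, long timeoutMs, long pauseMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /**
     * Generic condition: snapshot the controller's metadata end offset once, then wait
     * until every broker reports having applied metadata up to at least that offset.
     */
    static boolean waitForBrokersCaughtUp(LongSupplier controllerEndOffset,
                                          List<LongSupplier> brokerAppliedOffsets,
                                          long timeoutMs) {
        final long target = controllerEndOffset.getAsLong();
        return waitUntilTrue(() -> {
            for (LongSupplier broker : brokerAppliedOffsets) {
                if (broker.getAsLong() < target) {
                    return false;
                }
            }
            return true;
        }, timeoutMs, 10);
    }

    public static void main(String[] args) {
        final long start = System.currentTimeMillis();
        // Simulated brokers: one has already caught up, the other reaches offset 42 after ~50 ms.
        LongSupplier fast = () -> 42L;
        LongSupplier slow = () -> System.currentTimeMillis() - start >= 50 ? 42L : 40L;
        System.out.println(waitForBrokersCaughtUp(() -> 42L, Arrays.asList(fast, slow), 1000));
    }
}
```

Snapshotting the target offset once, before polling, keeps the wait bounded even if the controller keeps appending records while the test runs.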
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12104: KAFKA-13746: Attempt to fix flaky test by waiting to fetch 2 topics f…
vamossagar12 commented on code in PR #12104: URL: https://github.com/apache/kafka/pull/12104#discussion_r866489041 ## core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala: ## @@ -733,12 +733,18 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi killBroker(0) val aliveServers = brokers.filterNot(_.config.brokerId == 0) TestUtils.waitForPartitionMetadata(aliveServers, underMinIsrTopic, 0) - val output = TestUtils.grabConsoleOutput( -topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions" + var output = "" + TestUtils.waitUntilTrue( +() => { + output = TestUtils.grabConsoleOutput( +topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions" Review Comment: @dengziming, it looks like the latest build failed with a timeout: ``` org.opentest4j.AssertionFailedError: Timeout waiting for partition metadata propagating to brokers for underMinIsrTopic topic ``` Sure, I will take a look at that.
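The diff above re-runs the `--under-min-isr-partitions` describe command inside `TestUtils.waitUntilTrue`, keeping the most recent output in `output` so it can be asserted on afterwards. A rough Java equivalent of that retry-and-capture pattern, assuming a hypothetical `retryUntil` helper (not part of Kafka) and a simulated describe command:

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

final class RetryCapture {
    /** Result of a bounded retry: whether the predicate matched, plus the last value observed. */
    static final class Outcome<T> {
        final boolean matched;
        final T lastValue;

        Outcome(boolean matched, T lastValue) {
            this.matched = matched;
            this.lastValue = lastValue;
        }
    }

    /**
     * Re-runs {@code action} until {@code accept} holds or {@code maxAttempts} is exhausted,
     * keeping the last value so a failing assertion can report what was actually printed.
     */
    static <T> Outcome<T> retryUntil(Supplier<T> action, Predicate<T> accept, int maxAttempts, long pauseMs) {
        T last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            last = action.get();
            if (accept.test(last)) {
                return new Outcome<>(true, last);
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(pauseMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return new Outcome<>(false, last);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated describe output that only lists the under-min-ISR partition from the third call on.
        Outcome<String> out = retryUntil(
                () -> ++calls[0] >= 3 ? "Topic: underMinIsrTopic Partition: 0" : "",
                s -> s.contains("underMinIsrTopic"), 10, 5);
        System.out.println(out.matched + " after " + calls[0] + " calls: " + out.lastValue);
    }
}
```

Returning the last observed output, rather than only a boolean, is what lets the test print a useful message when the wait times out.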
[GitHub] [kafka] vamossagar12 commented on pull request #12104: KAFKA-13746: Attempt to fix flaky test by waiting to fetch 2 topics f…
vamossagar12 commented on PR #12104: URL: https://github.com/apache/kafka/pull/12104#issuecomment-1119258638 @guozhangwang, thanks for pointing it out. I will take a look at this and make another commit, which should re-trigger the build.
[GitHub] [kafka] ahuang98 closed pull request #12123: KAFKA-13830 Introduce metadata.version for KRaft
ahuang98 closed pull request #12123: KAFKA-13830 Introduce metadata.version for KRaft URL: https://github.com/apache/kafka/pull/12123
[GitHub] [kafka] showuon commented on pull request #12112: MINOR: Fix flaky testDescribeUnderReplicatedPartitions
showuon commented on PR #12112: URL: https://github.com/apache/kafka/pull/12112#issuecomment-1119226457 @dengziming, it's fine; we can improve the tests anyway. I agree your fix is better. Let's address @divijvaidya's comment. Thanks.
[jira] [Resolved] (KAFKA-13854) Refactor ApiVersion to MetadataVersion
[ https://issues.apache.org/jira/browse/KAFKA-13854?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dengziming resolved KAFKA-13854. Resolution: Fixed > Refactor ApiVersion to MetadataVersion > Key: KAFKA-13854 > URL: https://issues.apache.org/jira/browse/KAFKA-13854 > Project: Kafka > Issue Type: Sub-task > Reporter: David Arthur > Assignee: Alyssa Huang > Priority: Major > In KRaft, we will have a value for {{metadata.version}} corresponding to each IBP. In order to keep this association and make it obvious for developers, we will consolidate the IBP and metadata version into a new MetadataVersion enum. This new enum will replace the existing ApiVersion trait. For IBPs that precede the first KRaft preview version (AK 3.0), we will use a value of -1 for the metadata.version. -- This message was sent by Atlassian Jira (v8.20.7#820007)
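The ticket describes one enum that pairs each IBP with its `metadata.version`, using -1 for IBPs older than the first KRaft preview (AK 3.0). A sketch of that shape follows; the constant names and feature levels are illustrative assumptions, not a copy of Kafka's `MetadataVersion`.

```java
import java.util.Optional;

/**
 * Illustrative sketch only: constant names and feature levels are assumptions, not the
 * actual MetadataVersion enum. One constant per IBP, each carrying its metadata.version.
 */
enum MetadataVersionSketch {
    IBP_2_8_IV1(-1, "2.8"),   // precedes the first KRaft preview, so no metadata.version
    IBP_3_0_IV1(1, "3.0"),    // first KRaft preview release
    IBP_3_1_IV0(2, "3.1"),
    IBP_3_2_IV0(3, "3.2");

    private final short featureLevel;
    private final String release;

    MetadataVersionSketch(int featureLevel, String release) {
        this.featureLevel = (short) featureLevel;
        this.release = release;
    }

    short featureLevel() { return featureLevel; }

    String release() { return release; }

    /** Only IBPs from the first KRaft preview onwards have a metadata.version. */
    boolean isKRaftSupported() { return featureLevel > 0; }

    /** Maps a metadata.version (feature level) back to its IBP, if one exists. */
    static Optional<MetadataVersionSketch> fromFeatureLevel(short level) {
        for (MetadataVersionSketch v : values()) {
            if (v.isKRaftSupported() && v.featureLevel == level) {
                return Optional.of(v);
            }
        }
        return Optional.empty();
    }

    /** Declaration order is oldest-first, so ordinal comparison orders the versions. */
    boolean isAtLeast(MetadataVersionSketch other) { return compareTo(other) >= 0; }

    public static void main(String[] args) {
        for (MetadataVersionSketch v : values()) {
            System.out.println(v + " (AK " + v.release() + ") -> metadata.version " + v.featureLevel());
        }
    }
}
```

Keeping both values on one constant means that whoever adds a new IBP must decide its `metadata.version` at the same time, which is the association the ticket wants to make obvious.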
[jira] [Created] (KAFKA-13880) DefaultStreamPartitioner may get "stuck" to one partition for unkeyed messages
Artem Livshits created KAFKA-13880: Summary: DefaultStreamPartitioner may get "stuck" to one partition for unkeyed messages Key: KAFKA-13880 URL: https://issues.apache.org/jira/browse/KAFKA-13880 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.4.0 Reporter: Artem Livshits While working on KIP-794, I noticed that DefaultStreamPartitioner does not call .onNewBatch. The "sticky" DefaultStreamPartitioner introduced as a result of https://issues.apache.org/jira/browse/KAFKA-8601 requires an .onNewBatch call in order to switch to a new partition for unkeyed messages; calling only .partition returns the same "sticky" partition chosen during the first call to .partition. The partition doesn't change even if the partition leader is unavailable. Ideally, for unkeyed messages the DefaultStreamPartitioner should take advantage of the new built-in partitioning logic introduced in https://github.com/apache/kafka/pull/12049. Perhaps it could return a null partition for unkeyed messages, so that KafkaProducer could run the built-in partitioning logic.
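To illustrate the report, here is a simplified sticky partitioner in Java. `StickyPartitionerSketch` is a hypothetical model, not Kafka's `DefaultPartitioner` or `StickyPartitionCache`. For unkeyed records, `partition()` keeps returning the cached partition and only `onNewBatch()` moves it, so a caller that never calls `onNewBatch()` stays on one partition.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Simplified model of a "sticky" partitioner for unkeyed records, written to illustrate the
 * KAFKA-13880 report; it is not Kafka's DefaultPartitioner or StickyPartitionCache.
 */
final class StickyPartitionerSketch {
    private final Map<String, Integer> stickyPartition = new HashMap<>();
    private final int numPartitions;

    StickyPartitionerSketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    /** Unkeyed record: pick a random partition on first use, then keep returning it. */
    int partition(String topic) {
        return stickyPartition.computeIfAbsent(topic,
                t -> ThreadLocalRandom.current().nextInt(numPartitions));
    }

    /** Called when a new batch is created: move off {@code prevPartition} to another partition. */
    void onNewBatch(String topic, int prevPartition) {
        Integer current = stickyPartition.get(topic);
        if (numPartitions < 2 || current == null || current != prevPartition) {
            return; // nothing to switch, or another caller already switched
        }
        int next = prevPartition;
        while (next == prevPartition) {
            next = ThreadLocalRandom.current().nextInt(numPartitions);
        }
        stickyPartition.put(topic, next);
    }

    public static void main(String[] args) {
        StickyPartitionerSketch partitioner = new StickyPartitionerSketch(6);
        int first = partitioner.partition("orders");
        boolean stuck = true;
        for (int i = 0; i < 1_000; i++) {
            // A caller that never invokes onNewBatch always gets the same partition back.
            stuck &= partitioner.partition("orders") == first;
        }
        System.out.println("stuck on partition " + first + " without onNewBatch: " + stuck);
        partitioner.onNewBatch("orders", first);
        System.out.println("after onNewBatch: partition " + partitioner.partition("orders"));
    }
}
```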
[GitHub] [kafka] artemlivshits commented on a diff in pull request #12049: KAFKA-10888: Sticky partition leads to uneven produce msg
artemlivshits commented on code in PR #12049: URL: https://github.com/apache/kafka/pull/12049#discussion_r866402725 ## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ## @@ -1403,25 +1452,54 @@ public boolean isDone() { } /** - * A callback called when producer request is complete. It in turn calls user-supplied callback (if given) and - * notifies producer interceptors about the request completion. + * Callbacks that are called by the RecordAccumulator append functions: + * - user callback + * - interceptor callbacks + * - partition callback */ -private static class InterceptorCallback implements Callback { +private class AppendCallbacks implements RecordAccumulator.AppendCallbacks { private final Callback userCallback; private final ProducerInterceptors interceptors; -private final TopicPartition tp; +private final ProducerRecord record; +protected int partition = RecordMetadata.UNKNOWN_PARTITION; -private InterceptorCallback(Callback userCallback, ProducerInterceptors interceptors, TopicPartition tp) { +private AppendCallbacks(Callback userCallback, ProducerInterceptors interceptors, ProducerRecord record) { this.userCallback = userCallback; this.interceptors = interceptors; -this.tp = tp; +this.record = record; } +@Override public void onCompletion(RecordMetadata metadata, Exception exception) { -metadata = metadata != null ? metadata : new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1); +if (metadata == null) { +metadata = new RecordMetadata(topicPartition(), -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1); +} this.interceptors.onAcknowledgement(metadata, exception); if (this.userCallback != null) this.userCallback.onCompletion(metadata, exception); } + +@Override +public void setPartition(int partition) { +assert partition != RecordMetadata.UNKNOWN_PARTITION; +this.partition = partition; + +if (log.isTraceEnabled()) { +// Log the message here, because we don't know the partition before that. 
+log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition); +} +} + +public int getPartition() { +return partition; +} + +public TopicPartition topicPartition() { +if (record == null) Review Comment: This is an existing test: https://github.com/apache/kafka/blob/trunk/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala#L1041 It passes in null and expects the error to go through the interceptor's callback, but topicPartition() is called to supply an argument to that callback (which can handle a null topicPartition), and it was throwing an NPE. Personally, I would just declare record as non-null and treat a null record as a bug in the application (undefined behavior), but as currently defined, it's a tested runtime error condition with some expected behavior.
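The thread is weighing a defined error path for a null record against an assertion. Below is a minimal sketch of the defined-error-path option, using hypothetical stand-in types instead of `ProducerRecord` and `TopicPartition`: `topicPartition()` returns null for a null record instead of throwing an NPE, and the exception still reaches the user callback. The trade-off raised later in the thread is that callers then see a null topic partition.

```java
import java.util.function.BiConsumer;

/**
 * Stand-in for the AppendCallbacks discussed above, using hypothetical minimal types instead of
 * ProducerRecord/TopicPartition. A null record topic models the PlaintextConsumerTest case.
 */
final class AppendCallbacksSketch {
    static final int UNKNOWN_PARTITION = -1;

    /** Minimal stand-in for TopicPartition. */
    static final class TopicPartitionStub {
        final String topic;
        final int partition;

        TopicPartitionStub(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    private final String recordTopic; // null models a null ProducerRecord
    private final BiConsumer<TopicPartitionStub, Exception> userCallback;
    private int partition = UNKNOWN_PARTITION;

    AppendCallbacksSketch(String recordTopic, BiConsumer<TopicPartitionStub, Exception> userCallback) {
        this.recordTopic = recordTopic;
        this.userCallback = userCallback;
    }

    void setPartition(int partition) {
        this.partition = partition;
    }

    /** Defined error path: return null for a null record instead of throwing an NPE. */
    TopicPartitionStub topicPartition() {
        if (recordTopic == null) {
            return null;
        }
        return new TopicPartitionStub(recordTopic, partition);
    }

    /** The exception still reaches the user callback even when there is no record. */
    void onCompletion(Exception exception) {
        if (userCallback != null) {
            userCallback.accept(topicPartition(), exception);
        }
    }

    public static void main(String[] args) {
        AppendCallbacksSketch cb = new AppendCallbacksSketch(null,
                (tp, ex) -> System.out.println("topicPartition=" + tp + ", error=" + ex.getMessage()));
        cb.onCompletion(new IllegalArgumentException("record is null"));
    }
}
```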
[GitHub] [kafka] YiDing-Duke commented on a diff in pull request #9874: KAFKA-7340: Migrate clients module to JUnit 5
YiDing-Duke commented on code in PR #9874: URL: https://github.com/apache/kafka/pull/9874#discussion_r866399744 ## clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorFailureDelayTest.java: ## @@ -228,12 +213,8 @@ private NioEchoServer createEchoServer(SecurityProtocol securityProtocol) throws } private NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol) throws Exception { -if (failedAuthenticationDelayMs != -1) Review Comment: It's not dead code; it sets the delay at the Selector level.
[GitHub] [kafka] rittikaadhikari commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access
rittikaadhikari commented on code in PR #12005: URL: https://github.com/apache/kafka/pull/12005#discussion_r866395929 ## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ## @@ -51,6 +51,7 @@ import scala.math._ */ abstract class AbstractFetcherThread(name: String, clientId: String, + val leader: LeaderEndPoint, Review Comment: We could end up having something like the following:
```
trait LeaderEndPoint {
  def brokerEndPoint(): BrokerEndPoint
}

class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint) extends LeaderEndPoint {
  override def brokerEndPoint(): BrokerEndPoint = sourceBroker
}

class RemoteLeaderEndPoint(endpoint: BlockingSend, ...) extends LeaderEndPoint {
  override def brokerEndPoint(): BrokerEndPoint = endpoint.sourceBroker
}
```
[GitHub] [kafka] junrao commented on a diff in pull request #12049: KAFKA-10888: Sticky partition leads to uneven produce msg
junrao commented on code in PR #12049: URL: https://github.com/apache/kafka/pull/12049#discussion_r866393904 ## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ## @@ -1403,25 +1452,54 @@ public boolean isDone() { } /** - * A callback called when producer request is complete. It in turn calls user-supplied callback (if given) and - * notifies producer interceptors about the request completion. + * Callbacks that are called by the RecordAccumulator append functions: + * - user callback + * - interceptor callbacks + * - partition callback */ -private static class InterceptorCallback implements Callback { +private class AppendCallbacks implements RecordAccumulator.AppendCallbacks { private final Callback userCallback; private final ProducerInterceptors interceptors; -private final TopicPartition tp; +private final ProducerRecord record; +protected int partition = RecordMetadata.UNKNOWN_PARTITION; -private InterceptorCallback(Callback userCallback, ProducerInterceptors interceptors, TopicPartition tp) { +private AppendCallbacks(Callback userCallback, ProducerInterceptors interceptors, ProducerRecord record) { this.userCallback = userCallback; this.interceptors = interceptors; -this.tp = tp; +this.record = record; } +@Override public void onCompletion(RecordMetadata metadata, Exception exception) { -metadata = metadata != null ? metadata : new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1); +if (metadata == null) { +metadata = new RecordMetadata(topicPartition(), -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1); +} this.interceptors.onAcknowledgement(metadata, exception); if (this.userCallback != null) this.userCallback.onCompletion(metadata, exception); } + +@Override +public void setPartition(int partition) { +assert partition != RecordMetadata.UNKNOWN_PARTITION; +this.partition = partition; + +if (log.isTraceEnabled()) { +// Log the message here, because we don't know the partition before that. 
+log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition); +} +} + +public int getPartition() { +return partition; +} + +public TopicPartition topicPartition() { +if (record == null) Review Comment: Hmm, by returning null, we are breaking the contract that RecordMetadata.topic() and RecordMetadata.partition() are never null in the producer callback and interceptor. Is that a new or an existing test?
[jira] [Assigned] (KAFKA-13879) Exponential backoff for reconnect does not work
[ https://issues.apache.org/jira/browse/KAFKA-13879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang reassigned KAFKA-13879: Assignee: Chern Yih Cheah > Exponential backoff for reconnect does not work > Key: KAFKA-13879 > URL: https://issues.apache.org/jira/browse/KAFKA-13879 > Project: Kafka > Issue Type: Bug > Components: network > Affects Versions: 2.7.0 > Reporter: Chern Yih Cheah > Assignee: Chern Yih Cheah > Priority: Minor > When a client connects to an SSL listener using the PLAINTEXT security protocol, the client considers the channel setup complete as soon as the TCP connection is established (in reality, the channel setup is not complete yet). The client then issues an API versions request, and issuing that request resets the reconnection exponential backoff. Since the broker expects an SSL handshake, the client's API versions request causes the connection to be disconnected, and the reconnect happens without exponential backoff because the backoff has been reset. https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java#L249
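A simplified model of the problem: if the failure counter behind the exponential backoff is reset as soon as the TCP connection is established, a connection that always fails during the handshake never backs off beyond the base interval. The sketch below omits jitter, and its formula and method names are assumptions rather than the code in `ClusterConnectionStates`. Resetting only once the channel is actually usable is one possible remedy, not necessarily the fix adopted for this ticket.

```java
/**
 * Simplified reconnect-backoff model (jitter omitted) to illustrate KAFKA-13879; names and
 * formula are assumptions, not ClusterConnectionStates' actual code.
 */
final class ReconnectBackoffSketch {
    private final long baseMs;
    private final long maxMs;
    private int failedAttempts = 0;

    ReconnectBackoffSketch(long baseMs, long maxMs) {
        this.baseMs = baseMs;
        this.maxMs = maxMs;
    }

    /** Backoff before the next reconnect: base * 2^(failures - 1), capped at max. */
    long currentBackoffMs() {
        if (failedAttempts == 0) {
            return 0;
        }
        double backoff = baseMs * Math.pow(2, failedAttempts - 1);
        return (long) Math.min(maxMs, backoff);
    }

    void onDisconnect() {
        failedAttempts++;
    }

    /** Resets the backoff; should only be called once the channel is actually usable. */
    void reset() {
        failedAttempts = 0;
    }

    public static void main(String[] args) {
        ReconnectBackoffSketch buggy = new ReconnectBackoffSketch(50, 1000);
        ReconnectBackoffSketch fixed = new ReconnectBackoffSketch(50, 1000);
        for (int attempt = 1; attempt <= 5; attempt++) {
            // Buggy: reset as soon as TCP connects, before the (never completed) SSL handshake.
            buggy.reset();
            buggy.onDisconnect();
            // Fixed: no reset, because the channel never became ready.
            fixed.onDisconnect();
            System.out.println("attempt " + attempt + ": buggy=" + buggy.currentBackoffMs()
                    + " ms, fixed=" + fixed.currentBackoffMs() + " ms");
        }
    }
}
```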
[GitHub] [kafka] junrao commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access
junrao commented on code in PR #12005: URL: https://github.com/apache/kafka/pull/12005#discussion_r866389159 ## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ## @@ -51,6 +51,7 @@ import scala.math._ */ abstract class AbstractFetcherThread(name: String, clientId: String, + val leader: LeaderEndPoint, Review Comment: We could. Alternatively, if we add brokerEndpoint() to LeaderEndPoint, we could implement it in RemoteLeaderEndPoint by getting it from the ReplicaFetcherBlockingSend inside RemoteLeaderEndPoint.
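Here is a Java sketch of the design discussed in this thread, where each `LeaderEndPoint` owns its broker endpoint: the local variant receives it directly, and the remote variant reads it from the blocking-send client it wraps. `BrokerEndPoint` and `BlockingSendStub` are minimal hypothetical stand-ins (the real `ReplicaFetcherBlockingSend` takes many more constructor parameters, as noted later in the thread).

```java
/**
 * Hypothetical Java rendering of the LeaderEndPoint idea discussed above. BrokerEndPoint and
 * BlockingSendStub are minimal stand-ins, not Kafka's classes.
 */
final class BrokerEndPoint {
    final int id;
    final String host;
    final int port;

    BrokerEndPoint(int id, String host, int port) {
        this.id = id;
        this.host = host;
        this.port = port;
    }

    @Override
    public String toString() {
        return "BrokerEndPoint(" + id + ", " + host + ":" + port + ")";
    }
}

/** Stand-in for ReplicaFetcherBlockingSend: it already knows which broker it talks to. */
final class BlockingSendStub {
    private final BrokerEndPoint sourceBroker;

    BlockingSendStub(BrokerEndPoint sourceBroker) {
        this.sourceBroker = sourceBroker;
    }

    BrokerEndPoint sourceBroker() {
        return sourceBroker;
    }
}

interface LeaderEndPoint {
    /** The fetcher asks its leader for the endpoint instead of being handed a second one. */
    BrokerEndPoint brokerEndPoint();
}

/** Local leader: the endpoint is passed in directly. */
final class LocalLeaderEndPoint implements LeaderEndPoint {
    private final BrokerEndPoint sourceBroker;

    LocalLeaderEndPoint(BrokerEndPoint sourceBroker) {
        this.sourceBroker = sourceBroker;
    }

    @Override
    public BrokerEndPoint brokerEndPoint() {
        return sourceBroker;
    }
}

/** Remote leader: the endpoint comes from the blocking-send client it wraps. */
final class RemoteLeaderEndPoint implements LeaderEndPoint {
    private final BlockingSendStub blockingSend;

    RemoteLeaderEndPoint(BlockingSendStub blockingSend) {
        this.blockingSend = blockingSend;
    }

    @Override
    public BrokerEndPoint brokerEndPoint() {
        return blockingSend.sourceBroker();
    }
}

final class LeaderEndPointDemo {
    public static void main(String[] args) {
        BrokerEndPoint broker = new BrokerEndPoint(1, "localhost", 9092);
        LeaderEndPoint local = new LocalLeaderEndPoint(broker);
        LeaderEndPoint remote = new RemoteLeaderEndPoint(new BlockingSendStub(broker));
        System.out.println(local.brokerEndPoint() + " / " + remote.brokerEndPoint());
    }
}
```

With this shape, `AbstractFetcherThread` would need only the `leader` parameter, because the endpoint can always be obtained from it.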
[GitHub] [kafka] artemlivshits commented on a diff in pull request #12049: KAFKA-10888: Sticky partition leads to uneven produce msg
artemlivshits commented on code in PR #12049: URL: https://github.com/apache/kafka/pull/12049#discussion_r866386035 ## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ## @@ -1403,25 +1452,54 @@ public boolean isDone() { } /** - * A callback called when producer request is complete. It in turn calls user-supplied callback (if given) and - * notifies producer interceptors about the request completion. + * Callbacks that are called by the RecordAccumulator append functions: + * - user callback + * - interceptor callbacks + * - partition callback */ -private static class InterceptorCallback implements Callback { +private class AppendCallbacks implements RecordAccumulator.AppendCallbacks { private final Callback userCallback; private final ProducerInterceptors interceptors; -private final TopicPartition tp; +private final ProducerRecord record; +protected int partition = RecordMetadata.UNKNOWN_PARTITION; -private InterceptorCallback(Callback userCallback, ProducerInterceptors interceptors, TopicPartition tp) { +private AppendCallbacks(Callback userCallback, ProducerInterceptors interceptors, ProducerRecord record) { this.userCallback = userCallback; this.interceptors = interceptors; -this.tp = tp; +this.record = record; } +@Override public void onCompletion(RecordMetadata metadata, Exception exception) { -metadata = metadata != null ? metadata : new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1); +if (metadata == null) { +metadata = new RecordMetadata(topicPartition(), -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1); +} this.interceptors.onAcknowledgement(metadata, exception); if (this.userCallback != null) this.userCallback.onCompletion(metadata, exception); } + +@Override +public void setPartition(int partition) { +assert partition != RecordMetadata.UNKNOWN_PARTITION; +this.partition = partition; + +if (log.isTraceEnabled()) { +// Log the message here, because we don't know the partition before that. 
+log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition); +} +} + +public int getPartition() { +return partition; +} + +public TopicPartition topicPartition() { +if (record == null) Review Comment: I've added it because there is a test that passes a null and expects some error handling. From that perspective, adding an assert would contradict the expectation that there is defined and tested error handling (in my mind, an assert means the behavior is undefined and all bets are off).
[GitHub] [kafka] guozhangwang closed pull request #11917: KAFKA-13785: [8/N][emit final] time-ordered session store
guozhangwang closed pull request #11917: KAFKA-13785: [8/N][emit final] time-ordered session store URL: https://github.com/apache/kafka/pull/11917
[GitHub] [kafka] guozhangwang merged pull request #12127: KAFKA-13785: [8/N][emit final] time-ordered session store
guozhangwang merged PR #12127: URL: https://github.com/apache/kafka/pull/12127
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12128: KAFKA-10199: Implement adding active tasks to the state updater
guozhangwang commented on code in PR #12128: URL: https://github.com/apache/kafka/pull/12128#discussion_r866368735 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ## @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.errors.TaskCorruptedException; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.Task.State; +import org.slf4j.Logger; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + +public class DefaultStateUpdater implements StateUpdater { + +private final static String BUG_ERROR_MESSAGE = "This indicates a bug. 
" + +"Please report at https://issues.apache.org/jira/projects/KAFKA/issues or to the dev-mailing list (https://kafka.apache.org/contact)."; + +private class StateUpdaterThread extends Thread { + +private final ChangelogReader changelogReader; +private final AtomicBoolean isRunning = new AtomicBoolean(true); +private final java.util.function.Consumer> offsetResetter; +private final Map updatingTasks = new HashMap<>(); +private final Logger log; + +public StateUpdaterThread(final String name, + final ChangelogReader changelogReader, + final java.util.function.Consumer> offsetResetter) { +super(name); +this.changelogReader = changelogReader; +this.offsetResetter = offsetResetter; + +final String logPrefix = String.format("%s ", name); +final LogContext logContext = new LogContext(logPrefix); +log = logContext.logger(DefaultStateUpdater.class); +} + +public Collection getAllUpdatingTasks() { +return updatingTasks.values(); +} + +@Override +public void run() { +try { +while (isRunning.get()) { +try { +performActionsOnTasks(); +restoreTasks(); +waitIfAllChangelogsCompletelyRead(); +} catch (final InterruptedException interruptedException) { +return; +} +} +} catch (final RuntimeException anyOtherException) { +log.error("An unexpected error occurred within the state updater thread: " + anyOtherException); +final ExceptionAndTasks exceptionAndTasks = new ExceptionAndTasks(new HashSet<>(updatingTasks.values()), anyOtherException); +updatingTasks.clear(); +failedTasks.add(exceptionAndTasks); +isRunning.set(false); +} finally { +clear(); +} +} + +private void performActionsOnTasks() throws InterruptedException { +tasksAndActionsLock.lock(); +try { +for (final TaskAndAction taskAndAction : getTasksAndActions()) { +final Task task = taskAndAction.task; +final Action action = taskAndAction.action; +switch (action) { +case ADD: +addTask(task); +break; +
[GitHub] [kafka] guozhangwang merged pull request #12128: KAFKA-10199: Implement adding active tasks to the state updater
guozhangwang merged PR #12128: URL: https://github.com/apache/kafka/pull/12128
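The `DefaultStateUpdater` quoted above for PR #12128 runs a loop that performs pending task actions, restores tasks, and waits when all changelogs have been read; on an unexpected `RuntimeException` it moves all updating tasks into the failed tasks and stops. A much-simplified Java model of that loop follows. `Task`, `Failure`, and the short sleep (used in place of the lock-and-condition wait) are assumptions for illustration, not the actual Kafka Streams classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Simplified model of the state-updater loop: drain pending actions, restore a chunk for each
 * updating task, and on an unexpected RuntimeException hand all updating tasks back as failed.
 * Task, Failure and the short sleep (instead of a lock/condition wait) are assumptions.
 */
final class StateUpdaterSketch {
    interface Task {
        String id();

        /** Restores one chunk of state; returns true once the changelog is completely read. */
        boolean restoreSome();
    }

    static final class Failure {
        final List<String> taskIds;
        final RuntimeException cause;

        Failure(List<String> taskIds, RuntimeException cause) {
            this.taskIds = taskIds;
            this.cause = cause;
        }
    }

    private final ConcurrentLinkedQueue<Task> tasksToAdd = new ConcurrentLinkedQueue<>();
    private final Map<String, Task> updatingTasks = new HashMap<>(); // updater thread only
    final List<String> restoredTaskIds = new CopyOnWriteArrayList<>();
    final List<Failure> failedTasks = new CopyOnWriteArrayList<>();
    private final AtomicBoolean isRunning = new AtomicBoolean(true);
    private final Thread thread = new Thread(this::runLoop, "state-updater");

    void start() {
        thread.setDaemon(true);
        thread.start();
    }

    void add(Task task) {
        tasksToAdd.add(task);
    }

    void shutdown(long timeoutMs) {
        isRunning.set(false);
        try {
            thread.join(timeoutMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void runLoop() {
        try {
            while (isRunning.get()) {
                // 1. Perform pending actions (only "add" is modeled here).
                Task pending;
                while ((pending = tasksToAdd.poll()) != null) {
                    updatingTasks.put(pending.id(), pending);
                }
                // 2. Restore one chunk per updating task; move finished tasks out.
                List<String> done = new ArrayList<>();
                for (Task task : updatingTasks.values()) {
                    if (task.restoreSome()) {
                        done.add(task.id());
                    }
                }
                for (String id : done) {
                    updatingTasks.remove(id);
                    restoredTaskIds.add(id);
                }
                // 3. Idle briefly when there is nothing left to restore.
                if (updatingTasks.isEmpty()) {
                    sleepQuietly(5);
                }
            }
        } catch (RuntimeException e) {
            failedTasks.add(new Failure(new ArrayList<>(updatingTasks.keySet()), e));
            updatingTasks.clear();
            isRunning.set(false);
        }
    }

    private void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            isRunning.set(false);
        }
    }
}
```

Keeping `updatingTasks` confined to the updater thread and exchanging work through thread-safe queues and lists avoids locking the hot restore path; the real class uses a lock and condition for the action queue instead.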
[GitHub] [kafka] rittikaadhikari commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access
rittikaadhikari commented on code in PR #12005: URL: https://github.com/apache/kafka/pull/12005#discussion_r866371951 ## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ## @@ -51,6 +51,7 @@ import scala.math._ */ abstract class AbstractFetcherThread(name: String, clientId: String, + val leader: LeaderEndPoint, Review Comment: Yes, that does make sense. It would probably be best to pass BrokerEndPoint to LeaderEndPoint and construct a ReplicaFetcherBlockingSend within the RemoteLeaderEndPoint. However, ReplicaFetcherBlockingSend requires several other parameters (like metrics, time, fetcherId, etc.), which may make the constructor for RemoteLeaderEndPoint look a bit cluttered.
[GitHub] [kafka] junrao commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access
junrao commented on code in PR #12005: URL: https://github.com/apache/kafka/pull/12005#discussion_r866357178 ## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ## @@ -51,6 +51,7 @@ import scala.math._ */ abstract class AbstractFetcherThread(name: String, clientId: String, + val leader: LeaderEndPoint, Review Comment: Yes, we need to pass brokerEndPoint to LocalLeaderEndPoint. Intuitively, it makes sense for LeaderEndPoint to own brokerEndPoint instead of passing two endpoints around.
[GitHub] [kafka] frosiere commented on pull request #12125: KAFKA-13864: provide the construct interceptor for KafkaProducer and KafkaConsumer
frosiere commented on PR #12125: URL: https://github.com/apache/kafka/pull/12125#issuecomment-1119022631 Tests and checks are failing.
[jira] [Commented] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor
[ https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17532518#comment-17532518 ] François Rosière commented on KAFKA-13864: Thanks for the PR. I added some comments. To make progress on this issue, we still need a vote or an approval so that it can be merged once it has been fully reviewed. > Change the visibility of a KafkaProducer and KafkaConsumer constructor > Key: KAFKA-13864 > URL: https://issues.apache.org/jira/browse/KAFKA-13864 > Project: Kafka > Issue Type: Improvement > Components: clients > Affects Versions: 3.1.0 > Reporter: François Rosière > Assignee: lqjacklee > Priority: Major > Labels: needs-kip > To allow implementing Spring-managed interceptors for producers and consumers (https://github.com/spring-projects/spring-kafka/issues/2244), a new constructor should be added to KafkaProducer: {code:java} public KafkaProducer(ProducerConfig config, Serializer keySerializer, Serializer valueSerializer){code} The visibility of one KafkaConsumer constructor should also change from default to public: {code:java} public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, Deserializer valueDeserializer) {code} See the current implementation: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671 This issue is quite blocking, so any other alternative or proposal would be more than welcome. Kafka Streams is not affected by this issue, as the KafkaStreams object already exposes a constructor taking a StreamsConfig object. Thanks for considering this issue.
[GitHub] [kafka] frosiere commented on a diff in pull request #12125: KAFKA-13864: provide the construct interceptor for KafkaProducer and KafkaConsumer
frosiere commented on code in PR #12125: URL: https://github.com/apache/kafka/pull/12125#discussion_r866287593 ## clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java: ## @@ -663,12 +663,38 @@ public KafkaConsumer(Properties properties, public KafkaConsumer(Map configs, Deserializer keyDeserializer, Deserializer valueDeserializer) { +this(configs, keyDeserializer, valueDeserializer, null); +} + +/** + * A consumer is instantiated by providing a set of key-value pairs as configuration, and a key and a value {@link Deserializer}. + * + * Valid configuration strings are documented at {@link ConsumerConfig}. + * + * Note: after creating a {@code KafkaConsumer} you must always {@link #close()} it to avoid resource leaks. + * + * @param configs The consumer configs + * @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method + *won't be called in the consumer when the deserializer is passed in directly. + * @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method + *won't be called in the consumer when the deserializer is passed in directly. + * @param interceptors The list interceptors for consumer that implements {$link ConsumerInterceptor}. + */ +public KafkaConsumer(Map configs, Review Comment: To avoid touching the existing constructors and to offer more flexibility, I would have created a new constructor taking a ProducerConfig, which would allow overriding ProducerConfig#getConfiguredInstances to complete the list of interceptors with instances, or with any other type of instance, such as the MetricsReporter or the partitioner. See the Spring issue for a concrete example: https://github.com/spring-projects/spring-kafka/issues/2244 This approach can work as well.
## clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java: ## @@ -663,12 +663,38 @@ public KafkaConsumer(Properties properties, public KafkaConsumer(Map configs, Deserializer keyDeserializer, Deserializer valueDeserializer) { +this(configs, keyDeserializer, valueDeserializer, null); +} + +/** + * A consumer is instantiated by providing a set of key-value pairs as configuration, and a key and a value {@link Deserializer}. Review Comment: The new `interceptors` parameter is not mentioned in this description. ## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ## @@ -290,6 +290,27 @@ public KafkaProducer(Map configs, Serializer keySerializer, S keySerializer, valueSerializer, null, null, null, Time.SYSTEM); } +/** + * A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value {@link Serializer}. Review Comment: The same issue applies to the Javadoc of this constructor. ## clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java: ## @@ -663,12 +663,38 @@ public KafkaConsumer(Properties properties, public KafkaConsumer(Map configs, Deserializer keyDeserializer, Deserializer valueDeserializer) { +this(configs, keyDeserializer, valueDeserializer, null); +} + +/** + * A consumer is instantiated by providing a set of key-value pairs as configuration, and a key and a value {@link Deserializer}. + * + * Valid configuration strings are documented at {@link ConsumerConfig}. + * + * Note: after creating a {@code KafkaConsumer} you must always {@link #close()} it to avoid resource leaks. + * + * @param configs The consumer configs + * @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method + *won't be called in the consumer when the deserializer is passed in directly. + * @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method + *won't be called in the consumer when the deserializer is passed in directly.
+ * @param interceptors The list interceptors for consumer that implements {$link ConsumerInterceptor}. Review Comment: The Javadoc is not correct: it should read "The list of interceptors...", and `$link` should be replaced by `@link`.
[GitHub] [kafka] cadonna opened a new pull request, #12128: KAFKA-10199: Implement adding active tasks to the state updater
cadonna opened a new pull request, #12128: URL: https://github.com/apache/kafka/pull/12128 This PR adds the default implementation of the state updater. For now, the implementation only supports adding active tasks to the state updater. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-13877) Flaky RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags
[ https://issues.apache.org/jira/browse/KAFKA-13877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17532480#comment-17532480 ] Guozhang Wang commented on KAFKA-13877: --- Thanks [~lkokhreidze]! > Flaky > RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags > > > Key: KAFKA-13877 > URL: https://issues.apache.org/jira/browse/KAFKA-13877 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Guozhang Wang >Assignee: Levani Kokhreidze >Priority: Major > Labels: newbie > > The following test fails on local testbeds about once per 10-15 runs: > {code} > java.lang.AssertionError > at org.junit.Assert.fail(Assert.java:87) > at org.junit.Assert.assertTrue(Assert.java:42) > at org.junit.Assert.assertTrue(Assert.java:53) > at > org.apache.kafka.streams.integration.RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags(RackAwarenessIntegrationTest.java:192) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:53) > at > com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230) > at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58) > {code}
[GitHub] [kafka] rittikaadhikari commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access
rittikaadhikari commented on code in PR #12005: URL: https://github.com/apache/kafka/pull/12005#discussion_r866208728 ## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ## @@ -51,6 +51,7 @@ import scala.math._ */ abstract class AbstractFetcherThread(name: String, clientId: String, + val leader: LeaderEndPoint, Review Comment: However, I don't think LocalLeaderEndPoint has any parameters that give it access to the brokerEndPoint, so we would need to pass it in as a parameter to LocalLeaderEndPoint.
[GitHub] [kafka] rittikaadhikari commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access
rittikaadhikari commented on code in PR #12005: URL: https://github.com/apache/kafka/pull/12005#discussion_r866206202 ## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ## @@ -51,6 +51,7 @@ import scala.math._ */ abstract class AbstractFetcherThread(name: String, clientId: String, + val leader: LeaderEndPoint, Review Comment: Ah, I see what you mean. Perhaps we could add a brokerEndPoint method to LeaderEndPoint to access that object?
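A minimal Java sketch of the idea under discussion, assuming a `brokerEndPoint()` accessor is added to `LeaderEndPoint`: the remote implementation derives it from the blocking-send client it wraps, and the local implementation receives it as a constructor parameter. The classes below are simplified, hypothetical Java renderings of the Scala types named in this thread (`BlockingSend` stands in for ReplicaFetcherBlockingSend, `FetcherThreadSketch` for the fetcher thread); their fields and constructors are illustrative only.

```java
// Hypothetical, simplified Java rendering of the Scala types under discussion; not Kafka's code.
final class BrokerEndPoint {
    final int id;
    final String host;
    final int port;

    BrokerEndPoint(int id, String host, int port) {
        this.id = id;
        this.host = host;
        this.port = port;
    }

    @Override
    public String toString() {
        return "BrokerEndPoint(id=" + id + ", " + host + ":" + port + ")";
    }
}

// The proposal: expose the endpoint through the LeaderEndPoint abstraction itself.
interface LeaderEndPoint {
    BrokerEndPoint brokerEndPoint();
}

// Stand-in for ReplicaFetcherBlockingSend, which is already constructed with the remote broker.
final class BlockingSend {
    private final BrokerEndPoint target;

    BlockingSend(BrokerEndPoint target) {
        this.target = target;
    }

    BrokerEndPoint brokerEndPoint() {
        return target;
    }
}

// Remote leader: the endpoint can be derived from the blocking-send client it wraps.
final class RemoteLeaderEndPoint implements LeaderEndPoint {
    private final BlockingSend blockingSend;

    RemoteLeaderEndPoint(BlockingSend blockingSend) {
        this.blockingSend = blockingSend;
    }

    @Override
    public BrokerEndPoint brokerEndPoint() {
        return blockingSend.brokerEndPoint();
    }
}

// Local leader: nothing to derive it from, so it is passed in as a constructor parameter.
final class LocalLeaderEndPoint implements LeaderEndPoint {
    private final BrokerEndPoint sourceBroker;

    LocalLeaderEndPoint(BrokerEndPoint sourceBroker) {
        this.sourceBroker = sourceBroker;
    }

    @Override
    public BrokerEndPoint brokerEndPoint() {
        return sourceBroker;
    }
}

// The fetcher obtains the endpoint through its LeaderEndPoint instead of a separate parameter.
final class FetcherThreadSketch {
    private final String name;
    private final LeaderEndPoint leader;

    FetcherThreadSketch(String name, LeaderEndPoint leader) {
        this.name = name;
        this.leader = leader;
    }

    String describe() {
        return name + " fetching from " + leader.brokerEndPoint();
    }
}
```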
[jira] [Created] (KAFKA-13879) Exponential backoff for reconnect does not work
Chern Yih Cheah created KAFKA-13879: --- Summary: Exponential backoff for reconnect does not work Key: KAFKA-13879 URL: https://issues.apache.org/jira/browse/KAFKA-13879 Project: Kafka Issue Type: Bug Components: network Affects Versions: 2.7.0 Reporter: Chern Yih Cheah When a client connects to an SSL listener using the PLAINTEXT security protocol, the client considers the channel setup complete as soon as the TCP connection is established (in reality, the channel setup is not complete yet). The client then issues an API versions request, and issuing that request resets the reconnect exponential backoff. Since the broker expects an SSL handshake, the client's API versions request causes the connection to be closed. The client then reconnects without exponential backoff, because the backoff has already been reset. [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java#L249].
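The reported sequence can be modeled with a small, deterministic Java sketch. It is a hypothetical model, not Kafka's `ClusterConnectionStates`: the backoff doubles per consecutive failure and is capped, jitter is omitted, and the 50 ms and 1000 ms values are chosen to match the default values of `reconnect.backoff.ms` and `reconnect.backoff.max.ms`. Resetting the backoff when the client merely considers the channel ready keeps every wait at the base value.

```java
// Hypothetical model of per-node reconnect backoff; not Kafka's ClusterConnectionStates.
// Jitter is omitted so the output is deterministic.
final class ReconnectBackoff {
    private final long baseMs;
    private final long maxMs;
    private int failures = 0;

    ReconnectBackoff(long baseMs, long maxMs) {
        this.baseMs = baseMs;
        this.maxMs = maxMs;
    }

    // Wait before the next attempt: base * 2^(failures - 1), capped at max; 0 if nothing failed yet.
    long currentBackoffMs() {
        if (failures == 0) {
            return 0;
        }
        int exponent = Math.min(failures - 1, 30); // bound the shift to avoid overflow
        return Math.min(maxMs, baseMs * (1L << exponent));
    }

    void onDisconnect() {
        failures++;
    }

    void reset() {
        failures = 0;
    }
}

final class ReconnectSimulation {
    // Simulates `attempts` connections by a PLAINTEXT client to an SSL listener.
    // If resetWhenTcpConnects is true, the backoff is reset as soon as the client treats the
    // channel as ready (the reported behavior); otherwise it would only be reset after a
    // request succeeds, which never happens in this scenario.
    static long[] simulate(int attempts, boolean resetWhenTcpConnects) {
        ReconnectBackoff backoff = new ReconnectBackoff(50, 1000);
        long[] waits = new long[attempts];
        for (int i = 0; i < attempts; i++) {
            // TCP connect succeeds; the client considers the channel ready and sends ApiVersions.
            if (resetWhenTcpConnects) {
                backoff.reset();
            }
            // The broker expects a TLS handshake, so the connection is closed.
            backoff.onDisconnect();
            waits[i] = backoff.currentBackoffMs();
        }
        return waits;
    }
}
```

With the reset tied to actual success instead, repeated handshake failures back off exponentially (50, 100, 200, ... ms) until the cap is reached.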
[GitHub] [kafka] junrao commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access
junrao commented on code in PR #12005: URL: https://github.com/apache/kafka/pull/12005#discussion_r866200194 ## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ## @@ -51,6 +51,7 @@ import scala.math._ */ abstract class AbstractFetcherThread(name: String, clientId: String, + val leader: LeaderEndPoint, Review Comment: Well, RemoteLeaderEndPoint takes a ReplicaFetcherBlockingSend, which takes a BrokerEndPoint, right?
[GitHub] [kafka] rittikaadhikari commented on pull request #12005: KAFKA-13803: Refactor Leader API Access
rittikaadhikari commented on PR #12005: URL: https://github.com/apache/kafka/pull/12005#issuecomment-1118915061 @junrao: Thank you for the comments. Whenever you get a chance, this PR is ready for review again!
[GitHub] [kafka] junrao commented on a diff in pull request #12049: KAFKA-10888: Sticky partition leads to uneven produce msg
junrao commented on code in PR #12049: URL: https://github.com/apache/kafka/pull/12049#discussion_r866194199 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java: ## @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer.internals; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +/** + * Built-in default partitioner. Note, that this is just a utility class that is used directly from + * RecordAccumulator, it does not implement the Partitioner interface. + * + * The class keeps track of various bookkeeping information required for adaptive sticky partitioning + * (described in detail in KIP-794). There is one partitioner object per topic. 
+ */ +public class BuiltInPartitioner { +private final Logger log; +private final String topic; +private final int stickyBatchSize; + +private volatile PartitionLoadStats partitionLoadStats = null; +private final AtomicReference stickyPartitionInfo = new AtomicReference<>(); + +// Visible and used for testing only. +static volatile public Supplier mockRandom = null; + +/** + * BuiltInPartitioner constructor. + * + * @param topic The topic + * @param stickyBatchSize How much to produce to partition before switch + */ +public BuiltInPartitioner(LogContext logContext, String topic, int stickyBatchSize) { +this.log = logContext.logger(BuiltInPartitioner.class); +this.topic = topic; +this.stickyBatchSize = stickyBatchSize; +} + +/** + * Calculate the next partition for the topic based on the partition load stats. + */ +private int nextPartition(Cluster cluster) { +int random = mockRandom != null ? mockRandom.get() : Utils.toPositive(ThreadLocalRandom.current().nextInt()); + +// Cache volatile variable in local variable. +PartitionLoadStats partitionLoadStats = this.partitionLoadStats; +int partition; + +if (partitionLoadStats == null) { +// We don't have stats to do adaptive partitioning (or it's disabled), just switch to the next +// partition based on uniform distribution. +List availablePartitions = cluster.availablePartitionsForTopic(topic); +if (availablePartitions.size() > 0) { +partition = availablePartitions.get(random % availablePartitions.size()).partition(); +} else { +// We don't have available partitions, just pick one among all partitions. +List partitions = cluster.partitionsForTopic(topic); +partition = random % partitions.size(); +} +} else { +// Calculate next partition based on load distribution. +// Note that partitions without leader are excluded from the partitionLoadStats. 
+assert partitionLoadStats.length > 0; + +int[] cumulativeFrequencyTable = partitionLoadStats.cumulativeFrequencyTable; +int weightedRandom = random % cumulativeFrequencyTable[partitionLoadStats.length - 1]; + +// By construction, the cumulative frequency table is sorted, so we can use binary +// search to find the desired index. +int searchResult = Arrays.binarySearch(cumulativeFrequencyTable, 0, partitionLoadStats.length, weightedRandom); + +// binarySearch results the index of the found element, or -(insertion_point) - 1 +// (where insertion_point is the index of the first element greater than the key). +// We need to get the index of the first value that is strictly greater, which +// would be the insertion point, except if we found the element that's equal to +//
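The selection step in `nextPartition` can be illustrated in isolation. The following Java sketch, assuming strictly positive weights (so the cumulative table is strictly increasing and `Arrays.binarySearch` finds at most one match), maps a random value to a partition with probability proportional to its weight. `WeightedPartitionPicker` and its methods are hypothetical, and the weights are arbitrary inputs rather than anything computed by `PartitionLoadStats`.

```java
import java.util.Arrays;

// Hypothetical helper illustrating weighted selection via a cumulative frequency table.
final class WeightedPartitionPicker {
    private WeightedPartitionPicker() {
    }

    // Running sums of strictly positive weights, e.g. {3, 2, 4} -> {3, 5, 9}.
    static int[] cumulativeFrequencyTable(int[] weights) {
        int[] table = new int[weights.length];
        int running = 0;
        for (int i = 0; i < weights.length; i++) {
            if (weights[i] <= 0) {
                throw new IllegalArgumentException("weights must be positive");
            }
            running += weights[i];
            table[i] = running;
        }
        return table;
    }

    // Maps a non-negative random value to a partition id, with probability proportional to its weight.
    static int pick(int[] partitionIds, int[] table, int random) {
        int weightedRandom = random % table[table.length - 1];
        int searchResult = Arrays.binarySearch(table, 0, table.length, weightedRandom);
        // On an exact match we want the next entry, i.e. the first value strictly greater.
        // Otherwise binarySearch returns -(insertionPoint) - 1, and the insertion point already
        // is the index of the first value strictly greater than the key.
        int index = searchResult >= 0 ? searchResult + 1 : -(searchResult + 1);
        return partitionIds[index];
    }
}
```

Over the nine possible values of `weightedRandom` for weights {3, 2, 4}, the three partitions are chosen 3, 2 and 4 times respectively, matching the weights.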
[jira] [Assigned] (KAFKA-13877) Flaky RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags
[ https://issues.apache.org/jira/browse/KAFKA-13877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Levani Kokhreidze reassigned KAFKA-13877: - Assignee: Levani Kokhreidze
[jira] [Commented] (KAFKA-13877) Flaky RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags
[ https://issues.apache.org/jira/browse/KAFKA-13877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17532474#comment-17532474 ] Levani Kokhreidze commented on KAFKA-13877: --- I will take this on.
[GitHub] [kafka] junrao commented on a diff in pull request #12049: KAFKA-10888: Sticky partition leads to uneven produce msg
junrao commented on code in PR #12049: URL: https://github.com/apache/kafka/pull/12049#discussion_r866191785 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java: ## @@ -25,12 +23,15 @@ private final Cluster cluster; private final Serializer keySerializer; -private final DefaultPartitioner defaultPartitioner; +@SuppressWarnings("deprecation") +private final org.apache.kafka.clients.producer.internals.DefaultPartitioner defaultPartitioner; + +@SuppressWarnings("deprecation") public DefaultStreamPartitioner(final Serializer keySerializer, final Cluster cluster) { this.cluster = cluster; this.keySerializer = keySerializer; -this.defaultPartitioner = new DefaultPartitioner(); +this.defaultPartitioner = new org.apache.kafka.clients.producer.internals.DefaultPartitioner(); Review Comment: @artemlivshits: Actually, you are right. The producer only calls onNewBatch() on the DefaultPartitioner instance it creates internally, so it won't have any effect on the DefaultPartitioner instance created in DefaultStreamPartitioner.
[GitHub] [kafka] junrao commented on a diff in pull request #12049: KAFKA-10888: Sticky partition leads to uneven produce msg
junrao commented on code in PR #12049: URL: https://github.com/apache/kafka/pull/12049#discussion_r866187547 ## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ## @@ -1403,25 +1452,54 @@ public boolean isDone() { } /** - * A callback called when producer request is complete. It in turn calls user-supplied callback (if given) and - * notifies producer interceptors about the request completion. + * Callbacks that are called by the RecordAccumulator append functions: + * - user callback + * - interceptor callbacks + * - partition callback */ -private static class InterceptorCallback implements Callback { +private class AppendCallbacks implements RecordAccumulator.AppendCallbacks { private final Callback userCallback; private final ProducerInterceptors interceptors; -private final TopicPartition tp; +private final ProducerRecord record; +protected int partition = RecordMetadata.UNKNOWN_PARTITION; -private InterceptorCallback(Callback userCallback, ProducerInterceptors interceptors, TopicPartition tp) { +private AppendCallbacks(Callback userCallback, ProducerInterceptors interceptors, ProducerRecord record) { this.userCallback = userCallback; this.interceptors = interceptors; -this.tp = tp; +this.record = record; } +@Override public void onCompletion(RecordMetadata metadata, Exception exception) { -metadata = metadata != null ? metadata : new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1); +if (metadata == null) { +metadata = new RecordMetadata(topicPartition(), -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1); +} this.interceptors.onAcknowledgement(metadata, exception); if (this.userCallback != null) this.userCallback.onCompletion(metadata, exception); } + +@Override +public void setPartition(int partition) { +assert partition != RecordMetadata.UNKNOWN_PARTITION; +this.partition = partition; + +if (log.isTraceEnabled()) { +// Log the message here, because we don't know the partition before that. 
+log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition); +} +} + +public int getPartition() { +return partition; +} + +public TopicPartition topicPartition() { +if (record == null) Review Comment: A lot of code in send() depends on record being non-null. Perhaps it's better to assert that record is non-null early in send()?
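A minimal Java sketch of the suggestion, using hypothetical `SketchRecord` and `SketchProducer` types rather than Kafka's classes: validate the argument once at the top of `send()` so the rest of the method can rely on it. `Objects.requireNonNull` is used here instead of a Java `assert` statement because assertions are disabled unless the JVM runs with `-ea`.

```java
import java.util.Objects;

// Hypothetical record type; not Kafka's ProducerRecord.
final class SketchRecord {
    final String topic;
    final String value;

    SketchRecord(String topic, String value) {
        this.topic = topic;
        this.value = value;
    }
}

// Hypothetical producer showing a single null check at the entry point of send().
final class SketchProducer {
    String send(SketchRecord record) {
        // Fail fast with a clear message; unlike `assert`, this check is always enabled.
        Objects.requireNonNull(record, "record must not be null");
        // Everything below can dereference `record` without further null checks.
        return record.topic + ":" + record.value;
    }
}
```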
[GitHub] [kafka] guozhangwang commented on pull request #12104: KAFKA-13746: Attempt to fix flaky test by waiting to fetch 2 topics f…
guozhangwang commented on PR #12104: URL: https://github.com/apache/kafka/pull/12104#issuecomment-1118887691 Thanks @vamossagar12, the failed tests still include [`testDescribeUnderMinIsrPartitionsMixed(String).quorum=zk`](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12104/4/testReport/junit/kafka.admin/TopicCommandIntegrationTest/Build___JDK_17_and_Scala_2_13___testDescribeUnderMinIsrPartitionsMixed_String__quorum_zk_2), which seems relevant. Do you want to take a look at it before I re-trigger the build?
[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key
[ https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17532427#comment-17532427 ] Matthias J. Sax commented on KAFKA-8769: Just a "random" comment, based on some other discussion I just had: if we do track stream time per key, it could be an issue if a key "goes away" – stream-time for this key would stop advancing, and thus we could not close the corresponding window (for windowed aggregations and left/outer stream-stream joins). > Consider computing stream time independently per key > > > Key: KAFKA-8769 > URL: https://issues.apache.org/jira/browse/KAFKA-8769 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Major > Labels: needs-discussion, needs-kip > > Currently, Streams uses a concept of "stream time", which is computed as the > highest timestamp observed by stateful operators, per partition. This concept > of time backs grace period, retention time, and suppression. > For use cases in which data is produced to topics in roughly chronological > order (as in db change capture), this reckoning is fine. > Some use cases have a different pattern, though. For example, in IOT > applications, it's common for sensors to save up quite a bit of data and then > dump it all at once into the topic. See > https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware > for a concrete example of the use case. > I have heard of cases where each sensor dumps 24 hours' worth of data at a > time into the topic. This results in a pattern in which, when reading a > single partition, the operators observe a lot of consecutive records for one > key that increase in timestamp for 24 hours, then a bunch of consecutive > records for another key that are also increasing in timestamp over the same > 24-hour period.
With our current stream-time definition, this means that the > partition's stream time increases while reading the first key's data, but > then stays paused while reading the second key's data, since the second batch > of records all have timestamps in the "past". > E.g.: > {noformat} > A@t0 (stream time: 0) > A@t1 (stream time: 1) > A@t2 (stream time: 2) > A@t3 (stream time: 3) > B@t0 (stream time: 3) > B@t1 (stream time: 3) > B@t2 (stream time: 3) > B@t3 (stream time: 3) > {noformat} > This pattern results in an unfortunate compromise in which folks are required > to set the grace period to the max expected time skew, for example 24 hours, > or Streams will just drop the second key's data (since it is late). But this > means that if they want to use Suppression for "final results", they have to > wait 24 hours for the result. > This tradeoff is not strictly necessary, though, because each key represents > a logically independent sequence of events. Tracking by partition is simply > convenient, but typically not logically meaningful. That is, the partitions > are just physically independent sequences of events, so it's convenient to > track stream time at this granularity. It would be just as correct, and more > useful for IOT-like use cases, to track time independently for each key. > However, before considering this change, we need to solve the > testing/low-traffic problem. This is the opposite issue, where a partition > doesn't get enough traffic to advance stream time and results remain "stuck" > in the suppression buffers. We can provide some mechanism to force the > advancement of time across all partitions, for use in testing when you want > to flush out all results, or in production when some topic is low volume. We > shouldn't consider tracking time _more_ granularly until this problem is > solved, since it would just make the low-traffic problem worse.
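A small Java sketch of the difference described above, replaying the `A@t0..t3`, `B@t0..t3` example with a grace period of 0. `StreamTimeTracker` is hypothetical and uses a simplified lateness rule (a record is late if its timestamp is below stream time minus the grace period); it is not how Kafka Streams implements grace periods or window closing.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical tracker contrasting per-partition and per-key stream time; not Kafka Streams code.
// Stream time is the maximum record timestamp observed so far.
final class StreamTimeTracker {
    static final long UNKNOWN = Long.MIN_VALUE;

    private long partitionTime = UNKNOWN;
    private final Map<String, Long> perKeyTime = new HashMap<>();

    void observe(String key, long timestamp) {
        partitionTime = Math.max(partitionTime, timestamp);
        perKeyTime.merge(key, timestamp, Math::max);
    }

    long partitionStreamTime() {
        return partitionTime;
    }

    long keyStreamTime(String key) {
        return perKeyTime.getOrDefault(key, UNKNOWN);
    }

    // Simplified lateness rule; a clock that has not started yet never marks records late.
    static boolean isLate(long timestamp, long streamTime, long graceMs) {
        return streamTime != UNKNOWN && timestamp < streamTime - graceMs;
    }

    boolean lateByPartition(long timestamp, long graceMs) {
        return isLate(timestamp, partitionTime, graceMs);
    }

    boolean lateByKey(String key, long timestamp, long graceMs) {
        return isLate(timestamp, keyStreamTime(key), graceMs);
    }
}
```

With per-partition time, `B@t0` looks late once `A@t3` has been seen; with per-key time it does not. Conversely, if key A stops receiving records, its own stream time never advances, which is the concern raised in the comment about closing windows.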
[jira] [Updated] (KAFKA-13878) Connect deadlock in WorkerConnector on desired state change
[ https://issues.apache.org/jira/browse/KAFKA-13878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-13878: Component/s: KafkaConnect > Connect deadlock in WorkerConnector on desired state change > --- > > Key: KAFKA-13878 > URL: https://issues.apache.org/jira/browse/KAFKA-13878 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Greg Harris >Priority: Major > > We've experienced multiple instances of deadlocks where the connector thread > is blocked indefinitely with the following stack trace: > {noformat} > "connector-thread-myconnector" #2059323 prio=5 os_prio=0 cpu=123.43ms > elapsed=147352.41s tid=0x7f5588160de0 nid=0x18be in Object.wait() > [0x7f550e3fe000] > java.lang.Thread.State: WAITING (on object monitor) > at java.lang.Object.wait(java.base@16.0.2/Native Method) > - waiting on > at java.lang.Object.wait(java.base@16.0.2/Object.java:320) > at > org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:149) > - locked <0x0005d2253818> (a > org.apache.kafka.connect.runtime.WorkerConnector) > at > org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:118) > at > java.util.concurrent.Executors$RunnableAdapter.call(java.base@16.0.2/Executors.java:515) > at > java.util.concurrent.FutureTask.run(java.base@16.0.2/FutureTask.java:264) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@16.0.2/ThreadPoolExecutor.java:1130) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@16.0.2/ThreadPoolExecutor.java:630) > at java.lang.Thread.run(java.base@16.0.2/Thread.java:831){noformat} > This appears to be caused by a race condition where a notify() is never > delivered to a task, causing the connector thread to wait indefinitely for a > notify() call that never comes. > As a result, the connector cannot process further state changes or > generate task configurations.
[jira] [Created] (KAFKA-13878) Connect deadlock in WorkerConnector on desired state change
Greg Harris created KAFKA-13878: --- Summary: Connect deadlock in WorkerConnector on desired state change Key: KAFKA-13878 URL: https://issues.apache.org/jira/browse/KAFKA-13878 Project: Kafka Issue Type: Bug Reporter: Greg Harris We've experienced multiple instances of deadlocks where the connector thread is blocked indefinitely with the following stacktrace: {noformat} "connector-thread-myconnector" #2059323 prio=5 os_prio=0 cpu=123.43ms elapsed=147352.41s tid=0x7f5588160de0 nid=0x18be in Object.wait() [0x7f550e3fe000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(java.base@16.0.2/Native Method) - waiting on at java.lang.Object.wait(java.base@16.0.2/Object.java:320) at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:149) - locked <0x0005d2253818> (a org.apache.kafka.connect.runtime.WorkerConnector) at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:118) at java.util.concurrent.Executors$RunnableAdapter.call(java.base@16.0.2/Executors.java:515) at java.util.concurrent.FutureTask.run(java.base@16.0.2/FutureTask.java:264) at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@16.0.2/ThreadPoolExecutor.java:1130) at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@16.0.2/ThreadPoolExecutor.java:630) at java.lang.Thread.run(java.base@16.0.2/Thread.java:831){noformat} This appears to be caused by a race condition where a notify() is never delivered to a task, causing the connector thread to wait indefinitely for a notify() call that never comes. This causes the connector to be unable to process further state changes, or generate task configurations.
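The report attributes the hang to a notify() that is never delivered, which leaves the connector thread in Object.wait() forever. A common source of such lost wake-ups is a notification that fires before the waiter starts waiting, or a wait that is not tied to a condition it re-checks. The Java sketch below shows the general guarded-wait idiom: a flag guarded by the same monitor, checked in a loop, with a bounded wait. It is an illustration under those assumptions, not the WorkerConnector code or the fix for this issue; `StateChangeSignal`, `signalChange` and `awaitChange` are hypothetical names.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical guarded-wait helper: a state change signalled before anyone waits is
 * remembered in a flag, so the waiter cannot miss it the way a bare notify() can be missed.
 */
final class StateChangeSignal {
    private final Object lock = new Object();
    private boolean changed = false; // guarded by lock

    /** Records a state change and wakes any waiting thread. */
    void signalChange() {
        synchronized (lock) {
            changed = true;
            lock.notifyAll();
        }
    }

    /**
     * Waits up to timeoutMs for a state change and consumes it.
     * Returns false on timeout, or if interrupted (the interrupt flag is restored).
     */
    boolean awaitChange(final long timeoutMs) {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        synchronized (lock) {
            // Re-check the condition in a loop: wake-ups can be spurious, and a notify that
            // happened before we acquired the lock has already set the flag.
            while (!changed) {
                final long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remainingMs <= 0) {
                    return false; // bounded wait instead of blocking forever
                }
                try {
                    lock.wait(remainingMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            changed = false; // consume the change
            return true;
        }
    }
}
```

The flag is what makes an early notification harmless: signalChange() leaves a record that awaitChange() observes even if no thread was waiting at that moment. The timeout is optional; it turns a silent hang into a visible timeout that can be logged or retried.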
[jira] [Updated] (KAFKA-13877) Flaky RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags
[ https://issues.apache.org/jira/browse/KAFKA-13877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-13877: -- Labels: newbie (was: ) > Flaky > RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags > > > Key: KAFKA-13877 > URL: https://issues.apache.org/jira/browse/KAFKA-13877 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Guozhang Wang >Priority: Major > Labels: newbie > > The following test fails on local testbeds about once per 10-15 runs: > {code} > java.lang.AssertionError > at org.junit.Assert.fail(Assert.java:87) > at org.junit.Assert.assertTrue(Assert.java:42) > at org.junit.Assert.assertTrue(Assert.java:53) > at > org.apache.kafka.streams.integration.RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags(RackAwarenessIntegrationTest.java:192) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at 
org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:53) > at > com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230) > at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58) > {code}
[jira] [Created] (KAFKA-13877) Flaky RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags
Guozhang Wang created KAFKA-13877: - Summary: Flaky RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags Key: KAFKA-13877 URL: https://issues.apache.org/jira/browse/KAFKA-13877 Project: Kafka Issue Type: Bug Components: streams, unit tests Reporter: Guozhang Wang The following test fails on local testbeds about once per 10-15 runs: {code} java.lang.AssertionError at org.junit.Assert.fail(Assert.java:87) at org.junit.Assert.assertTrue(Assert.java:42) at org.junit.Assert.assertTrue(Assert.java:53) at org.apache.kafka.streams.integration.RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags(RackAwarenessIntegrationTest.java:192) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at 
org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:53) at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230) at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58) {code}
[GitHub] [kafka] junrao commented on a diff in pull request #12049: KAFKA-10888: Sticky partition leads to uneven produce msg
junrao commented on code in PR #12049: URL: https://github.com/apache/kafka/pull/12049#discussion_r866131407 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java: ## @@ -25,12 +23,15 @@ private final Cluster cluster; private final Serializer keySerializer; -private final DefaultPartitioner defaultPartitioner; +@SuppressWarnings("deprecation") +private final org.apache.kafka.clients.producer.internals.DefaultPartitioner defaultPartitioner; + +@SuppressWarnings("deprecation") public DefaultStreamPartitioner(final Serializer keySerializer, final Cluster cluster) { this.cluster = cluster; this.keySerializer = keySerializer; -this.defaultPartitioner = new DefaultPartitioner(); +this.defaultPartitioner = new org.apache.kafka.clients.producer.internals.DefaultPartitioner(); Review Comment: Hmm, onNewBatch() in DefaultPartitioner is not called by the application code. It's called inside Sender. Converting DefaultStreamPartitioner to the new built-in partitioner is a bit tricky since kstreams calls the partitioner explicitly and then passes in the partition during send. We need to change kstreams code so that it lets the producer determine the partition. This can be done in a separate PR. cc @guozhangwang
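For keyed records, the (now deprecated) DefaultPartitioner derives the partition from the serialized key alone, as toPositive(murmur2(keyBytes)) % numPartitions, which is why kstreams can compute the partition itself and pass it to send(). Records without a key take the sticky path instead, and that path is advanced through onNewBatch() from inside the producer, which, as the review notes, application code never calls. The Java sketch below re-implements only the keyed path to show that it is a pure function of the key bytes and the partition count. `KeyedPartitionSketch` is a hypothetical class modeled on the murmur2 routine in Kafka's `Utils` class; real code should rely on the producer's own partitioner rather than a copy.

```java
/**
 * Hypothetical sketch of key-based partition selection, modeled on the murmur2 hashing
 * Kafka's DefaultPartitioner applies to non-null serialized keys. Illustrative only.
 */
final class KeyedPartitionSketch {

    /** 32-bit murmur2 hash of the given bytes. */
    static int murmur2(final byte[] data) {
        final int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        final int length4 = length / 4;
        // Mix the input four bytes at a time (little-endian).
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff)
                    | ((data[i4 + 1] & 0xff) << 8)
                    | ((data[i4 + 2] & 0xff) << 16)
                    | ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1 to 3 bytes (intentional fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
            default:
                break;
        }
        // Final avalanche.
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    /** Clears the sign bit so the modulo below is never negative. */
    static int toPositive(final int number) {
        return number & 0x7fffffff;
    }

    /** Same key bytes and partition count always give the same partition in [0, numPartitions). */
    static int partitionForKey(final byte[] keyBytes, final int numPartitions) {
        return toPositive(murmur2(keyBytes)) % numPartitions;
    }
}
```

Because this computation needs nothing but the key and the partition count, a caller can run it ahead of send(); the sticky path for unkeyed records has no such property, since its choice changes with producer-side batching.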
[GitHub] [kafka] guozhangwang commented on pull request #12127: KAFKA-13785: [7/N][Emit final] emit final for sliding window
guozhangwang commented on PR #12127: URL: https://github.com/apache/kafka/pull/12127#issuecomment-1118838068 cc @lihaosky @mjsax @vvcephei
[GitHub] [kafka] guozhangwang commented on pull request #11917: KAFKA-13785: [8/N][emit final] time-ordered session store
guozhangwang commented on PR #11917: URL: https://github.com/apache/kafka/pull/11917#issuecomment-1118837108 The copy PR is here: https://github.com/apache/kafka/pull/12127
[GitHub] [kafka] guozhangwang commented on pull request #12037: KAFKA-13785: [7/N][Emit final] emit final for sliding window
guozhangwang commented on PR #12037: URL: https://github.com/apache/kafka/pull/12037#issuecomment-1118836834 https://github.com/apache/kafka/pull/12127
[GitHub] [kafka] guozhangwang opened a new pull request, #12127: KAFKA-13785: [7/N][Emit final] emit final for sliding window
guozhangwang opened a new pull request, #12127: URL: https://github.com/apache/kafka/pull/12127 This is a copy PR of #11917. The major differences are: 1) Avoid extra byte array allocation for fixed upper/lower range serialization. 2) Rename some classes to be more consistent. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] guozhangwang commented on a diff in pull request #11917: KAFKA-13785: [8/N][emit final] time-ordered session store
guozhangwang commented on code in PR #11917: URL: https://github.com/apache/kafka/pull/11917#discussion_r865488275 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java: ## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.internals.PrefixedSessionKeySchemas.KeyFirstSessionKeySchema; +import org.apache.kafka.streams.state.internals.PrefixedSessionKeySchemas.TimeFirstSessionKeySchema; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; + +/** + * A RocksDB backed time-ordered segmented bytes store for session key schema. + */ +public class RocksDBTimeOrderedSessionSegmentedBytesStore extends AbstractRocksDBTimeOrderedSegmentedBytesStore { + +private class SessionKeySchemaIndexToBaseStoreIterator extends IndexToBaseStoreIterator { +SessionKeySchemaIndexToBaseStoreIterator(final KeyValueIterator indexIterator) { +super(indexIterator); +} + +@Override +protected Bytes getBaseKey(final Bytes indexKey) { +final Window window = KeyFirstSessionKeySchema.extractWindow(indexKey.get()); +final byte[] key = KeyFirstSessionKeySchema.extractKeyBytes(indexKey.get()); + +return TimeFirstSessionKeySchema.toBinary(Bytes.wrap(key), window.start(), window.end()); +} +} + +RocksDBTimeOrderedSessionSegmentedBytesStore(final String name, + final String metricsScope, + final long retention, + final long segmentInterval, + final boolean withIndex) { +super(name, metricsScope, retention, segmentInterval, new TimeFirstSessionKeySchema(), Review Comment: We can discuss in a later time in another PR: for session stores, as we now use 
time-first as the base store, we would not probably need to use the variable upper/lower range for it any more. ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSegmentedBytesStore.java: ## @@ -32,89 +30,19 @@ import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema; import org.rocksdb.RocksDBException; import org.rocksdb.WriteBatch; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** - * RocksDB store backed by two SegmentedBytesStores which can optimize scan by time as well as window - * lookup for a specific key. - * - * Schema for first SegmentedBytesStore (base store) is as below: - * Key schema: | timestamp + recordkey | - * Value schema: | value |. Value here is determined by caller. - * - * Schema for second SegmentedBytesStore (index store) is as below: - * Key schema: | record + timestamp | - * Value schema: || - * - * Operations: - * Put: 1. Put to index store. 2. Put to base store. - * Delete: 1. Delete from base store. 2. Delete from index store. - * Since we need to update two stores, failure can happen in the middle. We put in index store first - * to make sure if a failure happens in second step and the view is inconsistent, we can't get the - * value for the key. We delete from base store first to make sure if a failure happens in second step - * and the view is inconsistent, we can't get the value for the key. - * - * Note: - * Index store can be optional if we can construct the timestamp in base store instead of looking
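The Javadoc removed in this diff describes the layout: a base store keyed by timestamp + record key, a key-first index store with empty values, puts that write the index before the base store, and deletes that remove from the base store before the index, so a failure between the two steps leaves at most a dangling index entry that reads cannot resolve to a value. The Java sketch below models that protocol with two in-memory TreeMaps instead of RocksDB segments. `TimeOrderedStoreSketch` and its methods are hypothetical; its per-key lookup plays the role getBaseKey() plays in the diff, mapping an index entry back to its base-store key.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical in-memory model of a time-ordered base store plus a key-first index,
 * following the schema and write ordering described in the removed Javadoc.
 */
final class TimeOrderedStoreSketch {

    /** (record key, timestamp) pair used by both stores; only the ordering differs. */
    static final class StoreKey {
        final String key;
        final long timestamp;

        StoreKey(final String key, final long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // Base store: timestamp first, so time-range scans read a contiguous range.
    private final TreeMap<StoreKey, String> baseStore = new TreeMap<>(
            Comparator.comparingLong((StoreKey k) -> k.timestamp).thenComparing(k -> k.key));

    // Index store: record key first; the value is only a presence marker (empty in the real schema).
    private final TreeMap<StoreKey, Boolean> indexStore = new TreeMap<>(
            Comparator.comparing((StoreKey k) -> k.key).thenComparingLong(k -> k.timestamp));

    void put(final String key, final long timestamp, final String value) {
        final StoreKey storeKey = new StoreKey(key, timestamp);
        indexStore.put(storeKey, Boolean.TRUE); // 1. index first
        baseStore.put(storeKey, value);         // 2. then base
    }

    void delete(final String key, final long timestamp) {
        final StoreKey storeKey = new StoreKey(key, timestamp);
        baseStore.remove(storeKey);  // 1. base first
        indexStore.remove(storeKey); // 2. then index
    }

    /** Time-range scan served directly by the time-first base store. */
    List<String> fetchByTimeRange(final long from, final long to) {
        final List<String> result = new ArrayList<>();
        // "" is the smallest String, so this starts at the first entry with timestamp >= from.
        for (final Map.Entry<StoreKey, String> e : baseStore.tailMap(new StoreKey("", from), true).entrySet()) {
            if (e.getKey().timestamp > to) {
                break;
            }
            result.add(e.getValue());
        }
        return result;
    }

    /** Per-key lookup: scan the key-first index, then resolve each entry in the base store. */
    List<String> fetchByKey(final String key) {
        final List<String> result = new ArrayList<>();
        final StoreKey lower = new StoreKey(key, Long.MIN_VALUE);
        final StoreKey upper = new StoreKey(key, Long.MAX_VALUE);
        for (final StoreKey indexKey : indexStore.subMap(lower, true, upper, true).keySet()) {
            // Same (key, timestamp) pair, looked up under the base store's time-first ordering.
            final String value = baseStore.get(indexKey);
            if (value != null) { // a dangling index entry from a partial write yields no value
                result.add(value);
            }
        }
        return result;
    }
}
```

Keeping time first in the base store makes time-range scans cheap, while the index pays an extra lookup per entry for key-based reads; that trade-off is what the reviewer hints at when suggesting the session store may no longer need the variable upper/lower range.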
[GitHub] [kafka] rittikaadhikari commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access
rittikaadhikari commented on code in PR #12005: URL: https://github.com/apache/kafka/pull/12005#discussion_r866120522 ## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ## @@ -51,6 +51,7 @@ import scala.math._ */ abstract class AbstractFetcherThread(name: String, clientId: String, + val leader: LeaderEndPoint, Review Comment: iiuc, the BrokerEndPoint isn't passed into the LeaderEndPoint (not Remote or Local)?
[GitHub] [kafka] junrao commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access
junrao commented on code in PR #12005: URL: https://github.com/apache/kafka/pull/12005#discussion_r865419210 ## core/src/main/scala/kafka/server/LeaderEndPoint.scala: ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.requests.FetchRequest +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset +import org.apache.kafka.common.message.{FetchResponseData, OffsetForLeaderEpochRequestData} + +import scala.collection.Map + +/** + * The LeaderEndPoint acts as an abstraction which serves all fetches from the leader + * for the fetcher threads. + */ +trait LeaderEndPoint { + + type FetchData = FetchResponseData.PartitionData + type EpochData = OffsetForLeaderEpochRequestData.OffsetForLeaderPartition + + /** + * A boolean specifying if truncation when fetching from the leader is supported + */ + def isTruncationOnFetchSupported: Boolean + + /** + * Initiate closing access to fetches from leader. + */ + def initiateClose(): Unit + + /** + * Closes access to fetches from leader. 
+ */ + def close(): Unit + + /** + * Given a fetchRequest, carries out the expected request and returns + * the results from fetching from the leader. + * + * @param fetchRequest The fetch request we want to carry out + * + * @return A map of topic partition -> fetch data + */ + def fetch(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] + + /** + * Fetches the log start offset of the given topic partition, at a specific + * leader epoch, from the leader. + * + * @param topicPartition The topic partition that we want to fetch from + * @param currentLeaderEpoch An int representing the current leader epoch that we want to fetch from + * + * @return A long representing the earliest offset in the leader's topic partition. + */ + def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long + + /** + * Fetches the log end offset of the given topic partition, at a specific + * leader epoch, from the leader. + * + * @param topicPartition The topic partition that we want to fetch from + * @param currentLeaderEpoch An int representing the current leader epoch that we want to fetch from Review Comment: An int representing the current leader epoch that we want to fetch from => current leader epoch of the requestor ## core/src/main/scala/kafka/server/LeaderEndPoint.scala: ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.requests.FetchRequest +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset +import org.apache.kafka.common.message.{FetchResponseData, OffsetForLeaderEpochRequestData} + +import scala.collection.Map + +/** + * The LeaderEndPoint acts as an abstraction which serves all fetches from the leader + * for the fetcher threads. + */ +trait LeaderEndPoint { + + type FetchData = FetchResponseData.PartitionData + type EpochData = OffsetForLeaderEpochRequestData.OffsetForLeaderPartition + + /** + * A boolean
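junrao's suggested wording, "current leader epoch of the requestor", matches how the leader uses that field: it compares the epoch the requestor believes is current with its own leader epoch before serving the request. The Java sketch below models that comparison, assuming the broker-side check behaves as follows: an older epoch is fenced, a newer epoch is unknown to this leader, and an absent epoch skips the check. `LeaderEpochCheck` and `EpochCheckResult` are hypothetical names, although FENCED_LEADER_EPOCH and UNKNOWN_LEADER_EPOCH are real Kafka error codes.

```java
import java.util.Optional;

/** Possible outcomes of validating a requestor's current leader epoch. */
enum EpochCheckResult { OK, FENCED_LEADER_EPOCH, UNKNOWN_LEADER_EPOCH }

/** Hypothetical helper: validates the requestor's view of the leader epoch against the leader's own. */
final class LeaderEpochCheck {
    static EpochCheckResult check(final int leaderEpoch, final Optional<Integer> requestorCurrentEpoch) {
        if (!requestorCurrentEpoch.isPresent()) {
            // No epoch supplied: nothing to validate.
            return EpochCheckResult.OK;
        }
        final int requested = requestorCurrentEpoch.get();
        if (requested < leaderEpoch) {
            // The requestor is behind: its view of the leadership is stale.
            return EpochCheckResult.FENCED_LEADER_EPOCH;
        } else if (requested > leaderEpoch) {
            // The requestor is ahead: this broker has not learned about the newer epoch yet.
            return EpochCheckResult.UNKNOWN_LEADER_EPOCH;
        }
        return EpochCheckResult.OK;
    }
}
```

Read this way, the parameter describes the caller's belief about the current epoch rather than an epoch "to fetch from", which is the distinction the review comment draws.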
[GitHub] [kafka] mumrah commented on pull request #12050: KAFKA-13830 MetadataVersion integration for KRaft controller
mumrah commented on PR #12050: URL: https://github.com/apache/kafka/pull/12050#issuecomment-1118712139 @dengziming @cmccabe I've updated this PR with the changes from #12072. I've also modified the bootstrap mechanism to use a file with the KRaft snapshot format rather than a property in meta.properties. This is a slight departure from the KIP, but using a snapshot will open up the ability for us to bootstrap other records on first startup. If we agree that this is a good approach, I can update the KIP and discussion thread.
[jira] [Updated] (KAFKA-13876) Mirrormaker-2 consumer settings not working
[ https://issues.apache.org/jira/browse/KAFKA-13876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Christian Bates updated KAFKA-13876: Description: I am starting a mirrormaker connect cluster using ./bin/connect-mirror-maker.sh and a properties file. I followed the guide at [https://github.com/apache/kafka/tree/trunk/connect/mirror] to attempt to configure the consumer properties; however, no matter what I set, the timeout for the consumer remains fixed at the default, as confirmed both by Kafka outputting its config in the log and by timing the interval between the disconnect messages I get. E.g. I set {{CLOUD_EU.consumer.request.timeout.ms=12}} in the properties file that I start MM-2 with, and it is ignored. Based on various guides I have found while looking at this, I have also tried {{CLOUD_EU.request.timeout.ms=12}} {{CLOUD_EU.cluster.consumer.request.timeout.ms=12}} {{CLOUD_EU.consumer.override.request.timeout.ms=12}} {{CLOUD_EU.cluster.consumer.override.request.timeout.ms=12}} None of these have worked. How can I change the consumer request.timeout.ms setting? Context: I am attempting to use mirrormaker 2 to replicate data between AWS Managed Kafka (MSK) clusters in 2 different AWS regions - one in eu-west-1 (CLOUD_EU) and the other in us-west-2 (CLOUD_NA), both running Kafka 2.6.1. For testing I am currently just trying to replicate topics one way, from EU -> NA. This works fine for topics with small messages on them, but one of my topics has binary messages up to 20MB in size. When I try to replicate that topic I get an error every 30 seconds: {{[2022-04-21 13:47:05,268] INFO [Consumer clientId=consumer-29, groupId=null] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 2: {}.
(org.apache.kafka.clients.FetchSessionHandler:481)}} {{org.apache.kafka.common.errors.DisconnectException}} When logging at DEBUG level to get more information, we get {{[2022-04-21 13:47:05,267] DEBUG [Consumer clientId=consumer-29, groupId=null] Disconnecting from node 2 due to request timeout. (org.apache.kafka.clients.NetworkClient:784)}} {{[2022-04-21 13:47:05,268] DEBUG [Consumer clientId=consumer-29, groupId=null] Cancelled request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-29, correlationId=35) due to node 2 being disconnected (org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient:593)}} It gets stuck in a loop, disconnecting with a request timeout every 30s and then trying again. Looking at this, I suspected that the problem was that `request.timeout.ms` was at the default (30s) and the consumer was timing out trying to read the topic with many large messages, hence I was trying to override the consumer settings to have a longer timeout. The 2 specified versions are the ones I have explicitly tested against. Properties file I am using to start the cluster: {{clusters = CLOUD_EU, CLOUD_NA}} {{CLOUD_EU.bootstrap.servers = kafka.eu-west-1.amazonaws.com:9092}} {{CLOUD_NA.bootstrap.servers = kafka.us-west-2.amazonaws.com:9092}} {{CLOUD_EU->CLOUD_NA.enabled = true}} {{CLOUD_EU->CLOUD_NA.topics = METRICS_ATTACHMENTS_OVERSIZE_EU}} {{CLOUD_NA->CLOUD_EU.enabled = false}} {{replication.factor=3}} {{tasks.max = 1}} {{checkpoints.topic.replication.factor=3}} {{heartbeats.topic.replication.factor=3}} {{offset-syncs.topic.replication.factor=3}} {{offset.storage.replication.factor=3}} {{status.storage.replication.factor=3}} {{config.storage.replication.factor=3}} {{CLOUD_EU.consumer.request.timeout.ms=12}} {{CLOUD_EU.consumer.session.timeout.ms=15}} was: I am starting a mirrormaker connect cluster using ./bin/connect-mirror-maker.sh and a properties file.
I followed the guide at [https://github.com/apache/kafka/tree/trunk/connect/mirror] to attempt to configure the consumer properties; however, no matter what I set, the timeout for the consumer remains fixed at the default, as confirmed both by Kafka outputting its config in the log and by timing the interval between the disconnect messages I get. E.g. I set {{CLOUD_EU.consumer.request.timeout.ms=12}} in the properties file that I start MM-2 with, and it is ignored. Based on various guides I have found while looking at this, I have also tried {{CLOUD_EU.request.timeout.ms=12}} {{CLOUD_EU.cluster.consumer.request.timeout.ms=12}} {{CLOUD_EU.consumer.override.request.timeout.ms=12}} {{CLOUD_EU.cluster.consumer.override.request.timeout.ms=12}} None of these have worked. How can I change the consumer request.timeout.ms setting? Context: I am attempting to use mirrormaker 2 to replicate data between AWS Managed Kafka (MSK) clusters in 2 different AWS regions - one in eu-west-1 (CLOUD_EU) and the other in us-west-2 (CLOUD_NA), both running Kafka 2.6.1. For testing I am currently just trying to replicate topics one way, from EU -> NA. This works fine for topics with small
[jira] [Updated] (KAFKA-13876) Mirrormaker-2 consumer settings not working
[ https://issues.apache.org/jira/browse/KAFKA-13876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Christian Bates updated KAFKA-13876: Description: I am starting a mirrormaker connect cluster using ./bin/connect-mirror-maker.sh and a properties file. I followed the guide at [https://github.com/apache/kafka/tree/trunk/connect/mirror] to attempt to configure the consumer properties; however, no matter what I set, the timeout for the consumer remains fixed at the default, as confirmed both by Kafka outputting its config in the log and by timing the interval between the disconnect messages I get. E.g. I set {{CLOUD_EU.consumer.request.timeout.ms=12}} in the properties file that I start MM-2 with, and it is ignored. Based on various guides I have found while looking at this, I have also tried {{CLOUD_EU.request.timeout.ms=12}} {{CLOUD_EU.cluster.consumer.request.timeout.ms=12}} {{CLOUD_EU.consumer.override.request.timeout.ms=12}} {{CLOUD_EU.cluster.consumer.override.request.timeout.ms=12}} None of these have worked. How can I change the consumer request.timeout.ms setting? Context: I am attempting to use mirrormaker 2 to replicate data between AWS Managed Kafka (MSK) clusters in 2 different AWS regions - one in eu-west-1 (CLOUD_EU) and the other in us-west-2 (CLOUD_NA), both running Kafka 2.6.1. For testing I am currently just trying to replicate topics one way, from EU -> NA. This works fine for topics with small messages on them, but one of my topics has binary messages up to 20MB in size. When I try to replicate that topic I get an error every 30 seconds: {{[2022-04-21 13:47:05,268] INFO [Consumer clientId=consumer-29, groupId=null] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 2: {}.
(org.apache.kafka.clients.FetchSessionHandler:481)}} {{org.apache.kafka.common.errors.DisconnectException}} When logging at DEBUG level to get more information, we get {{[2022-04-21 13:47:05,267] DEBUG [Consumer clientId=consumer-29, groupId=null] Disconnecting from node 2 due to request timeout. (org.apache.kafka.clients.NetworkClient:784)}} {{[2022-04-21 13:47:05,268] DEBUG [Consumer clientId=consumer-29, groupId=null] Cancelled request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-29, correlationId=35) due to node 2 being disconnected (org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient:593)}} It gets stuck in a loop, disconnecting with a request timeout every 30s and then trying again. Looking at this, I suspected that the problem was that `request.timeout.ms` was at the default (30s) and the consumer was timing out trying to read the topic with many large messages, hence I was trying to override the consumer settings to have a longer timeout. The 2 specified versions are the ones I have explicitly tested against. Properties file I am using to start the cluster: {{clusters = CLOUD_EU, CLOUD_NA}} {{CLOUD_EU.bootstrap.servers = kafka.eu-west-1.amazonaws.com:9092}} {{CLOUD_NA.bootstrap.servers = kafka.us-west-2.amazonaws.com:9092}} {{CLOUD_EU->CLOUD_NA.enabled = true}} {{CLOUD_EU->CLOUD_NA.topics = METRICS_ATTACHMENTS_OVERSIZE_EU}} {{CLOUD_NA->CLOUD_EU.enabled = false}} {{replication.factor=3}} {{tasks.max = 1}} {{checkpoints.topic.replication.factor=3}} {{heartbeats.topic.replication.factor=3}} {{offset-syncs.topic.replication.factor=3}} {{offset.storage.replication.factor=3}} {{status.storage.replication.factor=3}} {{config.storage.replication.factor=3}} {{CLOUD_EU.consumer.request.timeout.ms=12}} {{CLOUD_EU.consumer.session.timeout.ms=15}} was: I am starting a mirrormaker connect cluster using ./bin/connect-mirror-maker.sh and a properties file.
I followed the guide at [https://github.com/apache/kafka/tree/trunk/connect/mirror] to attempt to configure the consumer properties; however, no matter what I set, the timeout for the consumer remains fixed at the default, as confirmed both by Kafka outputting its config in the log and by timing the interval between the disconnect messages I get. E.g. I set {{CLOUD_EU.consumer.request.timeout.ms=12}} in the properties file that I start MM-2 with, and it is ignored. Based on various guides I have found while looking at this, I have also tried {{CLOUD_EU.request.timeout.ms=12}} {{CLOUD_EU.cluster.consumer.request.timeout.ms=12}} {{CLOUD_EU.consumer.override.request.timeout.ms=12}} {{CLOUD_EU.cluster.consumer.override.request.timeout.ms=12}} None of these have worked. How can I change the consumer request.timeout.ms setting? Context: I am attempting to use mirrormaker 2 to replicate data between AWS Managed Kafka (MSK) clusters in 2 different AWS regions - one in eu-west-1 (CLOUD_EU) and the other in us-west-2 (CLOUD_NA), both running Kafka 2.6.1. For testing I am currently just trying to replicate topics one way, from EU -> NA. This works fine for topics with small
[jira] [Updated] (KAFKA-13876) Mirrormaker-2 consumer settings not working
[ https://issues.apache.org/jira/browse/KAFKA-13876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Christian Bates updated KAFKA-13876: Description: I am starting a MirrorMaker connect cluster using ./bin/connect-mirror-maker.sh and a properties file. I followed the guide at [https://github.com/apache/kafka/tree/trunk/connect/mirror] to configure the consumer properties. However, no matter what I set, the consumer timeout remains fixed at the default. I confirmed this both from the config Kafka prints in the log and by timing the interval between disconnect messages. For example, I set {{CLOUD_EU.consumer.request.timeout.ms=12}} in the properties file I start MM-2 with, and it is ignored. Based on various guides I found while looking into this, I have also tried {{CLOUD_EU.request.timeout.ms=12}}, {{CLOUD_EU.cluster.consumer.request.timeout.ms=12}}, {{CLOUD_EU.consumer.override.request.timeout.ms=12}} and {{CLOUD_EU.cluster.consumer.override.request.timeout.ms=12}}, none of which have worked. How can I change the consumer request.timeout.ms setting?
Context: I am attempting to use MirrorMaker 2 to replicate data between two AWS managed Kafka (MSK) clusters in different AWS regions, one in eu-west-1 (CLOUD_EU) and the other in us-west-2 (CLOUD_NA), both running Kafka 2.6.1. For testing, I am currently replicating topics in one direction only, from EU -> NA. This works fine for topics with small messages, but one of my topics has binary messages up to 20MB in size. When I try to replicate that topic I get an error every 30 seconds:
{{[2022-04-21 13:47:05,268] INFO [Consumer clientId=consumer-29, groupId=null] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 2: {}. (org.apache.kafka.clients.FetchSessionHandler:481)}}
{{org.apache.kafka.common.errors.DisconnectException}}
Logging at DEBUG level for more information gives:
{{[2022-04-21 13:47:05,267] DEBUG [Consumer clientId=consumer-29, groupId=null] Disconnecting from node 2 due to request timeout. (org.apache.kafka.clients.NetworkClient:784)}}
{{[2022-04-21 13:47:05,268] DEBUG [Consumer clientId=consumer-29, groupId=null] Cancelled request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-29, correlationId=35) due to node 2 being disconnected (org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient:593)}}
It gets stuck in a loop, disconnecting with a request timeout every 30s and then trying again. Looking at this, I suspected that the problem was that `request.timeout.ms` was at its default (30s), so the consumer timed out trying to read the topic with many large messages. Hence I was trying to override the consumer settings to get a longer timeout.
Properties file I am using to start the cluster:
clusters = CLOUD_EU, CLOUD_NA
CLOUD_EU.bootstrap.servers = kafka.eu-west-1.amazonaws.com:9092
CLOUD_NA.bootstrap.servers = kafka.us-west-2.amazonaws.com:9092
CLOUD_EU->CLOUD_NA.enabled = true
CLOUD_EU->CLOUD_NA.topics = METRICS_ATTACHMENTS_OVERSIZE_EU
CLOUD_NA->CLOUD_EU.enabled = false
replication.factor=3
tasks.max = 1
checkpoints.topic.replication.factor=3
heartbeats.topic.replication.factor=3
offset-syncs.topic.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
config.storage.replication.factor=3
CLOUD_EU.consumer.request.timeout.ms=12
CLOUD_EU.consumer.session.timeout.ms=15
[jira] [Updated] (KAFKA-13876) Mirrormaker-2 consumer settings not working
[ https://issues.apache.org/jira/browse/KAFKA-13876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Christian Bates updated KAFKA-13876: Description: I am starting a MirrorMaker connect cluster using ./bin/connect-mirror-maker.sh and a properties file. I followed the guide at [https://github.com/apache/kafka/tree/trunk/connect/mirror] to configure the consumer properties. However, no matter what I set, the consumer timeout remains fixed at the default. I confirmed this both from the config Kafka prints in the log and by timing the interval between disconnect messages. For example, I set {{CLOUD_EU.consumer.request.timeout.ms=12}} in the properties file I start MM-2 with, and it is ignored. Based on various guides I found while looking into this, I have also tried {{CLOUD_EU.request.timeout.ms=12}}, {{CLOUD_EU.cluster.consumer.request.timeout.ms=12}}, {{CLOUD_EU.consumer.override.request.timeout.ms=12}} and {{CLOUD_EU.cluster.consumer.override.request.timeout.ms=12}}, none of which have worked. How can I change the consumer request.timeout.ms setting?
Context: I am attempting to use MirrorMaker 2 to replicate data between two AWS managed Kafka (MSK) clusters in different AWS regions, one in eu-west-1 (CLOUD_EU) and the other in us-west-2 (CLOUD_NA), both running Kafka 2.6.1. For testing, I am currently replicating topics in one direction only, from EU -> NA. This works fine for topics with small messages, but one of my topics has binary messages up to 20MB in size. When I try to replicate that topic I get an error every 30 seconds:
{{[2022-04-21 13:47:05,268] INFO [Consumer clientId=consumer-29, groupId=null] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 2: {}. (org.apache.kafka.clients.FetchSessionHandler:481)}}
{{org.apache.kafka.common.errors.DisconnectException}}
Logging at DEBUG level for more information gives:
{{[2022-04-21 13:47:05,267] DEBUG [Consumer clientId=consumer-29, groupId=null] Disconnecting from node 2 due to request timeout. (org.apache.kafka.clients.NetworkClient:784)}}
{{[2022-04-21 13:47:05,268] DEBUG [Consumer clientId=consumer-29, groupId=null] Cancelled request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-29, correlationId=35) due to node 2 being disconnected (org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient:593)}}
It gets stuck in a loop, disconnecting with a request timeout every 30s and then trying again. Looking at this, I suspected that the problem was that `request.timeout.ms` was at its default (30s), so the consumer timed out trying to read the topic with many large messages. Hence I was trying to override the consumer settings to get a longer timeout.
Properties file I am using to start the cluster:
# specify any number of cluster aliases
clusters = CLOUD_EU, CLOUD_NA

# connection information for each cluster
CLOUD_EU.bootstrap.servers = kafka.eu-west-1.amazonaws.com:9092
CLOUD_NA.bootstrap.servers = kafka.us-west-2.amazonaws.com:9092

# enable and configure individual replication flows
CLOUD_EU->CLOUD_NA.enabled = true
CLOUD_EU->CLOUD_NA.topics = METRICS_ATTACHMENTS_OVERSIZE_EU
CLOUD_NA->CLOUD_EU.enabled = false
replication.factor=3
tasks.max = 1

# Internal Topic Settings #
checkpoints.topic.replication.factor=3
heartbeats.topic.replication.factor=3
offset-syncs.topic.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
config.storage.replication.factor=3

### Kafka Settings ###
# CLOUD_EU cluster over writes
CLOUD_EU.consumer.request.timeout.ms=12
CLOUD_EU.consumer.session.timeout.ms=15
[jira] [Created] (KAFKA-13876) Mirrormaker-2 consumer settings not working
Christian Bates created KAFKA-13876:
---
Summary: Mirrormaker-2 consumer settings not working
Key: KAFKA-13876
URL: https://issues.apache.org/jira/browse/KAFKA-13876
Project: Kafka
Issue Type: Bug
Components: mirrormaker
Affects Versions: 2.6.1, 2.5.0
Reporter: Christian Bates

I am starting a MirrorMaker connect cluster using ./bin/connect-mirror-maker.sh and a properties file. I followed the guide at https://github.com/apache/kafka/tree/trunk/connect/mirror to configure the consumer properties. However, no matter what I set, the consumer timeout remains fixed at the default. I confirmed this both from the config Kafka prints in the log and by timing the interval between disconnect messages. For example, I set {{CLOUD_EU.consumer.request.timeout.ms=12}} in the properties file I start MM-2 with, and it is ignored. Based on various guides I found while looking into this, I have also tried {{CLOUD_EU.request.timeout.ms=12}}, {{CLOUD_EU.cluster.consumer.request.timeout.ms=12}}, {{CLOUD_EU.consumer.override.request.timeout.ms=12}} and {{CLOUD_EU.cluster.consumer.override.request.timeout.ms=12}}, none of which have worked. How can I change the consumer request.timeout.ms setting?
Context: I am attempting to use MirrorMaker 2 to replicate data between two AWS managed Kafka (MSK) clusters in different AWS regions, one in eu-west-1 (CLOUD_EU) and the other in us-west-2 (CLOUD_NA), both running Kafka 2.6.1. For testing, I am currently replicating topics in one direction only, from EU -> NA. This works fine for topics with small messages, but one of my topics has binary messages up to 20MB in size. When I try to replicate that topic I get an error every 30 seconds:
{{[2022-04-21 13:47:05,268] INFO [Consumer clientId=consumer-29, groupId=null] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 2: {}. (org.apache.kafka.clients.FetchSessionHandler:481)}}
{{org.apache.kafka.common.errors.DisconnectException}}
Logging at DEBUG level for more information gives:
{{[2022-04-21 13:47:05,267] DEBUG [Consumer clientId=consumer-29, groupId=null] Disconnecting from node 2 due to request timeout. (org.apache.kafka.clients.NetworkClient:784)}}
{{[2022-04-21 13:47:05,268] DEBUG [Consumer clientId=consumer-29, groupId=null] Cancelled request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-29, correlationId=35) due to node 2 being disconnected (org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient:593)}}
It gets stuck in a loop, disconnecting with a request timeout every 30s and then trying again. Looking at this, I suspected that the problem was that `request.timeout.ms` was at its default (30s), so the consumer timed out trying to read the topic with many large messages. Hence I was trying to override the consumer settings to get a longer timeout.
Properties file I am using to start the cluster:
# specify any number of cluster aliases
clusters = CLOUD_EU, CLOUD_NA

# connection information for each cluster
CLOUD_EU.bootstrap.servers = kafka.eu-west-1.amazonaws.com:9092
CLOUD_NA.bootstrap.servers = kafka.us-west-2.amazonaws.com:9092

# enable and configure individual replication flows
CLOUD_EU->CLOUD_NA.enabled = true
CLOUD_EU->CLOUD_NA.topics = METRICS_ATTACHMENTS_OVERSIZE_EU
CLOUD_NA->CLOUD_EU.enabled = false
replication.factor=3
tasks.max = 1

# Internal Topic Settings #
checkpoints.topic.replication.factor=3
heartbeats.topic.replication.factor=3
offset-syncs.topic.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
config.storage.replication.factor=3

### Kafka Settings ###
# CLOUD_EU cluster over writes
CLOUD_EU.consumer.request.timeout.ms=12
CLOUD_EU.consumer.session.timeout.ms=15
-- This message was sent by Atlassian Jira (v8.20.7#820007)
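The first override tried in this report follows the pattern `<alias>.consumer.<property>`. To make the mapping the reporter expects concrete, here is a minimal sketch that strips an assumed `CLOUD_EU.consumer.` prefix from a `java.util.Properties` object, roughly in the spirit of Kafka's `AbstractConfig.originalsWithPrefix`. `ClusterPrefixSketch` is hypothetical and does not reproduce how MirrorMaker 2 actually resolves client configs, which is the open question in this issue.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical illustration of "<alias>.consumer.<property>" overrides.
// This is NOT MirrorMaker 2's actual config resolution logic.
final class ClusterPrefixSketch {
    private ClusterPrefixSketch() {}

    // Returns every "<alias>.consumer."-prefixed entry with the prefix stripped,
    // e.g. "CLOUD_EU.consumer.request.timeout.ms" -> "request.timeout.ms".
    static Map<String, String> consumerOverrides(Properties props, String alias) {
        String prefix = alias + ".consumer.";
        Map<String, String> overrides = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                overrides.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return overrides;
    }
}
```

Loading the reporter's file with `Properties.load` and calling `consumerOverrides(props, "CLOUD_EU")` would return `request.timeout.ms=12` and `session.timeout.ms=15`. Both settings are expressed in milliseconds, so these values mean 12 ms and 15 ms.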
[GitHub] [kafka] dengziming commented on a diff in pull request #12108: KAFKA-13862: Support Append/Subtract multiple config values in KRaft mode
dengziming commented on code in PR #12108: URL: https://github.com/apache/kafka/pull/12108#discussion_r865580075
## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ##
@@ -1751,13 +1777,32 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
 new AlterConfigOp(new ConfigEntry(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + LogConfig.Delete), AlterConfigOp.OpType.SUBTRACT)
 ).asJavaCollection
-alterResult = client.incrementalAlterConfigs(Map(
+alterConfigs = Map(
 topic1Resource -> topic1AlterConfigs,
 topic2Resource -> topic2AlterConfigs
-).asJava)
+)
+alterResult = client.incrementalAlterConfigs(alterConfigs.asJava)
 assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet)
 alterResult.all.get
+if (isKRaftTest()) {
Review Comment: Yeah, nice suggestion; waiting for the metadata offset is simpler and more generic. We can make use of this util function in other places, since many places already use a similarly specific assertion.
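The generic approach discussed here (wait until every broker has caught up to the controller's metadata end offset, instead of polling for one specific condition) can be sketched as a polling helper. Everything below is hypothetical: `MetadataCatchUpSketch`, its methods, and the `LongSupplier` offset sources stand in for whatever the real test utility would read, and this is not Kafka's `TestUtils` API.

```java
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

// Hypothetical test helpers; not Kafka's TestUtils API.
final class MetadataCatchUpSketch {
    private MetadataCatchUpSketch() {}

    // Polls `condition` every `pauseMs` until it holds or `timeoutMs` elapses.
    static void waitUntilTrue(BooleanSupplier condition, String message, long timeoutMs, long pauseMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                throw new AssertionError(message);
            }
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting: " + message, e);
            }
        }
    }

    // Generic condition: every broker has applied metadata up to the controller's
    // end offset, sampled once when the wait starts.
    static void waitForBrokersToCatchUp(LongSupplier controllerEndOffset,
                                        List<LongSupplier> brokerAppliedOffsets,
                                        long timeoutMs) {
        long target = controllerEndOffset.getAsLong();
        waitUntilTrue(
            () -> brokerAppliedOffsets.stream().allMatch(broker -> broker.getAsLong() >= target),
            "Brokers did not catch up to metadata offset " + target,
            timeoutMs,
            10L);
    }
}
```

Because the target offset is sampled after the config change has been committed, the same helper works for any metadata-driven assertion rather than only for one config key.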
[jira] [Assigned] (KAFKA-13871) The documentation for the configuration item QUORUM_FETCH_TIMEOUT of the RaftConfig class is incorrect
[ https://issues.apache.org/jira/browse/KAFKA-13871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yingquan he reassigned KAFKA-13871: --- Assignee: (was: yingquan he)
> The documentation for the configuration item QUORUM_FETCH_TIMEOUT of the RaftConfig class is incorrect
>
> Key: KAFKA-13871
> URL: https://issues.apache.org/jira/browse/KAFKA-13871
> Project: Kafka
> Issue Type: Improvement
> Components: kraft
> Reporter: yingquan he
> Priority: Minor
>
> The grammar of the field QUORUM_FETCH_TIMEOUT_MS_DOC is incorrect: `a election` should be changed to `an election`.
>
> {code:java}
> public static final String QUORUM_FETCH_TIMEOUT_MS_DOC = "Maximum time without a successful fetch from " +
> "the current leader before becoming a candidate and triggering a election for voters; Maximum time without " +
> "receiving fetch from a majority of the quorum before asking around to see if there's a new epoch for leader"; {code}
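Applied to the quoted constant, the proposed fix changes a single article. The sketch below wraps the corrected string in a stand-in class; `RaftConfigDocSketch` is hypothetical (the real constant lives in `RaftConfig`), and the rest of the text is copied unchanged from the issue.

```java
// Sketch of the change proposed in KAFKA-13871: only "a election" becomes
// "an election"; the remaining wording is copied from the quoted constant.
final class RaftConfigDocSketch {
    private RaftConfigDocSketch() {}

    static final String QUORUM_FETCH_TIMEOUT_MS_DOC = "Maximum time without a successful fetch from " +
        "the current leader before becoming a candidate and triggering an election for voters; Maximum time without " +
        "receiving fetch from a majority of the quorum before asking around to see if there's a new epoch for leader";
}
```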
[GitHub] [kafka] pan3793 opened a new pull request, #12126: KAFKA-8713 KIP-581: Add new conf serialize.accept.optional.null in connect-json
pan3793 opened a new pull request, #12126: URL: https://github.com/apache/kafka/pull/12126
See details on KAFKA-8713 and KIP-581.
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor
[ https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lqjacklee updated KAFKA-13864: -- Attachment: (was: interceptor_constructor_client.patch)
> Change the visibility of a KafkaProducer and KafkaConsumer constructor
>
> Key: KAFKA-13864
> URL: https://issues.apache.org/jira/browse/KAFKA-13864
> Project: Kafka
> Issue Type: Improvement
> Components: clients
> Affects Versions: 3.1.0
> Reporter: François Rosière
> Assignee: lqjacklee
> Priority: Major
> Labels: needs-kip
>
> To allow implementing Spring-managed interceptors for producers and consumers
> ([https://github.com/spring-projects/spring-kafka/issues/2244]),
> a new constructor should be added to KafkaProducer:
> {code:java}
> public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer){code}
> The visibility of one KafkaConsumer constructor should also move from default (package-private) to public:
> {code:java}
> public KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {code}
> See the current implementation:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671
> This issue is quite blocking, so any other alternative or proposal would be more than welcome.
> Kafka Streams is not affected by this issue, as the KafkaStreams object already exposes a constructor taking a StreamsConfig object.
> Thanks for considering this issue.
[jira] [Commented] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor
[ https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17532216#comment-17532216 ] lqjacklee commented on KAFKA-13864: --- [~frosiere] [~cadonna] please help review the code, thanks: https://github.com/apache/kafka/pull/12125/files
[GitHub] [kafka] lqjack opened a new pull request, #12125: provide the construct interceptor for KafkaProducer and KafkaConsumer
lqjack opened a new pull request, #12125: URL: https://github.com/apache/kafka/pull/12125
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] konL opened a new pull request, #12124: rename isDone to hasComplete
konL opened a new pull request, #12124: URL: https://github.com/apache/kafka/pull/12124
I used a deep learning model to discover that the method `isDone` changed in commit 4af50bb8600c37ee2e3597fba9a54a29cef94afa. The code snippets before and after the change are as follows.
```java
public boolean isDone() {
    return isDone;
}
```
```java
public boolean isDone() {
    return result.get() != INCOMPLETE_SENTINEL;
}
```
We think `isDone` could be renamed to `hasComplete`, which has a more specific meaning. We are researching the reasons for renaming identifiers and would like advice from more experienced developers, so we would appreciate your feedback from one of the following perspectives:
1. `(merge)` You accept this renaming opportunity and agree with the recommended name.
2. `(agree and not recommend)` You accept the renaming but disagree with the recommended name. Do you think the recommended name means the same thing, or is it simply not suitable?
3. `(not agree and useful)` Would our suggested renaming opportunity be useful for your other refactoring activities?
4. `(not agree and not fix)` You do not think our suggested renaming opportunity is appropriate. Is this because you think the identifier does not need to be renamed, or is it some other issue?
We look forward to your feedback. Thank you!
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
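The second snippet derives completion from a shared result reference compared against a sentinel, instead of reading a separate boolean flag. A minimal, self-contained sketch of that pattern follows; `SentinelResult`, `complete`, and the `AtomicReference` field are assumptions for illustration, not the class changed in commit 4af50bb8600c37ee2e3597fba9a54a29cef94afa.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical result holder: "done" is derived from whether the result still
// holds the sentinel, so there is no separate boolean flag to keep in sync.
final class SentinelResult<T> {
    private static final Object INCOMPLETE_SENTINEL = new Object();

    private final AtomicReference<Object> result = new AtomicReference<>(INCOMPLETE_SENTINEL);

    // Sets the value at most once; returns false if the result was already completed.
    boolean complete(T value) {
        return result.compareAndSet(INCOMPLETE_SENTINEL, value);
    }

    boolean isDone() {
        return result.get() != INCOMPLETE_SENTINEL;
    }

    @SuppressWarnings("unchecked")
    T get() {
        Object value = result.get();
        if (value == INCOMPLETE_SENTINEL) {
            throw new IllegalStateException("Result is not complete yet");
        }
        return (T) value;
    }
}
```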
[jira] [Commented] (KAFKA-13872) Partitions are truncated when leader is replaced
[ https://issues.apache.org/jira/browse/KAFKA-13872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17532122#comment-17532122 ] Jack Vanlightly commented on KAFKA-13872: --- I presume you would need to perform a broker decommissioning process to remove that broker from the cluster before adding a new empty broker with the same id? Is there documentation for how to decommission a dead broker safely?
> Partitions are truncated when leader is replaced
>
> Key: KAFKA-13872
> URL: https://issues.apache.org/jira/browse/KAFKA-13872
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.7.2
> Reporter: Francois Visconte
> Priority: Major
> Attachments: extract-2022-05-04T15_50_34.110Z.csv
>
> Sample setup:
> * a topic with one partition and RF=3
> * a producer using acks=1
> * min.insync.replicas set to 1
> * 3 brokers: 0, 1, 2
> * the preferred leader of the partition is brokerId 0
>
> Steps to reproduce the issue:
> * The producer keeps producing to the partition; the leader is brokerId=0
> * At some point, replicas 1 and 2 fall behind and are removed from the ISR
> * The leader, broker 0, has a hardware failure
> * The partition transitions to offline
> * The leader is replaced with a new broker with an empty disk and the same broker id 0
> * The partition transitions from offline to online with leader 0, ISR = 0
> * The followers see that the leader offset is 0 and truncate their partitions to 0, ISR = 0,1,2
> * At this point all the topic data has been removed from all replicas, and the partition size drops to 0 on all replicas
> I have attached some of the relevant logs and can provide more if necessary.
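The data loss in the reproduction steps comes from the followers truncating to what the replacement (empty) leader reports. The toy model below is a deliberate simplification, assuming each follower keeps at most as many records as the leader has. Real Kafka followers reconcile their logs using leader epochs (KIP-101), which this sketch does not model. `TruncationModel` and the example offsets are hypothetical.

```java
import java.util.Arrays;

// Toy model only: a follower keeps at most as many records as the leader has.
// It ignores leader epochs, high watermarks and ISR bookkeeping.
final class TruncationModel {
    private TruncationModel() {}

    // Returns each follower's log end offset after it follows the given leader.
    static long[] truncateToLeader(long leaderLogEndOffset, long[] followerLogEndOffsets) {
        return Arrays.stream(followerLogEndOffsets)
                     .map(end -> Math.min(end, leaderLogEndOffset))
                     .toArray();
    }
}
```

With a replacement leader whose disk is empty (`leaderLogEndOffset = 0`), both lagging followers end up at offset 0 in this model, which matches the "partition size drops to 0 on all replicas" observation.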
[GitHub] [kafka] dengziming commented on pull request #12109: KAFKA-13863: Prevent null config value when create topic in KRaft mode
dengziming commented on PR #12109: URL: https://github.com/apache/kafka/pull/12109#issuecomment-1118264999 @cmccabe, yeah, it's unnecessary to change `ReplicationControlManager`. However, there is still a difference: in KRaft mode, a null config value is treated as removing the config, so creating a topic with a null config value behaves the same as creating the topic without that config. Do you think this is a reasonable change, or is it a compatibility bug?
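The KRaft behaviour described here, where a `null` value removes a config rather than setting it, can be sketched as follows. `ConfigDeltaSketch.apply` is a hypothetical helper, not `ConfigurationControlManager` or `ReplicationControlManager` code; it only shows why creating a topic with a `null` value ends up equivalent to omitting that key.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: null values delete keys, non-null values set them.
final class ConfigDeltaSketch {
    private ConfigDeltaSketch() {}

    static Map<String, String> apply(Map<String, String> current, Map<String, String> changes) {
        Map<String, String> result = new HashMap<>(current);
        for (Map.Entry<String, String> change : changes.entrySet()) {
            if (change.getValue() == null) {
                result.remove(change.getKey());   // null is treated as "remove this config"
            } else {
                result.put(change.getKey(), change.getValue());
            }
        }
        return result;
    }
}
```

Starting from an empty config, a creation request with `cleanup.policy=null` and `retention.ms=1000` yields only `retention.ms=1000`, the same result as a request that never mentioned `cleanup.policy`.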
[jira] [Commented] (KAFKA-13872) Partitions are truncated when leader is replaced
[ https://issues.apache.org/jira/browse/KAFKA-13872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17532092#comment-17532092 ] Francois Visconte commented on KAFKA-13872: --- I was expecting the partition to stay offline until I decided to set `unclean.leader.election.enable` to true and let one of the lagging replicas take ownership.
[GitHub] [kafka] dengziming commented on pull request #12108: KAFKA-13862: Support Append/Subtract multiple config values in KRaft mode
dengziming commented on PR #12108: URL: https://github.com/apache/kafka/pull/12108#issuecomment-1118210953 Thank you for your suggestions @hachikuji @akhileshchg, PTAL again when you have time.
[GitHub] [kafka] dengziming commented on a diff in pull request #12108: KAFKA-13862: Support Append/Subtract multiple config values in KRaft mode
dengziming commented on code in PR #12108: URL: https://github.com/apache/kafka/pull/12108#discussion_r865581359
## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ##
@@ -215,13 +215,17 @@ private void incrementalAlterConfigResource(ConfigResource configResource,
 }
 List<String> newValueParts = getParts(newValue, key, configResource);
 if (opType == APPEND) {
-if (!newValueParts.contains(opValue)) {
-newValueParts.add(opValue);
+for (String value: opValue.split(",")) {
+if (!newValueParts.contains(value)) {
Review Comment: Nice catch. I found a small bug in `ConfigAdminManager` while working on this integration test. As for the variable naming, I think both are OK here; I went with `oldValueList` since it reads better.
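The change in this diff makes `APPEND` split a comma-separated operand into individual values before checking for duplicates. A self-contained sketch of those list semantics, including the matching `SUBTRACT` case used by the integration test (`compact,delete`), is below. `ListConfigOps` and its methods are hypothetical and simplified: there is no validation and no handling of the config's default value.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of APPEND/SUBTRACT on a comma-separated list config.
final class ListConfigOps {
    private ListConfigOps() {}

    // Splits "a,b" into ["a", "b"], ignoring blanks; null or empty gives [].
    private static List<String> parts(String value) {
        List<String> out = new ArrayList<>();
        if (value == null || value.isEmpty()) {
            return out;
        }
        for (String part : value.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                out.add(trimmed);
            }
        }
        return out;
    }

    // Adds each element of a possibly multi-valued operand, skipping duplicates.
    static String append(String current, String opValue) {
        List<String> values = parts(current);
        for (String value : parts(opValue)) {
            if (!values.contains(value)) {
                values.add(value);
            }
        }
        return String.join(",", values);
    }

    // Removes every element of a possibly multi-valued operand.
    static String subtract(String current, String opValue) {
        List<String> values = parts(current);
        values.removeAll(parts(opValue));
        return String.join(",", values);
    }
}
```

Without the split, the whole operand `compact,delete` would be compared as a single element, so it would never match an existing single value such as `compact`.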
[GitHub] [kafka] dengziming commented on a diff in pull request #12108: KAFKA-13862: Support Append/Subtract multiple config values in KRaft mode
dengziming commented on code in PR #12108: URL: https://github.com/apache/kafka/pull/12108#discussion_r865580075
## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ##
@@ -1751,13 +1777,32 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
 new AlterConfigOp(new ConfigEntry(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + LogConfig.Delete), AlterConfigOp.OpType.SUBTRACT)
 ).asJavaCollection
-alterResult = client.incrementalAlterConfigs(Map(
+alterConfigs = Map(
 topic1Resource -> topic1AlterConfigs,
 topic2Resource -> topic2AlterConfigs
-).asJava)
+)
+alterResult = client.incrementalAlterConfigs(alterConfigs.asJava)
 assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet)
 alterResult.all.get
+if (isKRaftTest()) {
Review Comment: Yeah, nice suggestion; waiting for the metadata offset is simpler and more generic. I will submit a separate PR to turn this into a util function, since many other places already use a similarly specific assertion.