[jira] [Updated] (KAFKA-13875) update docs to include topoicId for kafka-topics.sh --describe output
[ https://issues.apache.org/jira/browse/KAFKA-13875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen updated KAFKA-13875:
------------------------------
    Description:
The topic describe output in the quickstart doc here: [https://kafka.apache.org/quickstart] should be updated now.
{code:java}
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic:quickstart-events  PartitionCount:1  ReplicationFactor:1  Configs:
    Topic: quickstart-events  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
{code}
After the Topic Id implementation, the output now includes the topic id info. Also, the configs are no longer empty. The doc should be updated to avoid confusing new users.

  was:
The topic describe output in the quickstart doc here: [https://kafka.apache.org/quickstart] should be updated now.
{code:java}
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic:quickstart-events  PartitionCount:1  ReplicationFactor:1  Configs:
    Topic: quickstart-events  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
{code}
After the Topic Id implementation, the output now includes the topic id info.

> update docs to include topoicId for kafka-topics.sh --describe output
> ---------------------------------------------------------------------
>
>   Key: KAFKA-13875
>   URL: https://issues.apache.org/jira/browse/KAFKA-13875
>   Project: Kafka
>   Issue Type: Improvement
>   Components: admin
>   Affects Versions: 3.2.0
>   Reporter: Luke Chen
>   Priority: Major
>   Labels: newbie
>
> The topic describe output in the quickstart doc here:
> [https://kafka.apache.org/quickstart] should be updated now.
> {code:java}
> bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
> Topic:quickstart-events  PartitionCount:1  ReplicationFactor:1  Configs:
>     Topic: quickstart-events  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
> {code}
> After the Topic Id implementation, the output now includes the topic id info.
> Also, the configs are no longer empty. The doc should be updated to avoid
> confusing new users.
-- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (KAFKA-13875) update docs to include topoicId for kafka-topics.sh --describe output
Luke Chen created KAFKA-13875:
---------------------------------

  Summary: update docs to include topoicId for kafka-topics.sh --describe output
  Key: KAFKA-13875
  URL: https://issues.apache.org/jira/browse/KAFKA-13875
  Project: Kafka
  Issue Type: Improvement
  Components: admin
  Affects Versions: 3.2.0
  Reporter: Luke Chen

The topic describe output in the quickstart doc here: [https://kafka.apache.org/quickstart] should be updated now.
{code:java}
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic:quickstart-events  PartitionCount:1  ReplicationFactor:1  Configs:
    Topic: quickstart-events  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
{code}
After the Topic Id implementation, the output now includes the topic id info.

--
This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (KAFKA-13875) update docs to include topoicId for kafka-topics.sh --describe output
[ https://issues.apache.org/jira/browse/KAFKA-13875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen updated KAFKA-13875:
------------------------------
    Labels: newbie  (was: )

> update docs to include topoicId for kafka-topics.sh --describe output
> ---------------------------------------------------------------------
>
>   Key: KAFKA-13875
>   URL: https://issues.apache.org/jira/browse/KAFKA-13875
>   Project: Kafka
>   Issue Type: Improvement
>   Components: admin
>   Affects Versions: 3.2.0
>   Reporter: Luke Chen
>   Priority: Major
>   Labels: newbie
>
> The topic describe output in the quickstart doc here:
> [https://kafka.apache.org/quickstart] should be updated now.
> {code:java}
> bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
> Topic:quickstart-events  PartitionCount:1  ReplicationFactor:1  Configs:
>     Topic: quickstart-events  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
> {code}
> After the Topic Id implementation, the output now includes the topic id info.

--
This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] dengziming commented on pull request #12112: MINOR: Fix flaky testDescribeUnderReplicatedPartitions
dengziming commented on PR #12112:
URL: https://github.com/apache/kafka/pull/12112#issuecomment-1118110191

   @showuon I thought I saw this test fail, but I can no longer find the failure. I will close this PR if I don't see it fail again in the next few days.
   @divijvaidya I will address your comment if this test really does fail again.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming commented on a diff in pull request #12104: KAFKA-13746: Attempt to fix flaky test by waiting to fetch 2 topics f…
dengziming commented on code in PR #12104:
URL: https://github.com/apache/kafka/pull/12104#discussion_r865512959

## core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala:
@@ -733,12 +733,18 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
       killBroker(0)
       val aliveServers = brokers.filterNot(_.config.brokerId == 0)
       TestUtils.waitForPartitionMetadata(aliveServers, underMinIsrTopic, 0)
-      val output = TestUtils.grabConsoleOutput(
-        topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions"
+      var output = ""
+      TestUtils.waitUntilTrue(
+        () => {
+          output = TestUtils.grabConsoleOutput(
+            topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions"

Review Comment:
   Sorry, it seems we won't remove a broker from the ISR if it's the only replica, so this change doesn't make sense. To make it work as expected, we should change it like this:
   ```
   && broker.metadataCache.getPartitionInfo(offlineTopic, 0).get.leader() == MetadataResponse.NO_LEADER_ID
   ```
   If you are interested, you can investigate further why the ISR of TopicPartition(offlineTopic, 0) still contains broker 0 after broker 0 is killed.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
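The diff above wraps the describe call in a retry loop so the test polls until the expected output appears instead of reading it once. As a rough illustration of that polling pattern only, here is a minimal stdlib sketch. `PollingSketch` and its `waitUntilTrue` are hypothetical names written for this example; they are not Kafka's `TestUtils.waitUntilTrue`, whose signature and failure behavior may differ. This version returns a boolean on timeout.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper for illustration; not part of Kafka's test utilities.
final class PollingSketch {

    /**
     * Re-evaluates the condition until it holds or the timeout elapses.
     * Returns true if the condition became true, false on timeout or interrupt.
     */
    static boolean waitUntilTrue(BooleanSupplier condition, long timeoutMs, long pauseMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and give up.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in for a command whose output only becomes non-empty after a few attempts.
        int[] attempts = {0};
        String[] output = {""};
        boolean ok = waitUntilTrue(() -> {
            attempts[0]++;
            output[0] = attempts[0] < 3 ? "" : "Topic: some-topic";
            return !output[0].isEmpty();
        }, 5_000, 10);
        System.out.println(ok + " after " + attempts[0] + " attempts: " + output[0]);
    }
}
```

The captured output is assigned inside the condition, mirroring the `var output` pattern in the diff, so the last observed value is still available for assertions after the loop exits.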
[jira] [Commented] (KAFKA-13872) Partitions are truncated when leader is replaced
[ https://issues.apache.org/jira/browse/KAFKA-13872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17532002#comment-17532002 ]

Luke Chen commented on KAFKA-13872:
-----------------------------------

[~fvisconte], thanks for reporting the issue. But I think this is the expected behavior, isn't it? We always take the log on the partition leader as the source of truth. I'd like to know what behavior you expect in this case. Thanks.

> Partitions are truncated when leader is replaced
> ------------------------------------------------
>
>   Key: KAFKA-13872
>   URL: https://issues.apache.org/jira/browse/KAFKA-13872
>   Project: Kafka
>   Issue Type: Bug
>   Affects Versions: 2.7.2
>   Reporter: Francois Visconte
>   Priority: Major
>   Attachments: extract-2022-05-04T15_50_34.110Z.csv
>
> Sample setup:
> * a topic with one partition and RF=3
> * a producer using acks=1
> * min.insync.replicas set to 1
> * 3 brokers 1,2,3
> * Preferred leader of the partition is brokerId 0
>
> Steps to reproduce the issue:
> * The producer keeps producing to the partition; the leader is brokerId=0
> * At some point, replicas 1 and 2 fall behind and are removed from the ISR
> * The leader, broker 0, has a hardware failure
> * The partition transitions to offline
> * This leader is replaced with a new broker with an empty disk and the same
> broker id 0
> * The partition transitions from offline to online with leader 0, ISR = 0
> * Followers see that the leader offset is 0 and decide to truncate their
> partitions to 0, ISR=0,1,2
> * At this point all the topic data has been removed from all replicas and
> the partition size drops to 0 on all replicas
> Attached some of the relevant logs. I can provide more logs if necessary.

--
This message was sent by Atlassian Jira (v8.20.7#820007)
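To make the reported sequence concrete, the following is a deliberately simplified model of the last reproduction steps, based only on the issue description. It assumes that a follower truncates its log so that it never extends past the leader's log end offset. The real replication protocol is more involved, and the class and method names here (`TruncationSketch`, `truncate`, `followersAfterFetch`) are made up for this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model for illustration; not Kafka's actual replica fetcher logic.
final class TruncationSketch {

    /** A follower keeps at most as much log as the leader has. */
    static long truncate(long followerLogEndOffset, long leaderLogEndOffset) {
        return Math.min(followerLogEndOffset, leaderLogEndOffset);
    }

    /** Applies the truncation rule to every follower, keyed by broker id. */
    static Map<Integer, Long> followersAfterFetch(Map<Integer, Long> followerLogEndOffsets,
                                                  long leaderLogEndOffset) {
        Map<Integer, Long> result = new LinkedHashMap<>();
        followerLogEndOffsets.forEach(
            (brokerId, logEndOffset) -> result.put(brokerId, truncate(logEndOffset, leaderLogEndOffset)));
        return result;
    }

    public static void main(String[] args) {
        // Followers 1 and 2 hold data (offsets are arbitrary example values);
        // the replacement leader 0 comes back with an empty disk, so its end offset is 0.
        Map<Integer, Long> followers = new LinkedHashMap<>();
        followers.put(1, 900L);
        followers.put(2, 950L);
        System.out.println(followersAfterFetch(followers, 0L)); // prints {1=0, 2=0}
    }
}
```

Under this model the outcome in the report follows directly: once the empty broker is elected leader, its log is the source of truth and every follower ends up at offset 0.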
[GitHub] [kafka] cmccabe commented on a diff in pull request #11969: KAFKA-13649: Implement early.start.listeners and fix StandardAuthorizer loading [WIP]
cmccabe commented on code in PR #11969:
URL: https://github.com/apache/kafka/pull/11969#discussion_r865499094

## core/src/main/scala/kafka/network/SocketServer.scala:
@@ -1864,6 +1780,18 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
       sensor
     }
   }
+
+  /**
+   * Close `channel` and decrement the connection count.
+   */
+  def closeChannel(listenerName: ListenerName, channel: SocketChannel): Unit = {
+    if (channel != null) {
+      debug(s"Closing connection from ${channel.socket.getRemoteSocketAddress}")
+      dec(listenerName, channel.socket.getInetAddress)
+      closeSocket(channel, this)

Review Comment:
   Well, the ConnectionQuotas class is tracking how many connections exist. It needs to be informed when a connection is closed.

   If you want, I can have the function take a Logging parameter so that we can log under `SocketServer` (or whatever) rather than `SocketServer.ConnectionQuotas`. But this is a DEBUG level message anyway... realistically, you don't ever see this in prod, so... not sure if it matters.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
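The point in the comment above is that whatever object counts open connections has to be told about every close. The sketch below shows that idea in isolation, using only the JDK: a per-address counter whose `closeChannel` decrements the count before closing the resource. `ConnectionCountSketch` is a hypothetical class for this example and is far simpler than Kafka's `ConnectionQuotas` (no listeners, limits, or throttling); a plain `Closeable` stands in for the socket channel.

```java
import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily simplified stand-in for a connection-count tracker.
final class ConnectionCountSketch {
    private final Map<InetAddress, Integer> counts = new HashMap<>();

    /** Records a newly accepted connection from the given address. */
    synchronized void inc(InetAddress address) {
        counts.merge(address, 1, Integer::sum);
    }

    /** Records that one connection from the given address has gone away. */
    synchronized void dec(InetAddress address) {
        Integer current = counts.get(address);
        if (current == null) {
            throw new IllegalArgumentException("No tracked connections for " + address);
        }
        if (current == 1) {
            counts.remove(address);
        } else {
            counts.put(address, current - 1);
        }
    }

    synchronized int count(InetAddress address) {
        return counts.getOrDefault(address, 0);
    }

    /** Closes the channel and decrements the count; a null channel is ignored. */
    void closeChannel(InetAddress address, Closeable channel) {
        if (channel != null) {
            dec(address);
            try {
                channel.close();
            } catch (IOException e) {
                // This sketch swallows close failures; real code would at least log them.
            }
        }
    }
}
```

Keeping the decrement and the close in one method means a caller cannot close a channel and forget to update the count, which is the coupling the review comment defends.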
[GitHub] [kafka] cmccabe commented on a diff in pull request #11969: KAFKA-13649: Implement early.start.listeners and fix StandardAuthorizer loading [WIP]
cmccabe commented on code in PR #11969: URL: https://github.com/apache/kafka/pull/11969#discussion_r865496509 ## core/src/main/scala/kafka/network/SocketServer.scala: ## @@ -104,184 +103,141 @@ class SocketServer(val config: KafkaConfig, private[this] val nextProcessorId: AtomicInteger = new AtomicInteger(0) val connectionQuotas = new ConnectionQuotas(config, time, metrics) - private var startedProcessingRequests = false - private var stoppedProcessingRequests = false - // Processors are now created by each Acceptor. However to preserve compatibility, we need to number the processors - // globally, so we keep the nextProcessorId counter in SocketServer - def nextProcessorId(): Int = { -nextProcessorId.getAndIncrement() - } + /** + * A future which is completed once all the authorizer futures are complete. + */ + private val allAuthorizerFuturesComplete = new CompletableFuture[Void] /** - * Starts the socket server and creates all the Acceptors and the Processors. The Acceptors - * start listening at this stage so that the bound port is known when this method completes - * even when ephemeral ports are used. Acceptors and Processors are started if `startProcessingRequests` - * is true. If not, acceptors and processors are only started when [[kafka.network.SocketServer#startProcessingRequests()]] - * is invoked. Delayed starting of acceptors and processors is used to delay processing client - * connections until server is fully initialized, e.g. to ensure that all credentials have been - * loaded before authentications are performed. Incoming connections on this server are processed - * when processors start up and invoke [[org.apache.kafka.common.network.Selector#poll]]. - * - * @param startProcessingRequests Flag indicating whether `Processor`s must be started. - * @param controlPlaneListenerThe control plane listener, or None if there is none. - * @param dataPlaneListeners The data plane listeners. + * True if the SocketServer is stopped. 
Must be accessed under the SocketServer lock. */ - def startup(startProcessingRequests: Boolean = true, - controlPlaneListener: Option[EndPoint] = config.controlPlaneListener, - dataPlaneListeners: Seq[EndPoint] = config.dataPlaneListeners): Unit = { -this.synchronized { - createControlPlaneAcceptorAndProcessor(controlPlaneListener) - createDataPlaneAcceptorsAndProcessors(dataPlaneListeners) - if (startProcessingRequests) { -this.startProcessingRequests() - } -} + private var stopped = false + // Socket server metrics + newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized { val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => a.processors) -val controlPlaneProcessorOpt = controlPlaneAcceptorOpt.map(a => a.processors(0)) - newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized { - val ioWaitRatioMetricNames = dataPlaneProcessors.map { p => -metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags) - } +val ioWaitRatioMetricNames = dataPlaneProcessors.map { p => + metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags) +} +if (dataPlaneProcessors.isEmpty) { + 1.0 +} else { ioWaitRatioMetricNames.map { metricName => Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0)) }.sum / dataPlaneProcessors.size -}) - newGauge(s"${ControlPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized { - val ioWaitRatioMetricName = controlPlaneProcessorOpt.map { p => -metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags) - } - ioWaitRatioMetricName.map { metricName => -Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0)) - }.getOrElse(Double.NaN) -}) -newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory) -newGauge("MemoryPoolUsed", () => memoryPool.size() - 
memoryPool.availableMemory) - newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", () => SocketServer.this.synchronized { - val expiredConnectionsKilledCountMetricNames = dataPlaneProcessors.map { p => -metrics.metricName("expired-connections-killed-count", MetricsGroup, p.metricTags) - } - expiredConnectionsKilledCountMetricNames.map { metricName => -Option(metrics.metric(metricName)).fold(0.0)(m => m.metricValue.asInstanceOf[Double]) - }.sum -}) - newGauge(s"${ControlPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", () => SocketServer.this.synchronized { - val expiredConnectionsKilledCountMetricNames = controlPlaneProcessorOpt.map
[jira] [Created] (KAFKA-13874) Avoid synchronization in SocketServer metrics
Colin McCabe created KAFKA-13874:
------------------------------------

  Summary: Avoid synchronization in SocketServer metrics
  Key: KAFKA-13874
  URL: https://issues.apache.org/jira/browse/KAFKA-13874
  Project: Kafka
  Issue Type: Improvement
  Reporter: Colin McCabe

For performance reasons, we should avoid synchronization in SocketServer metrics like NetworkProcessorAvgIdlePercent.

--
This message was sent by Atlassian Jira (v8.20.7#820007)
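As an illustration of what a lock-free gauge could look like, the sketch below computes an average idle percentage over a `CopyOnWriteArrayList`, whose iterators work on a snapshot and need no external lock. This is only one possible approach and an assumption of this example, not necessarily what KAFKA-13874 will adopt. The averaging rule (cap each ratio at 1.0, report 1.0 when there are no processors) follows the gauge in the PR #11969 diff quoted elsewhere in this thread; `IdlePercentSketch` and its methods are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.DoubleSupplier;

// Hypothetical sketch; each DoubleSupplier stands in for one processor's io-wait-ratio metric.
final class IdlePercentSketch {
    private final List<DoubleSupplier> ioWaitRatios = new CopyOnWriteArrayList<>();

    void addProcessor(DoubleSupplier ioWaitRatio) {
        ioWaitRatios.add(ioWaitRatio);
    }

    void removeProcessor(DoubleSupplier ioWaitRatio) {
        ioWaitRatios.remove(ioWaitRatio);
    }

    /** Average of the per-processor ratios, each capped at 1.0; 1.0 if there are none. */
    double avgIdlePercent() {
        double sum = 0.0;
        int n = 0;
        // The iterator sees a consistent snapshot, so sum and count always match
        // even if processors are added or removed concurrently.
        for (DoubleSupplier ratio : ioWaitRatios) {
            sum += Math.min(ratio.getAsDouble(), 1.0);
            n++;
        }
        return n == 0 ? 1.0 : sum / n;
    }
}
```

The trade-off is that writes copy the whole list, which is usually acceptable when processors change rarely and the gauge is read often.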
[GitHub] [kafka] cmccabe commented on a diff in pull request #11969: KAFKA-13649: Implement early.start.listeners and fix StandardAuthorizer loading [WIP]
cmccabe commented on code in PR #11969:
URL: https://github.com/apache/kafka/pull/11969#discussion_r865495994

## core/src/main/scala/kafka/network/SocketServer.scala:
@@ -104,184 +103,141 @@ class SocketServer(val config: KafkaConfig,
   private[this] val nextProcessorId: AtomicInteger = new AtomicInteger(0)
   val connectionQuotas = new ConnectionQuotas(config, time, metrics)
-  private var startedProcessingRequests = false
-  private var stoppedProcessingRequests = false
-  // Processors are now created by each Acceptor. However to preserve compatibility, we need to number the processors
-  // globally, so we keep the nextProcessorId counter in SocketServer
-  def nextProcessorId(): Int = {
-    nextProcessorId.getAndIncrement()
-  }
+  /**
+   * A future which is completed once all the authorizer futures are complete.
+   */
+  private val allAuthorizerFuturesComplete = new CompletableFuture[Void]
   /**
-   * Starts the socket server and creates all the Acceptors and the Processors. The Acceptors
-   * start listening at this stage so that the bound port is known when this method completes
-   * even when ephemeral ports are used. Acceptors and Processors are started if `startProcessingRequests`
-   * is true. If not, acceptors and processors are only started when [[kafka.network.SocketServer#startProcessingRequests()]]
-   * is invoked. Delayed starting of acceptors and processors is used to delay processing client
-   * connections until server is fully initialized, e.g. to ensure that all credentials have been
-   * loaded before authentications are performed. Incoming connections on this server are processed
-   * when processors start up and invoke [[org.apache.kafka.common.network.Selector#poll]].
-   *
-   * @param startProcessingRequests Flag indicating whether `Processor`s must be started.
-   * @param controlPlaneListener The control plane listener, or None if there is none.
-   * @param dataPlaneListeners The data plane listeners.
+   * True if the SocketServer is stopped. Must be accessed under the SocketServer lock.
    */
-  def startup(startProcessingRequests: Boolean = true,
-              controlPlaneListener: Option[EndPoint] = config.controlPlaneListener,
-              dataPlaneListeners: Seq[EndPoint] = config.dataPlaneListeners): Unit = {
-    this.synchronized {
-      createControlPlaneAcceptorAndProcessor(controlPlaneListener)
-      createDataPlaneAcceptorsAndProcessors(dataPlaneListeners)
-      if (startProcessingRequests) {
-        this.startProcessingRequests()
-      }
-    }
+  private var stopped = false
+  // Socket server metrics
+  newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized {

Review Comment:
   Yeah, it would be nice to avoid the synchronization. Filed KAFKA-13874 for this.

   Dynamic listeners really make this kind of thing hard, even though very few people use them. :(

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #11969: KAFKA-13649: Implement early.start.listeners and fix StandardAuthorizer loading [WIP]
cmccabe commented on code in PR #11969:
URL: https://github.com/apache/kafka/pull/11969#discussion_r865495566

## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
@@ -906,12 +906,35 @@ private void appendRaftEvent(String name, Runnable runnable) {
                 if (this != metaLogListener) {
                     log.debug("Ignoring {} raft event from an old registration", name);
                 } else {
-                    runnable.run();
+                    try {
+                        runnable.run();
+                    } finally {
+                        maybeCompleteAuthorizerInitialLoad();
+                    }
                 }
             });
         }
     }
+    private void maybeCompleteAuthorizerInitialLoad() {
+        if (!needToCompleteAuthorizerLoad) return;
+        OptionalLong highWatermark = raftClient.highWatermark();
+        if (highWatermark.isPresent()) {
+            if (lastCommittedOffset + 1 >= highWatermark.getAsLong()) {

Review Comment:
   Yeah, I think this is OK for now. If we have issues we can revisit...

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
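The check in the diff above compares the last committed offset with the Raft high watermark to decide whether the initial authorizer load can be marked complete. The standalone sketch below reproduces just that comparison with stdlib types, assuming the high watermark is an exclusive bound (one past the last committed record). `AuthorizerLoadSketch` and its members are hypothetical names for this example, not the `QuorumController` implementation.

```java
import java.util.OptionalLong;

// Hypothetical sketch of the "caught up to the high watermark" check.
final class AuthorizerLoadSketch {
    private boolean needToCompleteLoad = true;
    private int completions = 0;

    /** True once everything below a known high watermark has been applied locally. */
    static boolean caughtUp(long lastCommittedOffset, OptionalLong highWatermark) {
        return highWatermark.isPresent()
            && lastCommittedOffset + 1 >= highWatermark.getAsLong();
    }

    /** Called after each event; marks the load complete at most once. */
    void maybeComplete(long lastCommittedOffset, OptionalLong highWatermark) {
        if (!needToCompleteLoad) {
            return;
        }
        if (caughtUp(lastCommittedOffset, highWatermark)) {
            needToCompleteLoad = false;
            completions++;
        }
    }

    int completions() {
        return completions;
    }
}
```

Because the high watermark keeps advancing while records are written, a check like this can only ever say "caught up as of the value just read", which is the moving-target concern raised in the review of this PR.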
[GitHub] [kafka] hachikuji commented on a diff in pull request #11969: KAFKA-13649: Implement early.start.listeners and fix StandardAuthorizer loading [WIP]
hachikuji commented on code in PR #11969: URL: https://github.com/apache/kafka/pull/11969#discussion_r865430367 ## core/src/main/scala/kafka/network/SocketServer.scala: ## @@ -1864,6 +1780,18 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend sensor } } + + /** + * Close `channel` and decrement the connection count. + */ + def closeChannel(listenerName: ListenerName, channel: SocketChannel): Unit = { +if (channel != null) { + debug(s"Closing connection from ${channel.socket.getRemoteSocketAddress}") + dec(listenerName, channel.socket.getInetAddress) + closeSocket(channel, this) Review Comment: It's surprising to find this in `ListenerConnectionQuota`, also that we end up with a different logger. Is there any way we can pull it back to where it was? For example, maybe we could generalize `closeSocket`: ```scala def closeSocket( listenerName: ListenerName, channel: SocketChannel, connectionQuota: ConnectionQuotas, logging: Logging ) ``` Or maybe we can stick it into a trait so that we can get rid of the `logging` parameter. 
## core/src/main/scala/kafka/network/SocketServer.scala: ## @@ -681,24 +580,27 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer, private val blockedPercentMeter = newMeter(blockedPercentMeterMetricName,"blocked time", TimeUnit.NANOSECONDS) private var currentProcessorIndex = 0 private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]() + private var started = false + private[network] val startFuture = new CompletableFuture[Void]() - private[network] case class DelayedCloseSocket(socket: SocketChannel, endThrottleTimeMs: Long) extends Ordered[DelayedCloseSocket] { -override def compare(that: DelayedCloseSocket): Int = endThrottleTimeMs compare that.endThrottleTimeMs - } + val thread = KafkaThread.nonDaemon( + s"${threadPrefix()}-kafka-socket-acceptor-${endPoint.listenerName}-${endPoint.securityProtocol}-${endPoint.port}", +this) - private[network] def startProcessors(): Unit = synchronized { -if (!processorsStarted.getAndSet(true)) { - startProcessors(processors) + startFuture.thenRun(() => synchronized { +if (!shouldRun.get()) { + debug(s"Ignoring start future for ${endPoint.listenerName} since it has already been shut down.") Review Comment: nit: "it" -> "the acceptor"? ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -906,12 +906,35 @@ private void appendRaftEvent(String name, Runnable runnable) { if (this != metaLogListener) { log.debug("Ignoring {} raft event from an old registration", name); } else { -runnable.run(); +try { +runnable.run(); +} finally { +maybeCompleteAuthorizerInitialLoad(); +} } }); } } +private void maybeCompleteAuthorizerInitialLoad() { +if (!needToCompleteAuthorizerLoad) return; +OptionalLong highWatermark = raftClient.highWatermark(); +if (highWatermark.isPresent()) { +if (lastCommittedOffset + 1 >= highWatermark.getAsLong()) { Review Comment: I guess the only issue with this is that the high watermark is a moving target. 
It probably works ok since writes to the metadata log should be infrequent. Not sure I have any better ideas. Maybe we could refresh the high watermark value only once every second or something like that. ## core/src/main/scala/kafka/network/SocketServer.scala: ## @@ -104,184 +103,141 @@ class SocketServer(val config: KafkaConfig, private[this] val nextProcessorId: AtomicInteger = new AtomicInteger(0) val connectionQuotas = new ConnectionQuotas(config, time, metrics) - private var startedProcessingRequests = false - private var stoppedProcessingRequests = false - // Processors are now created by each Acceptor. However to preserve compatibility, we need to number the processors - // globally, so we keep the nextProcessorId counter in SocketServer - def nextProcessorId(): Int = { -nextProcessorId.getAndIncrement() - } + /** + * A future which is completed once all the authorizer futures are complete. + */ + private val allAuthorizerFuturesComplete = new CompletableFuture[Void] /** - * Starts the socket server and creates all the Acceptors and the Processors. The Acceptors - * start listening at this stage so that the bound port is known when this method completes - * even when ephemeral ports are used. Acceptors and Processors are started if `startProcessingRequests` - * is true. If not, acceptors and processors are only started when
[jira] [Updated] (KAFKA-13873) Add ability to Pause / Resume KafkaStreams Topologies
[ https://issues.apache.org/jira/browse/KAFKA-13873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-13873: Description: In order to reduce resources used or modify data pipelines, users may want to pause processing temporarily. Presently, this would require stopping the entire KafkaStreams instance (or instances). This work would add the ability to pause and resume topologies. When the need to pause processing has passed, then users should be able to resume processing. KIP-834: [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832] was: In order to reduce resources used or modify data pipelines, users may want to pause processing temporarily. Presently, this would require stopping the entire KafkaStreams instance (or instances). This work would add the ability to pause and resume topologies. When the need to pause processing has passed, then users should be able to resume processing. > Add ability to Pause / Resume KafkaStreams Topologies > - > > Key: KAFKA-13873 > URL: https://issues.apache.org/jira/browse/KAFKA-13873 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Jim Hughes >Assignee: Jim Hughes >Priority: Major > Labels: kip > > In order to reduce resources used or modify data pipelines, users may want to > pause processing temporarily. Presently, this would require stopping the > entire KafkaStreams instance (or instances). > This work would add the ability to pause and resume topologies. When the > need to pause processing has passed, then users should be able to resume > processing. > KIP-834: > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832] -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (KAFKA-13873) Add ability to Pause / Resume KafkaStreams Topologies
[ https://issues.apache.org/jira/browse/KAFKA-13873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-13873: Component/s: streams > Add ability to Pause / Resume KafkaStreams Topologies > - > > Key: KAFKA-13873 > URL: https://issues.apache.org/jira/browse/KAFKA-13873 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Jim Hughes >Assignee: Jim Hughes >Priority: Major > > In order to reduce resources used or modify data pipelines, users may want to > pause processing temporarily. Presently, this would require stopping the > entire KafkaStreams instance (or instances). > This work would add the ability to pause and resume topologies. When the > need to pause processing has passed, then users should be able to resume > processing. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (KAFKA-13873) Add ability to Pause / Resume KafkaStreams Topologies
[ https://issues.apache.org/jira/browse/KAFKA-13873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-13873: Labels: kip (was: ) > Add ability to Pause / Resume KafkaStreams Topologies > - > > Key: KAFKA-13873 > URL: https://issues.apache.org/jira/browse/KAFKA-13873 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Jim Hughes >Assignee: Jim Hughes >Priority: Major > Labels: kip > > In order to reduce resources used or modify data pipelines, users may want to > pause processing temporarily. Presently, this would require stopping the > entire KafkaStreams instance (or instances). > This work would add the ability to pause and resume topologies. When the > need to pause processing has passed, then users should be able to resume > processing. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] artemlivshits commented on pull request #12049: KAFKA-10888: Sticky partition leads to uneven produce msg
artemlivshits commented on PR #12049: URL: https://github.com/apache/kafka/pull/12049#issuecomment-1117971682 Hmm, for some reason the update didn't trigger any build & test jobs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] artemlivshits commented on a diff in pull request #12049: KAFKA-10888: Sticky partition leads to uneven produce msg
artemlivshits commented on code in PR #12049: URL: https://github.com/apache/kafka/pull/12049#discussion_r865302205 ## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ## @@ -960,8 +1002,10 @@ private Future doSend(ProducerRecord record, Callback call " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + " specified in value.serializer", cce); } + +// Try to calculate partition, but note that after this call it can be RecordMetadata.UNKNOWN_PARTITION, +// which means that the RecordAccumulator would pick a partition based on broker load. Review Comment: Missed, now rephrased. ## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java: ## @@ -25,12 +23,15 @@ private final Cluster cluster; private final Serializer keySerializer; -private final DefaultPartitioner defaultPartitioner; +@SuppressWarnings("deprecation") +private final org.apache.kafka.clients.producer.internals.DefaultPartitioner defaultPartitioner; + +@SuppressWarnings("deprecation") public DefaultStreamPartitioner(final Serializer keySerializer, final Cluster cluster) { this.cluster = cluster; this.keySerializer = keySerializer; -this.defaultPartitioner = new DefaultPartitioner(); +this.defaultPartitioner = new org.apache.kafka.clients.producer.internals.DefaultPartitioner(); Review Comment: DefaultPartitioner implements onNewBatch, but DefaultStreamPartitioner doesn't seem to ever call it (the DefaultPartitioner is a private object). Without onNewBatch, the DefaultPartitioner.partition would return the same partition for unkeyed messages. ## clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java: ## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer.internals; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +/** + * Built-in default partitioner. Note, that this is just a utility class that is used directly from + * RecordAccumulator, it does not implement the Partitioner interface. + * + * The class keeps track of various bookkeeping information required for adaptive sticky partitioning + * (described in detail in KIP-794). There is one partitioner object per topic. + */ +public class BuiltInPartitioner { +private final Logger log; +private final String topic; +private final int stickyBatchSize; + +private volatile PartitionLoadStats partitionLoadStats = null; +private final AtomicReference stickyPartitionInfo = new AtomicReference<>(); + +// Visible and used for testing only. +static volatile public Supplier mockRandom = null; + +/** + * BuiltInPartitioner constructor. 
+ * + * @param topic The topic + * @param stickyBatchSize How much to produce to partition before switch + */ +public BuiltInPartitioner(LogContext logContext, String topic, int stickyBatchSize) { +this.log = logContext.logger(BuiltInPartitioner.class); +this.topic = topic; +this.stickyBatchSize = stickyBatchSize; +} + +/** + * Calculate the next partition for the topic based on the partition load stats. + */ +private int nextPartition(Cluster cluster) { +int random = mockRandom != null ? mockRandom.get() : Utils.toPositive(ThreadLocalRandom.current().nextInt()); + +// Cache volatile variable in local variable. +PartitionLoadStats partitionLoadStats = this.partitionLoadStats; + +if (partitionLoadStats == null) { +// We don't have stats to do
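To illustrate the review comment about onNewBatch above, here is a minimal sketch of sticky behaviour for unkeyed records. StickyChooser and its methods are hypothetical names for illustration only, not Kafka's DefaultPartitioner or BuiltInPartitioner; the sketch only assumes that the sticky choice is cached until a "new batch" signal arrives.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical, simplified stand-in for a sticky partition cache.
final class StickyChooser {
    private final int numPartitions;
    private int sticky;

    StickyChooser(int numPartitions) {
        this.numPartitions = numPartitions;
        this.sticky = ThreadLocalRandom.current().nextInt(numPartitions);
    }

    // Unkeyed records keep going to the cached partition.
    public int partition() {
        return sticky;
    }

    // Only this call moves the sticky choice to a different partition.
    public void onNewBatch() {
        if (numPartitions == 1) {
            return; // nothing else to switch to
        }
        int next = sticky;
        while (next == sticky) {
            next = ThreadLocalRandom.current().nextInt(numPartitions);
        }
        sticky = next;
    }

    public static void main(String[] args) {
        StickyChooser chooser = new StickyChooser(4);
        int first = chooser.partition();
        // Without onNewBatch, every call returns the same partition.
        System.out.println(first == chooser.partition());
        chooser.onNewBatch();
        System.out.println(first != chooser.partition());
    }
}
```

If a wrapper never forwards the new-batch signal, as the comment suspects for DefaultStreamPartitioner, partition() in this sketch would return the same value indefinitely.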
[GitHub] [kafka] hachikuji commented on a diff in pull request #12108: KAFKA-13862: Support Append/Subtract multiple config values in KRaft mode
hachikuji commented on code in PR #12108: URL: https://github.com/apache/kafka/pull/12108#discussion_r865377321 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -215,13 +215,17 @@ private void incrementalAlterConfigResource(ConfigResource configResource, } List newValueParts = getParts(newValue, key, configResource); if (opType == APPEND) { -if (!newValueParts.contains(opValue)) { -newValueParts.add(opValue); +for (String value: opValue.split(",")) { +if (!newValueParts.contains(value)) { Review Comment: Do we have an integration test which covers the case when the appended value is already present? It kind of looks like the zk path doesn't handle that. By the way, I found the naming a little confusing here. I think `newValueParts` should be `currentValueParts` or something like that. ## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ## @@ -1751,13 +1777,32 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { new AlterConfigOp(new ConfigEntry(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + LogConfig.Delete), AlterConfigOp.OpType.SUBTRACT) ).asJavaCollection -alterResult = client.incrementalAlterConfigs(Map( +alterConfigs = Map( topic1Resource -> topic1AlterConfigs, topic2Resource -> topic2AlterConfigs -).asJava) +) +alterResult = client.incrementalAlterConfigs(alterConfigs.asJava) assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet) alterResult.all.get +if (isKRaftTest()) { Review Comment: I agree having some utilities would be nice. Rather than having something really specific, maybe we just need a utility which waits until the brokers have caught up to the controller end offset? 
## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -215,13 +215,17 @@ private void incrementalAlterConfigResource(ConfigResource configResource, } List newValueParts = getParts(newValue, key, configResource); if (opType == APPEND) { -if (!newValueParts.contains(opValue)) { -newValueParts.add(opValue); +for (String value: opValue.split(",")) { Review Comment: nit: conventionally we put a space before the colon -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
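The APPEND change discussed above (split the operation value on commas and add only the parts that are not already present) can be sketched in isolation as follows. ListConfigOps and its methods are hypothetical helpers written for this illustration, not the ConfigurationControlManager code; the variable is named currentValueParts as the review suggests.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper illustrating APPEND/SUBTRACT on comma-separated list configs.
final class ListConfigOps {
    // Split a comma-separated value into its parts; an empty value has no parts.
    public static List<String> parts(String value) {
        List<String> result = new ArrayList<>();
        if (value == null || value.isEmpty()) {
            return result;
        }
        result.addAll(Arrays.asList(value.split(",")));
        return result;
    }

    // APPEND: add each element of opValue that is not already present.
    public static String append(String currentValue, String opValue) {
        List<String> currentValueParts = parts(currentValue);
        for (String value : opValue.split(",")) {
            if (!currentValueParts.contains(value)) {
                currentValueParts.add(value);
            }
        }
        return String.join(",", currentValueParts);
    }

    // SUBTRACT: remove each element of opValue that is present.
    public static String subtract(String currentValue, String opValue) {
        List<String> currentValueParts = parts(currentValue);
        for (String value : opValue.split(",")) {
            currentValueParts.remove(value);
        }
        return String.join(",", currentValueParts);
    }

    public static void main(String[] args) {
        // Appending a value that is already present leaves it in place once.
        System.out.println(append("compact", "compact,delete")); // compact,delete
        System.out.println(subtract("compact,delete", "delete")); // compact
    }
}
```

The first call in main covers the case raised in the review, where the appended value is already present.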
[GitHub] [kafka] ahuang98 opened a new pull request, #12123: KAFKA-13830 Introduce metadata.version for KRaft
ahuang98 opened a new pull request, #12123: URL: https://github.com/apache/kafka/pull/12123 From https://github.com/apache/kafka/pull/12050: > This patch includes a new metadata.version which is planned to replace IBP in KRaft clusters as specified in KIP-778. The kafka-storage tool now allows a user to specify a specific metadata.version to bootstrap into the cluster, otherwise the latest version is used. Upon the first leader election of the KRaft quorum, this initial metadata.version is written into the metadata log. When writing snapshots, a FeatureLevelRecord for metadata.version will be written out ahead of other records so we can decode things at the correct version level. This also includes additional validation in the controller when setting feature levels. It will now check that a given metadata.version is supportable by the quorum, not just the brokers.
[jira] [Resolved] (KAFKA-13815) Avoid reinitialization for a replica that is being deleted
[ https://issues.apache.org/jira/browse/KAFKA-13815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-13815. - Fix Version/s: 3.3.0 Resolution: Fixed merged the PR to trunk > Avoid reinitialization for a replica that is being deleted > -- > > Key: KAFKA-13815 > URL: https://issues.apache.org/jira/browse/KAFKA-13815 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Major > Fix For: 3.3.0 > > > https://issues.apache.org/jira/browse/KAFKA-10002 > identified that deletion of replicas can be slow when a StopReplica request > is being > processed, and has implemented a change to improve the efficiency. > We found that the efficiency can be further improved by avoiding the > reinitialization of the > leader epoch cache and partition metadata for a replica that needs to be > deleted. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] junrao merged pull request #12029: KAFKA-13815: Avoid reinitialization for a replica that is being deleted
junrao merged PR #12029: URL: https://github.com/apache/kafka/pull/12029
[GitHub] [kafka] akhileshchg commented on a diff in pull request #12108: KAFKA-13862: Support Append/Subtract multiple config values in KRaft mode
akhileshchg commented on code in PR #12108: URL: https://github.com/apache/kafka/pull/12108#discussion_r865146433
## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##
@@ -1751,13 +1777,32 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       new AlterConfigOp(new ConfigEntry(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + LogConfig.Delete), AlterConfigOp.OpType.SUBTRACT)
     ).asJavaCollection
-    alterResult = client.incrementalAlterConfigs(Map(
+    alterConfigs = Map(
       topic1Resource -> topic1AlterConfigs,
       topic2Resource -> topic2AlterConfigs
-    ).asJava)
+    )
+    alterResult = client.incrementalAlterConfigs(alterConfigs.asJava)
     assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet)
     alterResult.all.get
+    if (isKRaftTest()) {
Review Comment: I think this can be a util function:
```
TestUtils.waitForMetadataPropagation(brokers, condition: KRaftMetadataCache => Boolean): Unit = {
  assertTrue(isKRaftTest())
  TestUtils.waitUntilTrue(() => {
    brokers.forall { broker =>
      condition(broker.metadataCache.asInstanceOf[KRaftMetadataCache])
    }
  }, "Metadata is not propagated to all brokers")
}
```
and we can call it as:
```
TestUtils.waitForMetadataPropagation(brokers, metadataCache => {
  // validation logic.
})
```
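The helper proposed in the comment above boils down to polling a predicate over every broker's view until it holds or a timeout expires. A self-contained sketch of that pattern follows; WaitUtil, waitUntilTrue and waitForPropagation are hypothetical names for this illustration and are not Kafka's TestUtils, and the generic type C merely stands in for a metadata cache.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;

// Hypothetical polling helpers; not part of Kafka's test utilities.
final class WaitUtil {
    // Poll the condition until it is true, failing with the message after timeoutMs.
    public static void waitUntilTrue(BooleanSupplier condition, String message, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError(message);
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
    }

    // Wait until the condition holds for every cache (one per broker in the real test).
    public static <C> void waitForPropagation(List<C> caches, Predicate<C> condition, long timeoutMs) {
        waitUntilTrue(() -> caches.stream().allMatch(condition),
            "Metadata is not propagated to all brokers", timeoutMs);
    }

    public static void main(String[] args) {
        // Both stand-in "caches" already hold the expected value, so this returns at once.
        waitForPropagation(Arrays.asList("compact", "compact"), v -> v.equals("compact"), 1000L);
        System.out.println("propagated");
    }
}
```

Taking the condition as a parameter keeps the helper generic, which matches the suggestion in the thread to avoid a utility that is specific to one config.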
[GitHub] [kafka] Gerrrr opened a new pull request, #12122: WIP: Upgrade tests for KAFKA-13769
Gerrrr opened a new pull request, #12122: URL: https://github.com/apache/kafka/pull/12122 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-13863) Prevent null config value when create topic in KRaft mode
[ https://issues.apache.org/jira/browse/KAFKA-13863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17531880#comment-17531880 ] Colin McCabe commented on KAFKA-13863: -- We already validate that topic configurations can't be null in `ControllerConfigurationValidator`.
{code}
override def validate(
  resource: ConfigResource,
  config: util.Map[String, String]
): Unit = {
  resource.`type`() match {
    case TOPIC =>
      validateTopicName(resource.name())
      val properties = new Properties()
      val nullTopicConfigs = new mutable.ArrayBuffer[String]()
      config.entrySet().forEach(e => {
        if (e.getValue() == null) {
          nullTopicConfigs += e.getKey()
        } else {
          properties.setProperty(e.getKey(), e.getValue())
        }
      })
      if (nullTopicConfigs.nonEmpty) {
        throw new InvalidRequestException("Null value not supported for topic configs : " +
          nullTopicConfigs.mkString(","))
      }
{code}
I don't mind adding extra test coverage but there should be no need to change `ReplicationControlManager`, as far as I can see. > Prevent null config value when create topic in KRaft mode > - > > Key: KAFKA-13863 > URL: https://issues.apache.org/jira/browse/KAFKA-13863 > Project: Kafka > Issue Type: Bug >Reporter: dengziming >Priority: Major
[GitHub] [kafka] cmccabe commented on pull request #12109: KAFKA-13863: Prevent null config value when create topic in KRaft mode
cmccabe commented on PR #12109: URL: https://github.com/apache/kafka/pull/12109#issuecomment-1117657256 We already validate that topic configurations can't be null in `ControllerConfigurationValidator`.
```
override def validate(
  resource: ConfigResource,
  config: util.Map[String, String]
): Unit = {
  resource.`type`() match {
    case TOPIC =>
      validateTopicName(resource.name())
      val properties = new Properties()
      val nullTopicConfigs = new mutable.ArrayBuffer[String]()
      config.entrySet().forEach(e => {
        if (e.getValue() == null) {
          nullTopicConfigs += e.getKey()
        } else {
          properties.setProperty(e.getKey(), e.getValue())
        }
      })
      if (nullTopicConfigs.nonEmpty) {
        throw new InvalidRequestException("Null value not supported for topic configs : " +
          nullTopicConfigs.mkString(","))
      }
```
I don't mind adding extra test coverage but there should be no need to change `ReplicationControlManager`, as far as I can see.
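For readers less familiar with the Scala excerpt above, the same check can be sketched in plain Java. TopicConfigNullCheck is a hypothetical class for illustration, and IllegalArgumentException stands in for Kafka's InvalidRequestException so that the sketch has no dependencies. Collecting the null-valued keys first matters because java.util.Properties extends Hashtable, which rejects null values with a NullPointerException.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-alone version of the null check in the quoted validator.
final class TopicConfigNullCheck {
    public static Properties validate(Map<String, String> config) {
        Properties properties = new Properties();
        List<String> nullTopicConfigs = new ArrayList<>();
        for (Map.Entry<String, String> e : config.entrySet()) {
            if (e.getValue() == null) {
                // Remember the key instead of calling setProperty, which would throw on null.
                nullTopicConfigs.add(e.getKey());
            } else {
                properties.setProperty(e.getKey(), e.getValue());
            }
        }
        if (!nullTopicConfigs.isEmpty()) {
            // Stand-in for InvalidRequestException in the real validator.
            throw new IllegalArgumentException(
                "Null value not supported for topic configs : " + String.join(",", nullTopicConfigs));
        }
        return properties;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("cleanup.policy", "compact");
        config.put("retention.ms", null);
        try {
            validate(config);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```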
[jira] [Updated] (KAFKA-13861) validateOnly request field does not work for CreatePartition requests in Kraft mode.
[ https://issues.apache.org/jira/browse/KAFKA-13861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13861: Fix Version/s: 3.3.0 > validateOnly request field does not work for CreatePartition requests in > Kraft mode. > > > Key: KAFKA-13861 > URL: https://issues.apache.org/jira/browse/KAFKA-13861 > Project: Kafka > Issue Type: Bug >Reporter: Akhilesh Chaganti >Assignee: Akhilesh Chaganti >Priority: Major > Fix For: 3.3.0 > > > `ControllerApis` ignores the `validateOnly` field and the `QuorumController` > does not have any logic to handle the `validateOnly` requests for > `CreatePartitions`.
[jira] [Resolved] (KAFKA-13861) validateOnly request field does not work for CreatePartition requests in Kraft mode.
[ https://issues.apache.org/jira/browse/KAFKA-13861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Akhilesh Chaganti resolved KAFKA-13861. --- Resolution: Fixed > validateOnly request field does not work for CreatePartition requests in > Kraft mode. > > > Key: KAFKA-13861 > URL: https://issues.apache.org/jira/browse/KAFKA-13861 > Project: Kafka > Issue Type: Bug >Reporter: Akhilesh Chaganti >Assignee: Akhilesh Chaganti >Priority: Major > > `ControllerApis` ignores the `validateOnly` field and the `QuorumController` > does not have any logic to handle the `validateOnly` requests for > `CreatePartitions`.
[GitHub] [kafka] guozhangwang commented on pull request #12041: MINOR: ignore unused configuration when ConsumerCoordinator is not constructed
guozhangwang commented on PR #12041: URL: https://github.com/apache/kafka/pull/12041#issuecomment-1117635812 Thanks @C0urante for your thoughts. I'd like to clarify one thing: today, users can pass in both defined and unknown config values, where the latter may be used in some plugin modules (e.g. Kafka Streams's partition assignor). Since `config.logUnused()` is called at the end of the client constructor, the latter category of configs may not have been retrieved yet at that time, and that does not mean that they will never be retrieved after the constructor. So the logging message "... were supplied but are not used yet." is reasonable: "by that time" we do not know whether they will ever be used, and we cannot just call `ignore` on all these configs in order to not log them. Now for the former case, we generally expect that by the time `config.logUnused()` is called, all defined configs have been retrieved. If the client has not retrieved them yet, AND users have specified values for those configs, it's debatable whether we should let users know, as a reminder that they can consider removing those overrides; for configs which are not overridden by users, we would not bother to let them know at all. If we want to do that, I'd suggest we do it universally: i.e. for all cases, including the previous `ignored` cases like `KEY_DESERIALIZER_CLASS_CONFIG`. Maybe you can send out a discussion email in the community to ask for a consensus?
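The timing issue described above can be made concrete with a small sketch of "unused config" bookkeeping. TrackedConfig is a hypothetical class written for this illustration (it is not Kafka's AbstractConfig), and the key my.assignor.setting is an invented plugin config; the sketch only assumes that a key counts as used once it has been read or explicitly ignored.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical config wrapper that records which supplied keys have been read.
final class TrackedConfig {
    private final Map<String, Object> originals;
    private final Set<String> used = new HashSet<>();

    public TrackedConfig(Map<String, Object> originals) {
        this.originals = new HashMap<>(originals);
    }

    // Reading a key marks it as used.
    public Object get(String key) {
        used.add(key);
        return originals.get(key);
    }

    // Mark a key as used without reading it, so it is never reported.
    public void ignore(String key) {
        used.add(key);
    }

    // Keys that were supplied but have not been read or ignored so far.
    public Set<String> unused() {
        Set<String> keys = new TreeSet<>(originals.keySet());
        keys.removeAll(used);
        return keys;
    }

    public static void main(String[] args) {
        Map<String, Object> supplied = new HashMap<>();
        supplied.put("bootstrap.servers", "localhost:9092");
        supplied.put("my.assignor.setting", "x"); // invented plugin config

        TrackedConfig config = new TrackedConfig(supplied);
        config.get("bootstrap.servers");
        // At "end of constructor" the plugin key still looks unused.
        System.out.println(config.unused());
        // A plugin reads it later, after which nothing is unused.
        config.get("my.assignor.setting");
        System.out.println(config.unused());
    }
}
```

This is why a message phrased as "not used yet" fits a check made at the end of the constructor: the set of unused keys can still shrink afterwards.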
[jira] [Assigned] (KAFKA-13873) Add ability to Pause / Resume KafkaStreams Topologies
[ https://issues.apache.org/jira/browse/KAFKA-13873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jim Hughes reassigned KAFKA-13873: -- Assignee: Jim Hughes > Add ability to Pause / Resume KafkaStreams Topologies > - > > Key: KAFKA-13873 > URL: https://issues.apache.org/jira/browse/KAFKA-13873 > Project: Kafka > Issue Type: New Feature >Reporter: Jim Hughes >Assignee: Jim Hughes >Priority: Major > > In order to reduce resources used or modify data pipelines, users may want to > pause processing temporarily. Presently, this would require stopping the > entire KafkaStreams instance (or instances). > This work would add the ability to pause and resume topologies. When the > need to pause processing has passed, then users should be able to resume > processing.
[jira] [Created] (KAFKA-13873) Add ability to Pause / Resume KafkaStreams Topologies
Jim Hughes created KAFKA-13873: -- Summary: Add ability to Pause / Resume KafkaStreams Topologies Key: KAFKA-13873 URL: https://issues.apache.org/jira/browse/KAFKA-13873 Project: Kafka Issue Type: New Feature Reporter: Jim Hughes In order to reduce resources used or modify data pipelines, users may want to pause processing temporarily. Presently, this would require stopping the entire KafkaStreams instance (or instances). This work would add the ability to pause and resume topologies. When the need to pause processing has passed, then users should be able to resume processing.
[GitHub] [kafka] vamossagar12 opened a new pull request, #12121: Kafka 13846: Adding overloaded metricOrElseCreate method
vamossagar12 opened a new pull request, #12121: URL: https://github.com/apache/kafka/pull/12121 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] hachikuji merged pull request #12106: KAFKA-13861: Fix the validateOnly behavior for CreatePartitions requests in KRaft mode
hachikuji merged PR #12106: URL: https://github.com/apache/kafka/pull/12106
[GitHub] [kafka] guozhangwang merged pull request #11874: Fix typos in configuration docs
guozhangwang merged PR #11874: URL: https://github.com/apache/kafka/pull/11874
[GitHub] [kafka] vamossagar12 commented on pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…
vamossagar12 commented on PR #11211: URL: https://github.com/apache/kafka/pull/11211#issuecomment-1117583307 > LGTM! Thanks for the PR and, the patience! @ableegoldman @guozhangwang @mjsax , do you want to have another look for this PR? hi.. whenever you get the chance, could you plz review the PR?
[jira] [Updated] (KAFKA-13870) support both Suppressed untilTimeLimit and maxBytes without using emitEarlyWhenFull()
[ https://issues.apache.org/jira/browse/KAFKA-13870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-13870: Labels: needs-kip (was: ) > support both Suppressed untilTimeLimit and maxBytes without using > emitEarlyWhenFull() > - > > Key: KAFKA-13870 > URL: https://issues.apache.org/jira/browse/KAFKA-13870 > Project: Kafka > Issue Type: New Feature >Reporter: Anil >Priority: Major > Labels: needs-kip > > My use case is to use *untilTimeLimit* with *maxBytes*, but when the > buffer is full, the application breaks. With *emitEarlyWhenFull* the > application does not break, but it sends out the same key record multiple > times in a particular window when the buffer exceeds max bytes. > For example: > Suppressed.untilTimeLimit(Duration.ofMinutes(15), Suppressed.BufferConfig.maxBytes(1).emitEarlyWhenFull()) > > Message flow: (A,1) (A,2) (A,3) -> aggregation result: (A,6). Suppose the > buffer is full here, so (A,6) is sent downstream. Now suppose (A,4) arrives > in the same tumbling window. > > Current behavior: the aggregation continues and eventually (A,10) is > emitted. > > Our application expected (A,4), so the feature request is that a window > should end either when the window time (untilTimeLimit) elapses or when the > buffer (maxBytes) is full; when either of these two conditions is met, a new > window should be created and the data should be emitted.
[jira] [Commented] (KAFKA-13870) support both Suppressed untilTimeLimit and maxBytes without using emitEarlyWhenFull()
[ https://issues.apache.org/jira/browse/KAFKA-13870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17531836#comment-17531836 ] Matthias J. Sax commented on KAFKA-13870: - I think you mix up two concepts: windowing is about "grouping" records according to their timestamp. Thus, the _window-size_ you define via `TimeWindows.ofSizeAndGrace()` (or similar) defines which window a record falls into. Suppression, on the other hand, has nothing to do with the definition of window bounds; it's about when to emit (potentially partial) results for a window. However, if a partial result is emitted, the window is not closed. It seems your request is more about a window definition than about suppress, and you want some kind of "count based" window? It might be possible to add, but only if your requirements are clear. Note that the suppress buffer size has nothing to do with the window definition. The buffer size basically defines how many windows the buffer can hold before it needs to emit a partial result and drop a window from the buffer. Thus, it's not clear to me what semantics you really need. For now, you could still build it manually using the Processor API. > support both Suppressed untilTimeLimit and maxBytes without using > emitEarlyWhenFull() > - > > Key: KAFKA-13870 > URL: https://issues.apache.org/jira/browse/KAFKA-13870 > Project: Kafka > Issue Type: New Feature >Reporter: Anil >Priority: Major > > My use case is to use *untilTimeLimit* with *maxBytes*, but when the > buffer is full, the application breaks. With *emitEarlyWhenFull* the > application does not break, but it sends out the same key record multiple > times in a particular window when the buffer exceeds max bytes. > For example: > Suppressed.untilTimeLimit(Duration.ofMinutes(15), Suppressed.BufferConfig.maxBytes(1).emitEarlyWhenFull()) > > Message flow: (A,1) (A,2) (A,3) -> aggregation result: (A,6). Suppose the > buffer is full here, so (A,6) is sent downstream. Now suppose (A,4) arrives > in the same tumbling window. > > Current behavior: the aggregation continues and eventually (A,10) is > emitted. > > Our application expected (A,4), so the feature request is that a window > should end either when the window time (untilTimeLimit) elapses or when the > buffer (maxBytes) is full; when either of these two conditions is met, a new > window should be created and the data should be emitted.
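The difference between the two semantics discussed above can be worked through with the numbers from the report. The sketch below is a plain-Java simulation for one key inside one tumbling window; EmitEarlyDemo and its methods are hypothetical and do not use the Kafka Streams API. The first method models an early, partial emission that leaves the window open; the second models the "count based" window the comment guesses the reporter wants.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the (A,1) (A,2) (A,3) (A,4) example for a single key.
final class EmitEarlyDemo {
    // Early emission: a partial sum is sent when the buffer fills, but the window's
    // running sum is kept, so the final result includes all records.
    public static List<Integer> emitEarly(int[] values, int emitEarlyAfter) {
        List<Integer> emitted = new ArrayList<>();
        int sum = 0;
        for (int i = 0; i < values.length; i++) {
            sum += values[i];
            if (i + 1 == emitEarlyAfter) {
                emitted.add(sum); // partial result, window stays open
            }
        }
        emitted.add(sum); // window closes, final result
        return emitted;
    }

    // Count-based window: the sum is emitted and reset after every windowSize records.
    public static List<Integer> countWindow(int[] values, int windowSize) {
        List<Integer> emitted = new ArrayList<>();
        int sum = 0;
        int count = 0;
        for (int value : values) {
            sum += value;
            count++;
            if (count == windowSize) {
                emitted.add(sum);
                sum = 0;
                count = 0;
            }
        }
        if (count > 0) {
            emitted.add(sum); // last, partially filled window
        }
        return emitted;
    }

    public static void main(String[] args) {
        int[] values = {1, 2, 3, 4};
        System.out.println(emitEarly(values, 3));   // [6, 10]
        System.out.println(countWindow(values, 3)); // [6, 4]
    }
}
```

The first output matches the behaviour reported as current, (A,6) followed by (A,10); the second matches the expected (A,6) followed by (A,4), which requires a different window definition rather than a different suppression setting.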
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12104: KAFKA-13746: Attempt to fix flaky test by waiting to fetch 2 topics f…
vamossagar12 commented on code in PR #12104: URL: https://github.com/apache/kafka/pull/12104#discussion_r865051847 ## core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala: ## @@ -733,12 +733,18 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi killBroker(0) val aliveServers = brokers.filterNot(_.config.brokerId == 0) TestUtils.waitForPartitionMetadata(aliveServers, underMinIsrTopic, 0) - val output = TestUtils.grabConsoleOutput( -topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions" + var output = "" + TestUtils.waitUntilTrue( +() => { + output = TestUtils.grabConsoleOutput( +topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions" Review Comment: yeah.. i added it back @dengziming
[jira] [Updated] (KAFKA-13872) Partitions are truncated when leader is replaced
[ https://issues.apache.org/jira/browse/KAFKA-13872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Francois Visconte updated KAFKA-13872: -- Description: Sample setup: * a topic with one partition and RF=3 * a producer using acks=1 * min.insync.replicas set to 1 * 3 brokers 1,2,3 * Preferred leader of the partition is brokerId 0 Steps to reproduce the issue * Producer keeps producing to the partition, leader is brokerId=0 * At some point, replicas 1 and 2 fall behind and are removed from the ISR * The leader broker 0 has a hardware failure * Partition transitions to offline * This leader is replaced with a new broker with an empty disk and the same broker id 0 * Partition transitions from offline to online with leader 0, ISR = 0 * Followers see the leader offset is 0 and decide to truncate their partitions to 0, ISR=0,1,2 * At this point all the topic data has been removed from all replicas and partition size drops to 0 on all replicas Attached some of the relevant logs. I can provide more logs if necessary was: Sample setup: * a topic with one partition and RF=3 * a producer using acks=1 * min.insync.replicas set to 1 * 3 brokers 1,2,3 * Preferred leader of the partition is brokerId 0 Steps to reproduce the issue * Producer keeps producing to the partition, leader is brokerId=0 * At some point, replicas 1 and 2 fall behind and are removed from the ISR * The leader broker 0 has a hardware failure * Partition transitions to offline * This leader is replaced with a new broker with an empty disk and the same broker id 0 * Partition transitions from offline to online with leader 0, ISR = 0 * Followers see the leader offset is 0 and decide to truncate their partitions to 0, ISR=0,1,2 Attached some of the relevant logs. I can provide more logs if necessary > Partitions are truncated when leader is replaced > > > Key: KAFKA-13872 > URL: https://issues.apache.org/jira/browse/KAFKA-13872 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.7.2 >Reporter: Francois Visconte >Priority: Major > Attachments: extract-2022-05-04T15_50_34.110Z.csv > > > Sample setup: > * a topic with one partition and RF=3 > * a producer using acks=1 > * min.insync.replicas set to 1 > * 3 brokers 1,2,3 > * Preferred leader of the partition is brokerId 0 > > Steps to reproduce the issue > * Producer keeps producing to the partition, leader is brokerId=0 > * At some point, replicas 1 and 2 fall behind and are removed from the ISR > * The leader broker 0 has a hardware failure > * Partition transitions to offline > * This leader is replaced with a new broker with an empty disk and the same > broker id 0 > * Partition transitions from offline to online with leader 0, ISR = 0 > * Followers see the leader offset is 0 and decide to truncate their > partitions to 0, ISR=0,1,2 > * At this point all the topic data has been removed from all replicas and > partition size drops to 0 on all replicas > Attached some of the relevant logs. I can provide more logs if necessary
[jira] [Created] (KAFKA-13872) Partitions are truncated when leader is replaced
Francois Visconte created KAFKA-13872: - Summary: Partitions are truncated when leader is replaced Key: KAFKA-13872 URL: https://issues.apache.org/jira/browse/KAFKA-13872 Project: Kafka Issue Type: Bug Affects Versions: 2.7.2 Reporter: Francois Visconte Attachments: extract-2022-05-04T15_50_34.110Z.csv Sample setup: * a topic with one partition and RF=3 * a producer using acks=1 * min.insync.replicas set to 1 * 3 brokers 1,2,3 * Preferred leader of the partition is brokerId 0 Steps to reproduce the issue * Producer keeps producing to the partition, leader is brokerId=0 * At some point, replicas 1 and 2 fall behind and are removed from the ISR * The leader broker 0 has a hardware failure * Partition transitions to offline * This leader is replaced with a new broker with an empty disk and the same broker id 0 * Partition transitions from offline to online with leader 0, ISR = 0 * Followers see the leader offset is 0 and decide to truncate their partitions to 0, ISR=0,1,2 Attached some of the relevant logs. I can provide more logs if necessary
[jira] [Created] (KAFKA-13871) The documentation for the configuration item QUORUM_FETCH_TIMEOUT of the RaftConfig class is incorrect
yingquan he created KAFKA-13871: --- Summary: The documentation for the configuration item QUORUM_FETCH_TIMEOUT of the RaftConfig class is incorrect Key: KAFKA-13871 URL: https://issues.apache.org/jira/browse/KAFKA-13871 Project: Kafka Issue Type: Improvement Components: kraft Reporter: yingquan he Assignee: yingquan he The grammar of the doc string QUORUM_FETCH_TIMEOUT_MS_DOC is incorrect: `a election` should be changed to `an election`.
{code:java}
public static final String QUORUM_FETCH_TIMEOUT_MS_DOC = "Maximum time without a successful fetch from " +
    "the current leader before becoming a candidate and triggering a election for voters; Maximum time without " +
    "receiving fetch from a majority of the quorum before asking around to see if there's a new epoch for leader";
{code}