[jira] [Updated] (KAFKA-13875) update docs to include topicId for kafka-topics.sh --describe output

2022-05-04 Thread Luke Chen (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-13875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen updated KAFKA-13875:
--
Description: 
The topic describe output in the quickstart doc here: 
[https://kafka.apache.org/quickstart] should be updated now.
{code:java}
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic:quickstart-events  PartitionCount:1  ReplicationFactor:1  Configs:
    Topic: quickstart-events  Partition: 0  Leader: 0  Replicas: 0  Isr: 0{code}
After the Topic Id implementation, the topic id info is now included in the output. 
Also, the configs are no longer empty. The doc should be updated to avoid 
confusing new users.
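For reference, a hedged sketch of what the updated output could look like on a 3.x broker (the topic id value and the segment.bytes config below are illustrative, not taken from a real run):
{code:java}
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events  TopicId: NPmZHyhbR9y00wMglMH2sg  PartitionCount: 1  ReplicationFactor: 1  Configs: segment.bytes=1073741824
    Topic: quickstart-events  Partition: 0  Leader: 0  Replicas: 0  Isr: 0{code}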

  was:
The topic describe output in the quickstart doc here: 
[https://kafka.apache.org/quickstart] should be updated now.
{code:java}
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic:quickstart-events  PartitionCount:1  ReplicationFactor:1  Configs:
    Topic: quickstart-events  Partition: 0  Leader: 0  Replicas: 0  Isr: 0{code}
After the Topic Id implementation, the topic id info is now included in the output.


> update docs to include topicId for kafka-topics.sh --describe output
> -
>
> Key: KAFKA-13875
> URL: https://issues.apache.org/jira/browse/KAFKA-13875
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 3.2.0
>Reporter: Luke Chen
>Priority: Major
>  Labels: newbie
>
> The topic describe output in the quickstart doc here: 
> [https://kafka.apache.org/quickstart] should be updated now.
> {code:java}
> bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
> Topic:quickstart-events  PartitionCount:1  ReplicationFactor:1  Configs:
>     Topic: quickstart-events  Partition: 0  Leader: 0  Replicas: 0  Isr: 0{code}
> After the Topic Id implementation, the topic id info is now included in the 
> output. Also, the configs are no longer empty. The doc should be updated to 
> avoid confusing new users.





[jira] [Created] (KAFKA-13875) update docs to include topicId for kafka-topics.sh --describe output

2022-05-04 Thread Luke Chen (Jira)
Luke Chen created KAFKA-13875:
-

 Summary: update docs to include topicId for kafka-topics.sh --describe output
 Key: KAFKA-13875
 URL: https://issues.apache.org/jira/browse/KAFKA-13875
 Project: Kafka
  Issue Type: Improvement
  Components: admin
Affects Versions: 3.2.0
Reporter: Luke Chen


The topic describe output in the quickstart doc here: 
[https://kafka.apache.org/quickstart] should be updated now.
{code:java}
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic:quickstart-events  PartitionCount:1  ReplicationFactor:1  Configs:
    Topic: quickstart-events  Partition: 0  Leader: 0  Replicas: 0  Isr: 0{code}
After the Topic Id implementation, the topic id info is now included in the output.





[jira] [Updated] (KAFKA-13875) update docs to include topicId for kafka-topics.sh --describe output

2022-05-04 Thread Luke Chen (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-13875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen updated KAFKA-13875:
--
Labels: newbie  (was: )

> update docs to include topicId for kafka-topics.sh --describe output
> -
>
> Key: KAFKA-13875
> URL: https://issues.apache.org/jira/browse/KAFKA-13875
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 3.2.0
>Reporter: Luke Chen
>Priority: Major
>  Labels: newbie
>
> The topic describe output in the quickstart doc here: 
> [https://kafka.apache.org/quickstart] should be updated now.
> {code:java}
> bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
> Topic:quickstart-events  PartitionCount:1  ReplicationFactor:1  Configs:
>     Topic: quickstart-events  Partition: 0  Leader: 0  Replicas: 0  Isr: 0{code}
> After the Topic Id implementation, the topic id info is now included in the 
> output.





[GitHub] [kafka] dengziming commented on pull request #12112: MINOR: Fix flaky testDescribeUnderReplicatedPartitions

2022-05-04 Thread GitBox


dengziming commented on PR #12112:
URL: https://github.com/apache/kafka/pull/12112#issuecomment-1118110191

   @showuon I did see it fail, but I can't find it anymore; I will close 
this if I don't see it fail again in a few days.
   @divijvaidya I will address your comment if this test really fails again.





[GitHub] [kafka] dengziming commented on a diff in pull request #12104: KAFKA-13746: Attempt to fix flaky test by waiting to fetch 2 topics f…

2022-05-04 Thread GitBox


dengziming commented on code in PR #12104:
URL: https://github.com/apache/kafka/pull/12104#discussion_r865512959


##
core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala:
##
@@ -733,12 +733,18 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
   killBroker(0)
   val aliveServers = brokers.filterNot(_.config.brokerId == 0)
   TestUtils.waitForPartitionMetadata(aliveServers, underMinIsrTopic, 0)
-  val output = TestUtils.grabConsoleOutput(
-    topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions"))))
+  var output = ""
+  TestUtils.waitUntilTrue(
+    () => {
+      output = TestUtils.grabConsoleOutput(
+        topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions"))))

Review Comment:
   Sorry, it seems we won't remove a broker from the ISR if it's the only 
replica, so this change doesn't make sense. To make it work as expected, we 
should change it like this:
   ```
    && broker.metadataCache.getPartitionInfo(offlineTopic, 0).get.leader() == MetadataResponse.NO_LEADER_ID
   ```
   
   If you are interested, you can investigate further why the ISR of 
TopicPartition(offlineTopic, 0) still contains broker 0 after broker 0 is 
killed.
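   For readers unfamiliar with `TestUtils.waitUntilTrue`, here is a minimal Java sketch of the retry-until-timeout pattern it implements (class name, poll interval, and timeout handling are illustrative, not Kafka's actual utility):
   ```java
   import java.util.function.BooleanSupplier;

   public final class WaitUtil {
       // Poll `condition` until it holds or `timeoutMs` elapses; fail with `message` on timeout.
       public static void waitUntilTrue(BooleanSupplier condition, String message, long timeoutMs)
               throws InterruptedException {
           long deadline = System.currentTimeMillis() + timeoutMs;
           while (!condition.getAsBoolean()) {
               if (System.currentTimeMillis() > deadline) {
                   throw new AssertionError(message);
               }
               Thread.sleep(100); // back off briefly between checks
           }
       }
   }
   ```
   Retrying the whole describe-and-check inside such a loop is what makes the test robust against the metadata propagation races discussed above.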






[jira] [Commented] (KAFKA-13872) Partitions are truncated when leader is replaced

2022-05-04 Thread Luke Chen (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-13872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17532002#comment-17532002 ]

Luke Chen commented on KAFKA-13872:
---

[~fvisconte], thanks for reporting the issue. But I think this is the expected 
behavior, isn't it? We always take the log on the partition leader as the 
source of truth. I'd like to know what your expected behavior is in this case. Thanks.

> Partitions are truncated when leader is replaced
> 
>
> Key: KAFKA-13872
> URL: https://issues.apache.org/jira/browse/KAFKA-13872
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.7.2
>Reporter: Francois Visconte
>Priority: Major
> Attachments: extract-2022-05-04T15_50_34.110Z.csv
>
>
> Sample setup:
>  * a topic with one partition and RF=3
>  * a producer using acks=1
>  * min.insync.replicas to 1
>  * 3 brokers 1,2,3
>  * Preferred leader of the partition is brokerId 0
>  
> Steps to reproduce the issue
>  * Producer keeps producing to the partition, leader is brokerId=0
>  * At some point, replicas 1 and 2 are falling behind and removed from the ISR
>  * The leader broker 0 has a hardware failure
>  * Partition transitions to offline
>  * This leader is replaced with a new broker with an empty disk and the same 
> broker id 0
>  * Partition transitions from offline to online with leader 0, ISR = 0
>  * Followers see the leader offset is 0 and decide to truncate their 
> partitions to 0, ISR=0,1,2
>  * At this point all the topic data has been removed from all replicas and 
> partition size drops to 0 on all replicas
> Attached are some of the relevant logs. I can provide more logs if necessary.
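As background to the acks=1 discussion above, a hedged Java sketch of producer settings that guard against this loss pattern (broker address and topic name are illustrative; the topic would also need min.insync.replicas >= 2):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SafeProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // acks=all: the leader acknowledges only after the write reaches the full ISR,
        // so an acknowledged record survives the loss of the leader's disk.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        }
        // With min.insync.replicas=2 on the topic, a lone leader (ISR of size 1)
        // would reject produce requests instead of accepting unreplicated writes.
    }
}
{code}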








[GitHub] [kafka] cmccabe commented on a diff in pull request #11969: KAFKA-13649: Implement early.start.listeners and fix StandardAuthorizer loading [WIP]

2022-05-04 Thread GitBox


cmccabe commented on code in PR #11969:
URL: https://github.com/apache/kafka/pull/11969#discussion_r865499094


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -1864,6 +1780,18 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
       sensor
     }
   }
+
+  /**
+   * Close `channel` and decrement the connection count.
+   */
+  def closeChannel(listenerName: ListenerName, channel: SocketChannel): Unit = {
+    if (channel != null) {
+      debug(s"Closing connection from ${channel.socket.getRemoteSocketAddress}")
+      dec(listenerName, channel.socket.getInetAddress)
+      closeSocket(channel, this)

Review Comment:
   Well, the ConnectionQuotas class is tracking how many connections exist. It 
needs to be informed when a connection is closed. If you want, I can have the 
function take a Logging parameter so that we can log under `SocketServer` (or 
whatever) rather than `SocketServer.ConnectionQuotas`. But this is a DEBUG 
level message anyway... realistically, you don't ever see this in prod, so 
I'm not sure it matters.









[GitHub] [kafka] cmccabe commented on a diff in pull request #11969: KAFKA-13649: Implement early.start.listeners and fix StandardAuthorizer loading [WIP]

2022-05-04 Thread GitBox


cmccabe commented on code in PR #11969:
URL: https://github.com/apache/kafka/pull/11969#discussion_r865496509


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -104,184 +103,141 @@ class SocketServer(val config: KafkaConfig,
 
   private[this] val nextProcessorId: AtomicInteger = new AtomicInteger(0)
   val connectionQuotas = new ConnectionQuotas(config, time, metrics)
-  private var startedProcessingRequests = false
-  private var stoppedProcessingRequests = false
 
-  // Processors are now created by each Acceptor. However to preserve compatibility, we need to number the processors
-  // globally, so we keep the nextProcessorId counter in SocketServer
-  def nextProcessorId(): Int = {
-    nextProcessorId.getAndIncrement()
-  }
+  /**
+   * A future which is completed once all the authorizer futures are complete.
+   */
+  private val allAuthorizerFuturesComplete = new CompletableFuture[Void]
 
   /**
-   * Starts the socket server and creates all the Acceptors and the Processors. The Acceptors
-   * start listening at this stage so that the bound port is known when this method completes
-   * even when ephemeral ports are used. Acceptors and Processors are started if `startProcessingRequests`
-   * is true. If not, acceptors and processors are only started when [[kafka.network.SocketServer#startProcessingRequests()]]
-   * is invoked. Delayed starting of acceptors and processors is used to delay processing client
-   * connections until server is fully initialized, e.g. to ensure that all credentials have been
-   * loaded before authentications are performed. Incoming connections on this server are processed
-   * when processors start up and invoke [[org.apache.kafka.common.network.Selector#poll]].
-   *
-   * @param startProcessingRequests Flag indicating whether `Processor`s must be started.
-   * @param controlPlaneListener    The control plane listener, or None if there is none.
-   * @param dataPlaneListeners      The data plane listeners.
+   * True if the SocketServer is stopped. Must be accessed under the SocketServer lock.
    */
-  def startup(startProcessingRequests: Boolean = true,
-              controlPlaneListener: Option[EndPoint] = config.controlPlaneListener,
-              dataPlaneListeners: Seq[EndPoint] = config.dataPlaneListeners): Unit = {
-    this.synchronized {
-      createControlPlaneAcceptorAndProcessor(controlPlaneListener)
-      createDataPlaneAcceptorsAndProcessors(dataPlaneListeners)
-      if (startProcessingRequests) {
-        this.startProcessingRequests()
-      }
-    }
+  private var stopped = false
 
+  // Socket server metrics
+  newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized {
     val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => a.processors)
-    val controlPlaneProcessorOpt = controlPlaneAcceptorOpt.map(a => a.processors(0))
-    newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized {
-      val ioWaitRatioMetricNames = dataPlaneProcessors.map { p =>
-        metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
-      }
+    val ioWaitRatioMetricNames = dataPlaneProcessors.map { p =>
+      metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
+    }
+    if (dataPlaneProcessors.isEmpty) {
+      1.0
+    } else {
       ioWaitRatioMetricNames.map { metricName =>
         Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0))
       }.sum / dataPlaneProcessors.size
-    })
-    newGauge(s"${ControlPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized {
-      val ioWaitRatioMetricName = controlPlaneProcessorOpt.map { p =>
-        metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
-      }
-      ioWaitRatioMetricName.map { metricName =>
-        Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0))
-      }.getOrElse(Double.NaN)
-    })
-    newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory)
-    newGauge("MemoryPoolUsed", () => memoryPool.size() - memoryPool.availableMemory)
-    newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", () => SocketServer.this.synchronized {
-      val expiredConnectionsKilledCountMetricNames = dataPlaneProcessors.map { p =>
-        metrics.metricName("expired-connections-killed-count", MetricsGroup, p.metricTags)
-      }
-      expiredConnectionsKilledCountMetricNames.map { metricName =>
-        Option(metrics.metric(metricName)).fold(0.0)(m => m.metricValue.asInstanceOf[Double])
-      }.sum
-    })
-    newGauge(s"${ControlPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", () => SocketServer.this.synchronized {
-      val expiredConnectionsKilledCountMetricNames = controlPlaneProcessorOpt.map 

[jira] [Created] (KAFKA-13874) Avoid synchronization in SocketServer metrics

2022-05-04 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-13874:


 Summary: Avoid synchronization in SocketServer metrics
 Key: KAFKA-13874
 URL: https://issues.apache.org/jira/browse/KAFKA-13874
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe


For performance reasons, we should avoid synchronization in SocketServer 
metrics like NetworkProcessorAvgIdlePercent.





[GitHub] [kafka] cmccabe commented on a diff in pull request #11969: KAFKA-13649: Implement early.start.listeners and fix StandardAuthorizer loading [WIP]

2022-05-04 Thread GitBox


cmccabe commented on code in PR #11969:
URL: https://github.com/apache/kafka/pull/11969#discussion_r865495994


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -104,184 +103,141 @@ class SocketServer(val config: KafkaConfig,
 
   private[this] val nextProcessorId: AtomicInteger = new AtomicInteger(0)
   val connectionQuotas = new ConnectionQuotas(config, time, metrics)
-  private var startedProcessingRequests = false
-  private var stoppedProcessingRequests = false
 
-  // Processors are now created by each Acceptor. However to preserve compatibility, we need to number the processors
-  // globally, so we keep the nextProcessorId counter in SocketServer
-  def nextProcessorId(): Int = {
-    nextProcessorId.getAndIncrement()
-  }
+  /**
+   * A future which is completed once all the authorizer futures are complete.
+   */
+  private val allAuthorizerFuturesComplete = new CompletableFuture[Void]
 
   /**
-   * Starts the socket server and creates all the Acceptors and the Processors. The Acceptors
-   * start listening at this stage so that the bound port is known when this method completes
-   * even when ephemeral ports are used. Acceptors and Processors are started if `startProcessingRequests`
-   * is true. If not, acceptors and processors are only started when [[kafka.network.SocketServer#startProcessingRequests()]]
-   * is invoked. Delayed starting of acceptors and processors is used to delay processing client
-   * connections until server is fully initialized, e.g. to ensure that all credentials have been
-   * loaded before authentications are performed. Incoming connections on this server are processed
-   * when processors start up and invoke [[org.apache.kafka.common.network.Selector#poll]].
-   *
-   * @param startProcessingRequests Flag indicating whether `Processor`s must be started.
-   * @param controlPlaneListener    The control plane listener, or None if there is none.
-   * @param dataPlaneListeners      The data plane listeners.
+   * True if the SocketServer is stopped. Must be accessed under the SocketServer lock.
    */
-  def startup(startProcessingRequests: Boolean = true,
-              controlPlaneListener: Option[EndPoint] = config.controlPlaneListener,
-              dataPlaneListeners: Seq[EndPoint] = config.dataPlaneListeners): Unit = {
-    this.synchronized {
-      createControlPlaneAcceptorAndProcessor(controlPlaneListener)
-      createDataPlaneAcceptorsAndProcessors(dataPlaneListeners)
-      if (startProcessingRequests) {
-        this.startProcessingRequests()
-      }
-    }
+  private var stopped = false
 
+  // Socket server metrics
+  newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized {

Review Comment:
   Yeah, it would be nice to avoid the synchronization. Filed KAFKA-13874 for 
this.
   
   Dynamic listeners really make this kind of thing hard, even though very few 
people use them. :(






[GitHub] [kafka] cmccabe commented on a diff in pull request #11969: KAFKA-13649: Implement early.start.listeners and fix StandardAuthorizer loading [WIP]

2022-05-04 Thread GitBox


cmccabe commented on code in PR #11969:
URL: https://github.com/apache/kafka/pull/11969#discussion_r865495566


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -906,12 +906,35 @@ private void appendRaftEvent(String name, Runnable runnable) {
                 if (this != metaLogListener) {
                     log.debug("Ignoring {} raft event from an old registration", name);
                 } else {
-                    runnable.run();
+                    try {
+                        runnable.run();
+                    } finally {
+                        maybeCompleteAuthorizerInitialLoad();
+                    }
                 }
             });
         }
     }
 
+    private void maybeCompleteAuthorizerInitialLoad() {
+        if (!needToCompleteAuthorizerLoad) return;
+        OptionalLong highWatermark = raftClient.highWatermark();
+        if (highWatermark.isPresent()) {
+            if (lastCommittedOffset + 1 >= highWatermark.getAsLong()) {

Review Comment:
   Yeah, I think this is OK for now. If we have issues, we can revisit...






[GitHub] [kafka] hachikuji commented on a diff in pull request #11969: KAFKA-13649: Implement early.start.listeners and fix StandardAuthorizer loading [WIP]

2022-05-04 Thread GitBox


hachikuji commented on code in PR #11969:
URL: https://github.com/apache/kafka/pull/11969#discussion_r865430367


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -1864,6 +1780,18 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
       sensor
     }
   }
+
+  /**
+   * Close `channel` and decrement the connection count.
+   */
+  def closeChannel(listenerName: ListenerName, channel: SocketChannel): Unit = {
+    if (channel != null) {
+      debug(s"Closing connection from ${channel.socket.getRemoteSocketAddress}")
+      dec(listenerName, channel.socket.getInetAddress)
+      closeSocket(channel, this)

Review Comment:
   It's surprising to find this in `ListenerConnectionQuota`, and also that we end 
up with a different logger. Is there any way we can pull it back to where it 
was? For example, maybe we could generalize `closeSocket`:
   
   ```scala
 def closeSocket(
   listenerName: ListenerName,
   channel: SocketChannel,
   connectionQuota: ConnectionQuotas,
   logging: Logging
 )
   ```
   Or maybe we can stick it into a trait so that we can get rid of the 
`logging` parameter.



##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -681,24 +580,27 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
   private val blockedPercentMeter = newMeter(blockedPercentMeterMetricName, "blocked time", TimeUnit.NANOSECONDS)
   private var currentProcessorIndex = 0
   private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]()
+  private var started = false
+  private[network] val startFuture = new CompletableFuture[Void]()
 
-  private[network] case class DelayedCloseSocket(socket: SocketChannel, endThrottleTimeMs: Long) extends Ordered[DelayedCloseSocket] {
-    override def compare(that: DelayedCloseSocket): Int = endThrottleTimeMs compare that.endThrottleTimeMs
-  }
+  val thread = KafkaThread.nonDaemon(
+    s"${threadPrefix()}-kafka-socket-acceptor-${endPoint.listenerName}-${endPoint.securityProtocol}-${endPoint.port}",
+    this)
 
-  private[network] def startProcessors(): Unit = synchronized {
-    if (!processorsStarted.getAndSet(true)) {
-      startProcessors(processors)
+  startFuture.thenRun(() => synchronized {
+    if (!shouldRun.get()) {
+      debug(s"Ignoring start future for ${endPoint.listenerName} since it has already been shut down.")

Review Comment:
   nit: "it" -> "the acceptor"?



##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -906,12 +906,35 @@ private void appendRaftEvent(String name, Runnable runnable) {
                 if (this != metaLogListener) {
                     log.debug("Ignoring {} raft event from an old registration", name);
                 } else {
-                    runnable.run();
+                    try {
+                        runnable.run();
+                    } finally {
+                        maybeCompleteAuthorizerInitialLoad();
+                    }
                 }
             });
         }
     }
 
+    private void maybeCompleteAuthorizerInitialLoad() {
+        if (!needToCompleteAuthorizerLoad) return;
+        OptionalLong highWatermark = raftClient.highWatermark();
+        if (highWatermark.isPresent()) {
+            if (lastCommittedOffset + 1 >= highWatermark.getAsLong()) {

Review Comment:
   I guess the only issue with this is that the high watermark is a moving 
target. It probably works ok since writes to the metadata log should be 
infrequent. Not sure I have any better ideas. Maybe we could refresh the high 
watermark value only once every second or something like that.



##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -104,184 +103,141 @@ class SocketServer(val config: KafkaConfig,
 
   private[this] val nextProcessorId: AtomicInteger = new AtomicInteger(0)
   val connectionQuotas = new ConnectionQuotas(config, time, metrics)
-  private var startedProcessingRequests = false
-  private var stoppedProcessingRequests = false
 
-  // Processors are now created by each Acceptor. However to preserve compatibility, we need to number the processors
-  // globally, so we keep the nextProcessorId counter in SocketServer
-  def nextProcessorId(): Int = {
-    nextProcessorId.getAndIncrement()
-  }
+  /**
+   * A future which is completed once all the authorizer futures are complete.
+   */
+  private val allAuthorizerFuturesComplete = new CompletableFuture[Void]
 
   /**
-   * Starts the socket server and creates all the Acceptors and the Processors. The Acceptors
-   * start listening at this stage so that the bound port is known when this method completes
-   * even when ephemeral ports are used. Acceptors and Processors are started if `startProcessingRequests`
-   * is true. If not, acceptors and processors are only started when 

[jira] [Updated] (KAFKA-13873) Add ability to Pause / Resume KafkaStreams Topologies

2022-05-04 Thread Matthias J. Sax (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-13873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-13873:

Description: 
In order to reduce resources used or modify data pipelines, users may want to 
pause processing temporarily.  Presently, this would require stopping the 
entire KafkaStreams instance (or instances).  

This work would add the ability to pause and resume topologies.  When the need 
to pause processing has passed, then users should be able to resume processing.

KIP-834: 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832]

  was:
In order to reduce resources used or modify data pipelines, users may want to 
pause processing temporarily.  Presently, this would require stopping the 
entire KafkaStreams instance (or instances).  

This work would add the ability to pause and resume topologies.  When the need 
to pause processing has passed, then users should be able to resume processing.


> Add ability to Pause / Resume KafkaStreams Topologies
> -
>
> Key: KAFKA-13873
> URL: https://issues.apache.org/jira/browse/KAFKA-13873
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Jim Hughes
>Assignee: Jim Hughes
>Priority: Major
>  Labels: kip
>
> In order to reduce resources used or modify data pipelines, users may want to 
> pause processing temporarily.  Presently, this would require stopping the 
> entire KafkaStreams instance (or instances).  
> This work would add the ability to pause and resume topologies.  When the 
> need to pause processing has passed, then users should be able to resume 
> processing.
> KIP-834: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832]





[jira] [Updated] (KAFKA-13873) Add ability to Pause / Resume KafkaStreams Topologies

2022-05-04 Thread Matthias J. Sax (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-13873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-13873:

Component/s: streams

> Add ability to Pause / Resume KafkaStreams Topologies
> -
>
> Key: KAFKA-13873
> URL: https://issues.apache.org/jira/browse/KAFKA-13873
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Jim Hughes
>Assignee: Jim Hughes
>Priority: Major
>
> In order to reduce resources used or modify data pipelines, users may want to 
> pause processing temporarily.  Presently, this would require stopping the 
> entire KafkaStreams instance (or instances).  
> This work would add the ability to pause and resume topologies.  When the 
> need to pause processing has passed, then users should be able to resume 
> processing.





[jira] [Updated] (KAFKA-13873) Add ability to Pause / Resume KafkaStreams Topologies

2022-05-04 Thread Matthias J. Sax (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-13873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-13873:

Labels: kip  (was: )

> Add ability to Pause / Resume KafkaStreams Topologies
> -
>
> Key: KAFKA-13873
> URL: https://issues.apache.org/jira/browse/KAFKA-13873
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Jim Hughes
>Assignee: Jim Hughes
>Priority: Major
>  Labels: kip
>
> In order to reduce resources used or modify data pipelines, users may want to 
> pause processing temporarily.  Presently, this would require stopping the 
> entire KafkaStreams instance (or instances).  
> This work would add the ability to pause and resume topologies.  When the 
> need to pause processing has passed, then users should be able to resume 
> processing.





[GitHub] [kafka] artemlivshits commented on pull request #12049: KAFKA-10888: Sticky partition leads to uneven produce msg

2022-05-04 Thread GitBox


artemlivshits commented on PR #12049:
URL: https://github.com/apache/kafka/pull/12049#issuecomment-1117971682

   Hmm, for some reason the update didn't trigger any build & test jobs.





[GitHub] [kafka] artemlivshits commented on a diff in pull request #12049: KAFKA-10888: Sticky partition leads to uneven produce msg

2022-05-04 Thread GitBox


artemlivshits commented on code in PR #12049:
URL: https://github.com/apache/kafka/pull/12049#discussion_r865302205


##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -960,8 +1002,10 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call
                     " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                     " specified in value.serializer", cce);
         }
+
+        // Try to calculate partition, but note that after this call it can be RecordMetadata.UNKNOWN_PARTITION,
+        // which means that the RecordAccumulator would pick a partition based on broker load.

Review Comment:
   Missed, now rephrased.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java:
##
@@ -25,12 +23,15 @@
 
     private final Cluster cluster;
     private final Serializer<K> keySerializer;
-    private final DefaultPartitioner defaultPartitioner;
 
+    @SuppressWarnings("deprecation")
+    private final org.apache.kafka.clients.producer.internals.DefaultPartitioner defaultPartitioner;
+
+    @SuppressWarnings("deprecation")
     public DefaultStreamPartitioner(final Serializer<K> keySerializer, final Cluster cluster) {
         this.cluster = cluster;
         this.keySerializer = keySerializer;
-        this.defaultPartitioner = new DefaultPartitioner();
+        this.defaultPartitioner = new org.apache.kafka.clients.producer.internals.DefaultPartitioner();

Review Comment:
   DefaultPartitioner implements onNewBatch, but DefaultStreamPartitioner 
doesn't seem to ever call it (the DefaultPartitioner is a private object).  
Without onNewBatch, the DefaultPartitioner.partition would return the same 
partition for unkeyed messages.
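   A hedged Java sketch of the behavior described above: without an `onNewBatch` call, the (deprecated) `DefaultPartitioner` keeps returning the same sticky partition for records without keys. The `cluster` fixture is assumed to exist; names are illustrative.
   ```java
   import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
   import org.apache.kafka.common.Cluster;

   public class StickyDemo {
       @SuppressWarnings("deprecation")
       static void demonstrateStickiness(Cluster cluster) {
           DefaultPartitioner partitioner = new DefaultPartitioner();
           // Null key: the partitioner falls back to its sticky partition cache.
           int first = partitioner.partition("t", null, null, "v", "v".getBytes(), cluster);
           int second = partitioner.partition("t", null, null, "v", "v".getBytes(), cluster);
           // first == second here; only onNewBatch() advances the sticky partition.
           partitioner.onNewBatch("t", cluster, first);
       }
   }
   ```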



##
clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java:
##
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+/**
+ * Built-in default partitioner.  Note, that this is just a utility class that is used directly from
+ * RecordAccumulator, it does not implement the Partitioner interface.
+ *
+ * The class keeps track of various bookkeeping information required for adaptive sticky partitioning
+ * (described in detail in KIP-794).  There is one partitioner object per topic.
+ */
+public class BuiltInPartitioner {
+    private final Logger log;
+    private final String topic;
+    private final int stickyBatchSize;
+
+    private volatile PartitionLoadStats partitionLoadStats = null;
+    private final AtomicReference<StickyPartitionInfo> stickyPartitionInfo = new AtomicReference<>();
+
+    // Visible and used for testing only.
+    static volatile public Supplier<Integer> mockRandom = null;
+
+    /**
+     * BuiltInPartitioner constructor.
+     *
+     * @param topic The topic
+     * @param stickyBatchSize How much to produce to partition before switch
+     */
+    public BuiltInPartitioner(LogContext logContext, String topic, int stickyBatchSize) {
+        this.log = logContext.logger(BuiltInPartitioner.class);
+        this.topic = topic;
+        this.stickyBatchSize = stickyBatchSize;
+    }
+
+    /**
+     * Calculate the next partition for the topic based on the partition load stats.
+     */
+    private int nextPartition(Cluster cluster) {
+        int random = mockRandom != null ? mockRandom.get() : Utils.toPositive(ThreadLocalRandom.current().nextInt());
+
+        // Cache volatile variable in local variable.
+        PartitionLoadStats partitionLoadStats = this.partitionLoadStats;
+
+        if (partitionLoadStats == null) {
+            // We don't have stats to do 

[GitHub] [kafka] hachikuji commented on a diff in pull request #12108: KAFKA-13862: Support Append/Subtract multiple config values in KRaft mode

2022-05-04 Thread GitBox


hachikuji commented on code in PR #12108:
URL: https://github.com/apache/kafka/pull/12108#discussion_r865377321


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -215,13 +215,17 @@ private void incrementalAlterConfigResource(ConfigResource configResource,
         }
         List<String> newValueParts = getParts(newValue, key, configResource);
         if (opType == APPEND) {
-            if (!newValueParts.contains(opValue)) {
-                newValueParts.add(opValue);
+            for (String value: opValue.split(",")) {
+                if (!newValueParts.contains(value)) {

Review Comment:
   Do we have an integration test which covers the case when the appended value 
is already present? It kind of looks like the zk path doesn't handle that.
   
   By the way, I found the naming a little confusing here. I think 
`newValueParts` should be `currentValueParts` or something like that.



##
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##
@@ -1751,13 +1777,32 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       new AlterConfigOp(new ConfigEntry(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + LogConfig.Delete), AlterConfigOp.OpType.SUBTRACT)
     ).asJavaCollection
 
-    alterResult = client.incrementalAlterConfigs(Map(
+    alterConfigs = Map(
       topic1Resource -> topic1AlterConfigs,
       topic2Resource -> topic2AlterConfigs
-    ).asJava)
+    )
+    alterResult = client.incrementalAlterConfigs(alterConfigs.asJava)
     assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet)
     alterResult.all.get
 
+    if (isKRaftTest()) {
+if (isKRaftTest()) {

Review Comment:
   I agree having some utilities would be nice. Rather than having something 
really specific, maybe we just need a utility which waits until the brokers 
have caught up to the controller end offset?



##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -215,13 +215,17 @@ private void incrementalAlterConfigResource(ConfigResource configResource,
         }
         List<String> newValueParts = getParts(newValue, key, configResource);
         if (opType == APPEND) {
-            if (!newValueParts.contains(opValue)) {
-                newValueParts.add(opValue);
+            for (String value: opValue.split(",")) {

Review Comment:
   nit: conventionally we put a space before the colon
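   For context, a hedged client-side sketch of the operation under test: appending multiple comma-separated values to a topic config in a single APPEND op (broker address and topic name are illustrative):
   ```java
   import java.util.Collection;
   import java.util.Collections;
   import java.util.Map;
   import java.util.Properties;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.clients.admin.AlterConfigOp;
   import org.apache.kafka.clients.admin.ConfigEntry;
   import org.apache.kafka.common.config.ConfigResource;

   public class AppendConfigExample {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           try (Admin admin = Admin.create(props)) {
               ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "topic1");
               // Both values in one op; with this patch the controller splits on ","
               // and skips values that are already present.
               AlterConfigOp appendBoth = new AlterConfigOp(
                   new ConfigEntry("cleanup.policy", "compact,delete"),
                   AlterConfigOp.OpType.APPEND);
               Map<ConfigResource, Collection<AlterConfigOp>> configs =
                   Collections.singletonMap(topic, Collections.singletonList(appendBoth));
               admin.incrementalAlterConfigs(configs).all().get();
           }
       }
   }
   ```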






[GitHub] [kafka] ahuang98 opened a new pull request, #12123: KAFKA-13830 Introduce metadata.version for KRaft

2022-05-04 Thread GitBox


ahuang98 opened a new pull request, #12123:
URL: https://github.com/apache/kafka/pull/12123

   From https://github.com/apache/kafka/pull/12050:
   
   > This patch includes a new metadata.version which is planned to replace IBP 
in KRaft clusters as specified in KIP-778. The kafka-storage tool now allows a 
user to specify a specific metadata.version to bootstrap into the cluster, 
otherwise the latest version is used.
   Upon the first leader election of the KRaft quorum, this initial 
metadata.version is written into the metadata log. When writing snapshots, a 
FeatureLevelRecord for metadata.version will be written out ahead of other 
records so we can decode things at the correct version level.
   This also includes additional validation in the controller when setting 
feature levels. It will now check that a given metadata.version is supportable 
by the quorum, not just the brokers.





[jira] [Resolved] (KAFKA-13815) Avoid reinitialization for a replica that is being deleted

2022-05-04 Thread Jun Rao (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-13815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao resolved KAFKA-13815.
-
Fix Version/s: 3.3.0
   Resolution: Fixed

Merged the PR to trunk.

> Avoid reinitialization for a replica that is being deleted
> --
>
> Key: KAFKA-13815
> URL: https://issues.apache.org/jira/browse/KAFKA-13815
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Major
> Fix For: 3.3.0
>
>
> https://issues.apache.org/jira/browse/KAFKA-10002
> identified that deletion of replicas can be slow when a StopReplica request 
> is being
> processed, and has implemented a change to improve the efficiency.
> We found that the efficiency can be further improved by avoiding the 
> reinitialization of the
> leader epoch cache and partition metadata for a replica that needs to be 
> deleted.





[GitHub] [kafka] junrao merged pull request #12029: KAFKA-13815: Avoid reinitialization for a replica that is being deleted

2022-05-04 Thread GitBox


junrao merged PR #12029:
URL: https://github.com/apache/kafka/pull/12029





[GitHub] [kafka] akhileshchg commented on a diff in pull request #12108: KAFKA-13862: Support Append/Subtract multiple config values in KRaft mode

2022-05-04 Thread GitBox


akhileshchg commented on code in PR #12108:
URL: https://github.com/apache/kafka/pull/12108#discussion_r865146433


##
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##
@@ -1751,13 +1777,32 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       new AlterConfigOp(new ConfigEntry(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + LogConfig.Delete), AlterConfigOp.OpType.SUBTRACT)
     ).asJavaCollection
 
-    alterResult = client.incrementalAlterConfigs(Map(
+    alterConfigs = Map(
      topic1Resource -> topic1AlterConfigs,
      topic2Resource -> topic2AlterConfigs
-    ).asJava)
+    )
+    alterResult = client.incrementalAlterConfigs(alterConfigs.asJava)
     assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet)
     alterResult.all.get
 
+    if (isKRaftTest()) {

Review Comment:
   I think this can be a util function:
   ```
   def waitForMetadataPropagation(brokers: Seq[KafkaBroker], condition: KRaftMetadataCache => Boolean): Unit = {
     assertTrue(isKRaftTest())
     TestUtils.waitUntilTrue(() => {
       brokers.forall { broker =>
         condition(broker.metadataCache.asInstanceOf[KRaftMetadataCache])
       }
     }, "Metadata is not propagated to all brokers")
   }
   ```
   
   and we can call it as:
   ```
   TestUtils.waitForMetadataPropagation(brokers, metadataCache => {
     // validation logic.
   })
   ```






[GitHub] [kafka] Gerrrr opened a new pull request, #12122: WIP: Upgrade tests for KAFKA-13769

2022-05-04 Thread GitBox


Gerrrr opened a new pull request, #12122:
URL: https://github.com/apache/kafka/pull/12122

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-13863) Prevent null config value when create topic in KRaft mode

2022-05-04 Thread Colin McCabe (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-13863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17531880#comment-17531880 ]

Colin McCabe commented on KAFKA-13863:
--

We already validate that topic configurations can't be null in 
`ControllerConfigurationValidator`.

{code}
  override def validate(
    resource: ConfigResource,
    config: util.Map[String, String]
  ): Unit = {
    resource.`type`() match {
      case TOPIC =>
        validateTopicName(resource.name())
        val properties = new Properties()
        val nullTopicConfigs = new mutable.ArrayBuffer[String]()
        config.entrySet().forEach(e => {
          if (e.getValue() == null) {
            nullTopicConfigs += e.getKey()
          } else {
            properties.setProperty(e.getKey(), e.getValue())
          }
        })
        if (nullTopicConfigs.nonEmpty) {
          throw new InvalidRequestException("Null value not supported for topic configs : " +
            nullTopicConfigs.mkString(","))
        }
{code}

I don't mind adding extra test coverage but there should be no need to change 
`ReplicationControlManager`, as far as I can see.
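A hedged sketch of the client-side call this validation guards against; the topic name is illustrative, and the null config value is expected to be rejected with an InvalidRequestException:

{code}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class NullConfigExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            Map<String, String> configs = new HashMap<>();
            configs.put("cleanup.policy", null); // null value: should be rejected by the controller
            NewTopic topic = new NewTopic("my-topic", 1, (short) 1).configs(configs);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}
{code}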

> Prevent null config value when create topic in KRaft mode
> -
>
> Key: KAFKA-13863
> URL: https://issues.apache.org/jira/browse/KAFKA-13863
> Project: Kafka
>  Issue Type: Bug
>Reporter: dengziming
>Priority: Major
>






[GitHub] [kafka] cmccabe commented on pull request #12109: KAFKA-13863: Prevent null config value when create topic in KRaft mode

2022-05-04 Thread GitBox


cmccabe commented on PR #12109:
URL: https://github.com/apache/kafka/pull/12109#issuecomment-1117657256

   We already validate that topic configurations can't be null in 
`ControllerConfigurationValidator`.
   
   ```
  override def validate(
    resource: ConfigResource,
    config: util.Map[String, String]
  ): Unit = {
    resource.`type`() match {
      case TOPIC =>
        validateTopicName(resource.name())
        val properties = new Properties()
        val nullTopicConfigs = new mutable.ArrayBuffer[String]()
        config.entrySet().forEach(e => {
          if (e.getValue() == null) {
            nullTopicConfigs += e.getKey()
          } else {
            properties.setProperty(e.getKey(), e.getValue())
          }
        })
        if (nullTopicConfigs.nonEmpty) {
          throw new InvalidRequestException("Null value not supported for topic configs : " +
            nullTopicConfigs.mkString(","))
        }
   ```
   
   I don't mind adding extra test coverage but there should be no need to 
change `ReplicationControlManager`, as far as I can see.





[jira] [Updated] (KAFKA-13861) validateOnly request field does not work for CreatePartition requests in Kraft mode.

2022-05-04 Thread Jason Gustafson (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-13861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson updated KAFKA-13861:

Fix Version/s: 3.3.0

> validateOnly request field does not work for CreatePartition requests in 
> Kraft mode.
> 
>
> Key: KAFKA-13861
> URL: https://issues.apache.org/jira/browse/KAFKA-13861
> Project: Kafka
>  Issue Type: Bug
>Reporter: Akhilesh Chaganti
>Assignee: Akhilesh Chaganti
>Priority: Major
> Fix For: 3.3.0
>
>
> `ControllerApis` ignores the validateOnly field and the `QuorumController` 
> does not have any logic to handle the `validateOnly` requests for 
> `CreatePartitions`.





[jira] [Resolved] (KAFKA-13861) validateOnly request field does not work for CreatePartition requests in Kraft mode.

2022-05-04 Thread Akhilesh Chaganti (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-13861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Akhilesh Chaganti resolved KAFKA-13861.
---
Resolution: Fixed

> validateOnly request field does not work for CreatePartition requests in 
> Kraft mode.
> 
>
> Key: KAFKA-13861
> URL: https://issues.apache.org/jira/browse/KAFKA-13861
> Project: Kafka
>  Issue Type: Bug
>Reporter: Akhilesh Chaganti
>Assignee: Akhilesh Chaganti
>Priority: Major
>
> `ControllerApis` ignores the validateOnly field and the `QuorumController` 
> does not have any logic to handle the `validateOnly` requests for 
> `CreatePartitions`.





[GitHub] [kafka] guozhangwang commented on pull request #12041: MINOR: ignore unused configuration when ConsumerCoordinator is not constructed

2022-05-04 Thread GitBox


guozhangwang commented on PR #12041:
URL: https://github.com/apache/kafka/pull/12041#issuecomment-1117635812

   Thanks @C0urante for your thoughts. I'd like to clarify one thing: today 
users can pass in both defined and unknown config values, where the latter may 
be used in some plugin modules (e.g. Kafka Streams's partition assignor). Since 
`config.logUnused()` is called at the end of the client constructor, at that 
time the latter category of configs may not have been retrieved yet, and that 
does not mean they will never be retrieved later after the constructor. So the 
logging message "... were supplied but are not used yet." is reasonable, as by 
that time we do not know whether they will ever be used, and we cannot just 
call `ignore` on all these configs in order to not log them.
   
   Now for the former case, generally we expect that by the time 
`config.logUnused()` is called, all defined configs should have been retrieved. 
If the client has not retrieved them yet, AND users have specified values for 
those configs, it's debatable whether we should let users know as a reminder 
that they can consider removing those overrides; whereas for those configs 
which are not overridden by users, we would not bother to let them know at all.
   
   If we want to do that, I'd suggest we do it universally: i.e. for all cases, 
including the previous `ignored` cases like `KEY_DESERIALIZER_CLASS_CONFIG`. 
Maybe you can send out a discussion email in the community to ask for 
consensus?





[jira] [Assigned] (KAFKA-13873) Add ability to Pause / Resume KafkaStreams Topologies

2022-05-04 Thread Jim Hughes (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-13873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jim Hughes reassigned KAFKA-13873:
--

Assignee: Jim Hughes

> Add ability to Pause / Resume KafkaStreams Topologies
> -
>
> Key: KAFKA-13873
> URL: https://issues.apache.org/jira/browse/KAFKA-13873
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jim Hughes
>Assignee: Jim Hughes
>Priority: Major
>
> In order to reduce resources used or modify data pipelines, users may want to 
> pause processing temporarily.  Presently, this would require stopping the 
> entire KafkaStreams instance (or instances).  
> This work would add the ability to pause and resume topologies.  When the 
> need to pause processing has passed, then users should be able to resume 
> processing.





[jira] [Created] (KAFKA-13873) Add ability to Pause / Resume KafkaStreams Topologies

2022-05-04 Thread Jim Hughes (Jira)
Jim Hughes created KAFKA-13873:
--

 Summary: Add ability to Pause / Resume KafkaStreams Topologies
 Key: KAFKA-13873
 URL: https://issues.apache.org/jira/browse/KAFKA-13873
 Project: Kafka
  Issue Type: New Feature
Reporter: Jim Hughes


In order to reduce resources used or modify data pipelines, users may want to 
pause processing temporarily.  Presently, this would require stopping the 
entire KafkaStreams instance (or instances).  

This work would add the ability to pause and resume topologies.  When the need 
to pause processing has passed, then users should be able to resume processing.





[GitHub] [kafka] vamossagar12 opened a new pull request, #12121: Kafka 13846: Adding overloaded metricOrElseCreate method

2022-05-04 Thread GitBox


vamossagar12 opened a new pull request, #12121:
URL: https://github.com/apache/kafka/pull/12121

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] hachikuji merged pull request #12106: KAFKA-13861: Fix the validateOnly behavior for CreatePartitions requests in KRaft mode

2022-05-04 Thread GitBox


hachikuji merged PR #12106:
URL: https://github.com/apache/kafka/pull/12106





[GitHub] [kafka] guozhangwang merged pull request #11874: Fix typos in configuration docs

2022-05-04 Thread GitBox


guozhangwang merged PR #11874:
URL: https://github.com/apache/kafka/pull/11874





[GitHub] [kafka] vamossagar12 commented on pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…

2022-05-04 Thread GitBox


vamossagar12 commented on PR #11211:
URL: https://github.com/apache/kafka/pull/11211#issuecomment-1117583307

   > LGTM! Thanks for the PR and the patience! @ableegoldman @guozhangwang 
@mjsax, do you want to have another look at this PR?
   
   Hi, whenever you get the chance, could you please review the PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-13870) support both Suppressed untilTimeLimit and maxBytes without using emitEarlyWhenFull()

2022-05-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13870:

Labels: needs-kip  (was: )

> support both Suppressed untilTimeLimit and maxBytes without using 
> emitEarlyWhenFull()
> -
>
> Key: KAFKA-13870
> URL: https://issues.apache.org/jira/browse/KAFKA-13870
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Anil
>Priority: Major
>  Labels: needs-kip
>
> My use case is to use *untilTimeLimit* with *maxBytes*. When the buffer is
> full, the application breaks; with *{{emitEarlyWhenFull}}* the application
> does not break, but it sends out the same key's record multiple times within
> a particular window when the buffer exceeds max bytes.
> For example:
> *Suppressed.untilTimeLimit(Duration.ofMinutes(15), Suppressed.BufferConfig.maxBytes(1).emitEarlyWhenFull())*
>
> Message flow: (A,1) (A,2) (A,3) -> aggregation result: (A,6). Suppose the
> buffer is full here, so (A,6) is sent downstream. Now suppose (A,4) arrives
> in the same tumbling window.
>
> Current behavior: the aggregation continues and eventually *(A,10)* is emitted.
>
> But our application expected *(A,4)*. The feature request is that a window
> should close either when the window time (untilTimeLimit) elapses or when
> the buffer (maxBytes) fills; when either condition is met, a new window
> should be created and the buffered data emitted.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13870) support both Suppressed untilTimeLimit and maxBytes without using emitEarlyWhenFull()

2022-05-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17531836#comment-17531836
 ] 

Matthias J. Sax commented on KAFKA-13870:
-

I think you are mixing up two concepts: windowing is about "grouping" records 
according to their timestamp. Thus, the _window size_ you define via 
`TimeWindows.ofSizeAndGrace()` (or similar) determines which window a 
record falls into.

Suppression, on the other hand, has nothing to do with the definition of window 
bounds; it is about when to emit (potentially partial) results for a window. 
However, if a partial result is emitted, the window is not closed.

It seems your request is more about the window definition than about suppress, 
and you want some kind of "count-based" window? It might be possible to add, 
but only if your requirements are clear. Note that the suppress buffer size has 
nothing to do with the window definition: the buffer size basically defines how 
many windows the buffer can hold before it needs to emit a partial result and 
drop a window from the buffer.

Thus, it's not clear to me what semantics you really need.

For now, you could still build it manually using the Processor API.
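
A minimal Kafka Streams sketch of this distinction, assuming String keys, Long values, default serdes, and placeholder topic names:
{code:java}
import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, Long> input = builder.stream("events");

// The window definition decides which window a record falls into...
input.groupByKey()
     .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(15), Duration.ZERO))
     .count()
     // ...while suppress only decides when results are emitted: emitEarlyWhenFull()
     // sends a partial result when the buffer fills, but the window stays open
     // until its time limit passes, so later records still update the same window.
     .suppress(Suppressed.untilTimeLimit(Duration.ofMinutes(15),
             Suppressed.BufferConfig.maxBytes(10_000L).emitEarlyWhenFull()))
     .toStream()
     .to("events-aggregated");
{code}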

> support both Suppressed untilTimeLimit and maxBytes without using 
> emitEarlyWhenFull()
> -
>
> Key: KAFKA-13870
> URL: https://issues.apache.org/jira/browse/KAFKA-13870
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Anil
>Priority: Major
>
> My use case is to use  ** *untilTimeLimit* with *maxBytes,* but when the 
> buffer is full, the application is breaking, but with using 
> *{{emitEarlyWhenFull}}* {{{}application is not breaking but{}}}{*}{{}}{*} it 
> sends out the same key record multiple times in a particular window when the 
> buffer exceeds max bytes 
> for eg:-
> *Suppressed.untilTimeLimit(Duration.ofMinutes(15),Suppressed.BufferConfig.maxBytes(1).emitEarlyWhenFull())*
>  
> messages flow : (A,1) (A,2) (A,3) -> aggregation result : (A,6) . suppose 
> here, the buffer is full, (A,6) will be sent downstream. Let's suppose (A,4) 
> comes now in the same tumbling window.
>  
> current response:- the aggregation will continue and eventually *(A,10)* will 
> be emitted
>  
> but our application expected *(A,4),*  so the request for the feature is that 
> window should be happening with window time(untilTimeLimit) or 
> Buffer(maxByte) should full, in either of these two conditions met, a new 
> window should be created and data should be emitted 
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12104: KAFKA-13746: Attempt to fix flaky test by waiting to fetch 2 topics f…

2022-05-04 Thread GitBox


vamossagar12 commented on code in PR #12104:
URL: https://github.com/apache/kafka/pull/12104#discussion_r865051847


##
core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala:
##
@@ -733,12 +733,18 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
   killBroker(0)
   val aliveServers = brokers.filterNot(_.config.brokerId == 0)
   TestUtils.waitForPartitionMetadata(aliveServers, underMinIsrTopic, 0)
-  val output = TestUtils.grabConsoleOutput(
-    topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions"
+  var output = ""
+  TestUtils.waitUntilTrue(
+    () => {
+      output = TestUtils.grabConsoleOutput(
+        topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions"

Review Comment:
   Yeah, I added it back, @dengziming.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-13872) Partitions are truncated when leader is replaced

2022-05-04 Thread Francois Visconte (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Francois Visconte updated KAFKA-13872:
--
Description: 
Sample setup:
 * a topic with one partition and RF=3
 * a producer using acks=1
 * min.insync.replicas set to 1
 * 3 brokers (0, 1, 2)
 * Preferred leader of the partition is brokerId 0

Steps to reproduce the issue:
 * The producer keeps producing to the partition; the leader is brokerId=0
 * At some point, replicas 1 and 2 fall behind and are removed from the ISR
 * The leader broker 0 has a hardware failure
 * The partition transitions to offline
 * The leader is replaced with a new broker with an empty disk and the same broker id 0
 * The partition transitions from offline to online with leader 0, ISR = 0
 * Followers see that the leader offset is 0 and decide to truncate their partitions to 0, ISR=0,1,2
 * At this point, all the topic data has been removed from all replicas and the partition size drops to 0 on all replicas

Some of the relevant logs are attached; I can provide more logs if necessary.

  was:
Sample setup:
 * a topic with one partition and RF=3
 * a producer using acks=1
 * min.insync.replicas set to 1
 * 3 brokers (0, 1, 2)
 * Preferred leader of the partition is brokerId 0

Steps to reproduce the issue:
 * The producer keeps producing to the partition; the leader is brokerId=0
 * At some point, replicas 1 and 2 fall behind and are removed from the ISR
 * The leader broker 0 has a hardware failure
 * The partition transitions to offline
 * The leader is replaced with a new broker with an empty disk and the same broker id 0
 * The partition transitions from offline to online with leader 0, ISR = 0
 * Followers see that the leader offset is 0 and decide to truncate their partitions to 0, ISR=0,1,2

Some of the relevant logs are attached; I can provide more logs if necessary.


> Partitions are truncated when leader is replaced
> 
>
> Key: KAFKA-13872
> URL: https://issues.apache.org/jira/browse/KAFKA-13872
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.7.2
>Reporter: Francois Visconte
>Priority: Major
> Attachments: extract-2022-05-04T15_50_34.110Z.csv
>
>
> Sample setup:
>  * a topic with one partition and RF=3
>  * a producer using acks=1
>  * min.insync.replicas set to 1
>  * 3 brokers (0, 1, 2)
>  * Preferred leader of the partition is brokerId 0
>
> Steps to reproduce the issue:
>  * The producer keeps producing to the partition; the leader is brokerId=0
>  * At some point, replicas 1 and 2 fall behind and are removed from the ISR
>  * The leader broker 0 has a hardware failure
>  * The partition transitions to offline
>  * The leader is replaced with a new broker with an empty disk and the same broker id 0
>  * The partition transitions from offline to online with leader 0, ISR = 0
>  * Followers see that the leader offset is 0 and decide to truncate their partitions to 0, ISR=0,1,2
>  * At this point, all the topic data has been removed from all replicas and the partition size drops to 0 on all replicas
> Some of the relevant logs are attached; I can provide more logs if necessary.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-13872) Partitions are truncated when leader is replaced

2022-05-04 Thread Francois Visconte (Jira)
Francois Visconte created KAFKA-13872:
-

 Summary: Partitions are truncated when leader is replaced
 Key: KAFKA-13872
 URL: https://issues.apache.org/jira/browse/KAFKA-13872
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.7.2
Reporter: Francois Visconte
 Attachments: extract-2022-05-04T15_50_34.110Z.csv

Sample setup:
 * a topic with one partition and RF=3
 * a producer using acks=1
 * min.insync.replicas set to 1
 * 3 brokers (0, 1, 2)
 * Preferred leader of the partition is brokerId 0

Steps to reproduce the issue:
 * The producer keeps producing to the partition; the leader is brokerId=0
 * At some point, replicas 1 and 2 fall behind and are removed from the ISR
 * The leader broker 0 has a hardware failure
 * The partition transitions to offline
 * The leader is replaced with a new broker with an empty disk and the same broker id 0
 * The partition transitions from offline to online with leader 0, ISR = 0
 * Followers see that the leader offset is 0 and decide to truncate their partitions to 0, ISR=0,1,2

Some of the relevant logs are attached; I can provide more logs if necessary.
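
A sketch of the producer side of this setup (broker address and topic name assumed). With acks=1, a record is acknowledged once the leader alone has written it, which is why replacing the failed leader with an empty disk can lose acknowledged data:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// acks=1: only the leader must persist the write before acknowledging, so
// records not yet replicated to followers are lost along with the leader's disk.
props.put(ProducerConfig.ACKS_CONFIG, "1");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<>("one-partition-topic", "key", "value")); // assumed topic
}
{code}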



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-13871) The documentation for the configuration item QUORUM_FETCH_TIMEOUT of the RaftConfig class is incorrect

2022-05-04 Thread yingquan he (Jira)
yingquan he created KAFKA-13871:
---

 Summary: The documentation for the configuration item 
QUORUM_FETCH_TIMEOUT of the RaftConfig class is incorrect
 Key: KAFKA-13871
 URL: https://issues.apache.org/jira/browse/KAFKA-13871
 Project: Kafka
  Issue Type: Improvement
  Components: kraft
Reporter: yingquan he
Assignee: yingquan he


The wording of the field QUORUM_FETCH_TIMEOUT_MS_DOC is grammatically 
incorrect: `a election` should be changed to `an election`.

 
{code:java}
public static final String QUORUM_FETCH_TIMEOUT_MS_DOC = "Maximum time without a successful fetch from " +
    "the current leader before becoming a candidate and triggering a election for voters; Maximum time without " +
    "receiving fetch from a majority of the quorum before asking around to see if there's a new epoch for leader"; {code}
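
For reference, a sketch of the constant with only the reported fix applied:
{code:java}
public static final String QUORUM_FETCH_TIMEOUT_MS_DOC = "Maximum time without a successful fetch from " +
    "the current leader before becoming a candidate and triggering an election for voters; Maximum time without " +
    "receiving fetch from a majority of the quorum before asking around to see if there's a new epoch for leader";
{code}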
 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)