[GitHub] [kafka] hachikuji commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-02-05 Thread GitBox


hachikuji commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r571362924



##
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##
@@ -169,6 +169,8 @@ class DefaultAutoTopicCreationManager(
 }
 }
 
+clearInflightRequests(creatableTopics)

Review comment:
   Can you use a `try/finally` here?
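   For illustration, a minimal Java-style sketch of the suggested shape (the actual code is Scala in `AutoTopicCreationManager.scala`; the send path and field names here are hypothetical stand-ins), so the in-flight entries are cleared even if sending throws:

```java
import java.util.Set;
import java.util.function.Consumer;

class TryFinallySketch {
    // Clears the in-flight topics whether or not the send throws.
    static void sendWithCleanup(final Set<String> creatableTopics,
                                final Consumer<Set<String>> sendCreateTopicRequest,
                                final Set<String> inflightTopics) {
        try {
            sendCreateTopicRequest.accept(creatableTopics);
        } finally {
            inflightTopics.removeAll(creatableTopics); // runs on success and on failure
        }
    }
}
```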





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10002: MINOR: remove the indent in security doc

2021-02-05 Thread GitBox


showuon commented on pull request #10002:
URL: https://github.com/apache/kafka/pull/10002#issuecomment-774401866


   @omkreddy, thanks for the comments. I've updated.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10002: MINOR: remove the indent in security doc

2021-02-05 Thread GitBox


showuon commented on a change in pull request #10002:
URL: https://github.com/apache/kafka/pull/10002#discussion_r571361102



##
File path: docs/security.html
##
@@ -649,22 +649,22 @@ Configuring Kafka Brokers
 
 Add a suitably modified JAAS file similar to the one below to 
each Kafka broker's config directory, let's call it kafka_server_jaas.conf for 
this example:
-
KafkaServer {
-org.apache.kafka.common.security.plain.PlainLoginModule required
-username="admin"
-password="admin-secret"
-user_admin="admin-secret"
-user_alice="alice-secret";
-};
+KafkaServer {
+org.apache.kafka.common.security.plain.PlainLoginModule required
+username="admin"
+password="admin-secret"
+user_admin="admin-secret"
+user_alice="alice-secret";
+};
 This configuration defines two users (admin and 
alice). The properties username and password
 in the KafkaServer section are used by the broker to 
initiate connections to other brokers. In this example,
 admin is the user for inter-broker communication. The 
set of properties user_userName defines
 the passwords for all users that connect to the broker and the 
broker validates all client connections including
 those from other brokers using these properties.
 Pass the JAAS config file location as JVM parameter to each 
Kafka broker:
-
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
+
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
 Configure SASL port and SASL mechanisms in server.properties 
as described here. For example:
-listeners=SASL_SSL://host.name:port
+listeners=SASL_SSL://host.name:port

Review comment:
   Wow, I've reviewed it twice but didn't find it! Thanks for pointing 
that out. Updated.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante opened a new pull request #10074: KAFKA-12305: Fix Flatten SMT for array types

2021-02-05 Thread GitBox


C0urante opened a new pull request #10074:
URL: https://github.com/apache/kafka/pull/10074


   [Jira](https://issues.apache.org/jira/browse/KAFKA-12305)
   
   (Copied from Jira):
   
   The `Flatten` SMT fails for array types. A sophisticated approach that tries 
to flatten arrays might be desirable in some cases, and may have been punted 
during the early design phase of the transform, but in the interim, it's 
probably not worth it to make array data and the SMT mutually exclusive.
   
   A naive approach that preserves arrays as they are and doesn't attempt to flatten 
them seems fair for now.
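   A rough sketch of that naive behavior (a hypothetical helper, not the actual `Flatten` code; Connect represents ARRAY values as `java.util.List`, which this simply copies through):

```java
import java.util.HashMap;
import java.util.Map;

class FlattenSketch {
    // Flattens nested maps into dotted keys, but leaves array (List) and primitive
    // values untouched instead of attempting to flatten them.
    @SuppressWarnings("unchecked")
    static Map<String, Object> flatten(final String prefix, final Map<String, Object> value) {
        final Map<String, Object> flattened = new HashMap<>();
        for (final Map.Entry<String, Object> entry : value.entrySet()) {
            final String name = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            final Object fieldValue = entry.getValue();
            if (fieldValue instanceof Map) {
                flattened.putAll(flatten(name, (Map<String, Object>) fieldValue));
            } else {
                // Arrays (java.util.List) and primitives are preserved as they are.
                flattened.put(name, fieldValue);
            }
        }
        return flattened;
    }
}
```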
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12305) Flatten SMT fails on arrays

2021-02-05 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-12305:
--
Description: 
The {{Flatten}} SMT fails for array types. A sophisticated approach that tries 
to flatten arrays might be desirable in some cases, and may have been punted 
during the early design phase of the transform, but in the interim, it's 
probably not worth it to make array data and the SMT mutually exclusive.

A naive approach that preserves arrays as they are and doesn't attempt to flatten 
them seems fair for now.

> Flatten SMT fails on arrays
> ---
>
> Key: KAFKA-12305
> URL: https://issues.apache.org/jira/browse/KAFKA-12305
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.1, 2.1.1, 2.2.2, 2.3.1, 2.4.1, 2.5.1, 2.7.0, 2.6.1, 
> 2.8.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> The {{Flatten}} SMT fails for array types. A sophisticated approach that 
> tries to flatten arrays might be desirable in some cases, and may have been 
> punted during the early design phase of the transform, but in the interim, 
> it's probably not worth it to make array data and the SMT mutually exclusive.
> A naive approach that preserves arrays as they are and doesn't attempt to flatten 
> them seems fair for now.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12305) Flatten SMT fails on arrays

2021-02-05 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-12305:
-

 Summary: Flatten SMT fails on arrays
 Key: KAFKA-12305
 URL: https://issues.apache.org/jira/browse/KAFKA-12305
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.6.1, 2.7.0, 2.5.1, 2.4.1, 2.3.1, 2.2.2, 2.1.1, 2.0.1, 
2.8.0
Reporter: Chris Egerton
Assignee: Chris Egerton






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] C0urante opened a new pull request #10073: KAFKA-12303: Fix handling of null values by Flatten SMT

2021-02-05 Thread GitBox


C0urante opened a new pull request #10073:
URL: https://github.com/apache/kafka/pull/10073


   [Jira](https://issues.apache.org/jira/browse/KAFKA-12303)
   
   Using `return` instead of `continue` when encountering null fields causes 
the remainder of the fields to be skipped. This PR addresses that, and adds a 
lightweight unit test that reproduces the error on current versions of Connect 
and verifies the accuracy of the fix.
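   A minimal illustration of the difference, using a hypothetical field loop rather than the actual `Flatten` code:

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ContinueVsReturn {
    // Copies non-null fields. `continue` skips only the current entry; a `return`
    // at the same spot would silently drop every remaining entry as well.
    static Map<String, Object> copyNonNullFields(final Map<String, Object> fields) {
        final Map<String, Object> copied = new LinkedHashMap<>();
        for (final Map.Entry<String, Object> field : fields.entrySet()) {
            if (field.getValue() == null) {
                continue; // with `return` here, all later fields would be lost
            }
            copied.put(field.getKey(), field.getValue());
        }
        return copied;
    }
}
```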
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12304) Improve topic validation in auto topic creation

2021-02-05 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-12304:
---

 Summary: Improve topic validation in auto topic creation
 Key: KAFKA-12304
 URL: https://issues.apache.org/jira/browse/KAFKA-12304
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen


The topic validation path should have a higher priority than other follow-up 
processes. Basically we should move it right after the topic authorization is 
done in metadata request handling.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12303) Flatten SMT drops some fields when null values are present

2021-02-05 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-12303:
--
Summary: Flatten SMT drops some fields when null values are present  (was: 
Flatten SMT drops some nested fields)

> Flatten SMT drops some fields when null values are present
> --
>
> Key: KAFKA-12303
> URL: https://issues.apache.org/jira/browse/KAFKA-12303
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.1, 2.1.1, 2.2.2, 2.3.1, 2.4.1, 2.5.1, 2.7.0, 2.6.1, 
> 2.8.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> [This 
> line|https://github.com/apache/kafka/blob/0bc394cc1d19f1e41dd6646e9ac0e09b91fb1398/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java#L109]
>  should be {{continue}} instead of {{return}}; otherwise, the rest of the 
> entries in the currently-being-iterated map are skipped unnecessarily.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12303) Flatten SMT drops some nested fields

2021-02-05 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-12303:
-

 Summary: Flatten SMT drops some nested fields
 Key: KAFKA-12303
 URL: https://issues.apache.org/jira/browse/KAFKA-12303
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.6.1, 2.7.0, 2.5.1, 2.4.1, 2.3.1, 2.2.2, 2.1.1, 2.0.1, 
2.8.0
Reporter: Chris Egerton
Assignee: Chris Egerton


[This 
line|https://github.com/apache/kafka/blob/0bc394cc1d19f1e41dd6646e9ac0e09b91fb1398/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java#L109]
 should be {{continue}} instead of {{return}}; otherwise, the rest of the 
entries in the currently-being-iterated map are skipped unnecessarily.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax merged pull request #10000: KAFKA-9274: handle TimeoutException on task reset

2021-02-05 Thread GitBox


mjsax merged pull request #10000:
URL: https://github.com/apache/kafka/pull/10000


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10072: KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out

2021-02-05 Thread GitBox


mjsax commented on a change in pull request #10072:
URL: https://github.com/apache/kafka/pull/10072#discussion_r571331549



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
##
@@ -32,6 +33,7 @@
 public final int topicGroupId;
 /** The ID of the partition. */
 public final int partition;
+public Task task;

Review comment:
   I was not happy about it either... Any good suggestions on how to do 
better? I could not come up with a better solution quickly, unfortunately. :(
   
   We could add it to the constructor and make it mandatory, but the "splash 
radius" would be quite large...





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


cmccabe merged pull request #10049:
URL: https://github.com/apache/kafka/pull/10049


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10000: KAFKA-9274: handle TimeoutException on task reset

2021-02-05 Thread GitBox


ableegoldman commented on a change in pull request #10000:
URL: https://github.com/apache/kafka/pull/10000#discussion_r571317298



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -229,17 +231,22 @@ public void initializeIfNeeded() {
 }
 }
 
+public void addPartitionsForOffsetReset(final Set<TopicPartition> partitionsForOffsetReset) {
+mainConsumer.pause(partitionsForOffsetReset);
+resetOffsetsForPartitions.addAll(partitionsForOffsetReset);
+}
+
 /**
  * @throws TimeoutException if fetching committed offsets timed out
  */
 @Override
-public void completeRestoration() {
+public void completeRestoration(final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
 switch (state()) {
 case RUNNING:
 return;
 
 case RESTORING:
-initializeMetadata();
+resetOffsetsIfNeededAndInitializeMetadata(offsetResetter);

Review comment:
   cool, thanks, this seems much cleaner to me





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10072: KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out

2021-02-05 Thread GitBox


ableegoldman commented on a change in pull request #10072:
URL: https://github.com/apache/kafka/pull/10072#discussion_r571317046



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
##
@@ -32,6 +33,7 @@
 public final int topicGroupId;
 /** The ID of the partition. */
 public final int partition;
+public Task task;

Review comment:
   I have to say, it makes me a little uncomfortable to stick the actual 
`Task` object inside the basic `TaskId` container class. Especially if 99% of 
the time it will be null, given that we use `TaskId` all over the place and 
only call `setTask` a handful of times. It will only get increasingly difficult 
to tell whether it's safe to assume a given `TaskId` object has an actual 
non-null `Task` or not, and therefore whether it is safe to use 😬 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12213) Kafka Streams aggregation Initializer to accept record key

2021-02-05 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17280044#comment-17280044
 ] 

Matthias J. Sax commented on KAFKA-12213:
-

Works for me. Just wanted to point out that [~bbejeck] also started to look 
into this (I just learned about it), breaking it down into similar PRs.

You should sync with him to avoid duplicate work. From my understanding, he 
started to prepare a PR for the `ValueJoiner` interface.

> Kafka Streams aggregation Initializer to accept record key
> --
>
> Key: KAFKA-12213
> URL: https://issues.apache.org/jira/browse/KAFKA-12213
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Piotr Fras
>Assignee: Piotr Fras
>Priority: Minor
>  Labels: needs-kip
>
> Sometimes Kafka record key contains useful information for creating a zero 
> object in aggregation Initializer. This feature is to add kafka record key to 
> Initializer.
> There were two approaches I considered to implement this feature, one 
> respecting backwards compatibility for internal and external APIs and the 
> other one which is not. I chose the latter one as it was more straightforward. 
> We may want to validate this approach though.
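For illustration only, a hypothetical shape of a key-aware initializer (no such interface exists in Kafka Streams today; the name and signature are assumptions sketching the feature described above):

{code:java}
// Like org.apache.kafka.streams.kstream.Initializer, but the record key is passed
// in so the "zero" aggregate can be derived from it.
@FunctionalInterface
public interface KeyedInitializer<K, VA> {
    VA apply(K readOnlyKey);
}
{code}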



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax opened a new pull request #10072: KAFKA-9274: Throw TaskCorruptedException instead of TimeoutException when TX commit times out

2021-02-05 Thread GitBox


mjsax opened a new pull request #10072:
URL: https://github.com/apache/kafka/pull/10072


   Part of KIP-572: follow-up work to PR #9800. It's not safe to retry a TX
   commit after a timeout, because it's unclear if the commit was
   successful or not, and thus on retry we might get an
   IllegalStateException. Instead, we will throw a TaskCorruptedException
   to retry the TX if the commit failed.
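   A rough control-flow sketch of the idea (the surrounding method is hypothetical, and the `Set<TaskId>`-based `TaskCorruptedException` constructor is an assumption for illustration):

```java
import java.util.Collections;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.processor.TaskId;

class TxCommitSketch {
    // On a commit timeout the outcome is ambiguous, so rather than retrying
    // commitTransaction() (which may hit an IllegalStateException), the task is
    // flagged as corrupted and re-initialized via the corrupted-task handling.
    static void commitTx(final Producer<byte[], byte[]> producer, final TaskId taskId) {
        try {
            producer.commitTransaction();
        } catch (final TimeoutException timeout) {
            // Constructor shape is an assumption for this sketch.
            throw new TaskCorruptedException(Collections.singleton(taskId));
        }
    }
}
```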
   
   Call for review @ableegoldman @vvcephei @abbccdda @guozhangwang 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #10069: MINOR: Add RaftReplicaManager

2021-02-05 Thread GitBox


rondagostino commented on a change in pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#discussion_r571309365



##
File path: core/src/main/scala/kafka/server/RaftReplicaManager.scala
##
@@ -0,0 +1,538 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.cluster.Partition
+import kafka.log.LogManager
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpoints}
+import kafka.server.metadata.{ConfigRepository, MetadataBroker, 
MetadataBrokers, MetadataImageBuilder, MetadataPartition}
+import kafka.utils.Implicits.MapExtensionMethods
+import kafka.utils.Scheduler
+import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.{Map, Set, mutable}
+
+class RaftReplicaManager(config: KafkaConfig,
+ metrics: Metrics,
+ time: Time,
+ scheduler: Scheduler,
+ logManager: LogManager,
+ isShuttingDown: AtomicBoolean,
+ quotaManagers: QuotaManagers,
+ brokerTopicStats: BrokerTopicStats,
+ metadataCache: MetadataCache,
+ logDirFailureChannel: LogDirFailureChannel,
+ delayedProducePurgatory: 
DelayedOperationPurgatory[DelayedProduce],
+ delayedFetchPurgatory: 
DelayedOperationPurgatory[DelayedFetch],
+ delayedDeleteRecordsPurgatory: 
DelayedOperationPurgatory[DelayedDeleteRecords],
+ delayedElectLeaderPurgatory: 
DelayedOperationPurgatory[DelayedElectLeader],
+ threadNamePrefix: Option[String],
+ configRepository: ConfigRepository,
+ alterIsrManager: AlterIsrManager) extends 
ReplicaManager(
+  config, metrics, time, None, scheduler, logManager, isShuttingDown, 
quotaManagers,
+  brokerTopicStats, metadataCache, logDirFailureChannel, 
delayedProducePurgatory, delayedFetchPurgatory,
+  delayedDeleteRecordsPurgatory, delayedElectLeaderPurgatory, 
threadNamePrefix, configRepository, alterIsrManager) {
+
+  if (config.requiresZookeeper) {
+throw new IllegalStateException(s"Cannot use ${getClass.getSimpleName} 
when using ZooKeeper")
+  }
+
+  // Changes are initially deferred when using a Raft-based metadata quorum, 
and they may flip-flop to not
+  // being deferred and being deferred again thereafter as the broker 
(re)acquires/loses its lease.
+  // Changes are never deferred when using ZooKeeper.  When true, this 
indicates that we should transition
+  // online partitions to the deferred state if we see a metadata update for 
that partition.
+  private var deferringMetadataChanges: Boolean = true
+  stateChangeLogger.debug(s"Metadata changes are initially being deferred")
+
+  def beginMetadataChangeDeferral(): Unit = {
+replicaStateChangeLock synchronized {
+  deferringMetadataChanges = true
+  stateChangeLogger.info(s"Metadata changes are now being deferred")
+}
+  }
+
+  def endMetadataChangeDeferral(): Unit = {
+val startMs = time.milliseconds()
+replicaStateChangeLock synchronized {
+  stateChangeLogger.info(s"Applying deferred metadata changes")
+  val highWatermarkCheckpoints = new 
LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
+  val partitionsMadeFollower = mutable.Set[Partition]()
+  val partitionsMadeLeader = mutable.Set[Partition]()
+  val leadershipChangeCallbacks =
+mutable.Map[(Iterable[Partition], Iterable[Partition]) => Unit, 
(mutable.Set[Partition], mutable.Set[Partition])]()
+  try {
+val leaderPartitionStates = mutable.Map[Partition, MetadataPartition]()
+val followerPartitionStates = mutable.Map[Partition, 
MetadataPartition]()
+val partitionsAlreadyExisting = mutable.Set[MetadataPartition]()
+val 

[GitHub] [kafka] rondagostino commented on a change in pull request #10069: MINOR: Add RaftReplicaManager

2021-02-05 Thread GitBox


rondagostino commented on a change in pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#discussion_r571237871



##
File path: core/src/main/scala/kafka/server/RaftReplicaManager.scala
##
@@ -0,0 +1,538 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.cluster.Partition
+import kafka.log.LogManager
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpoints}
+import kafka.server.metadata.{ConfigRepository, MetadataBroker, 
MetadataBrokers, MetadataImageBuilder, MetadataPartition}
+import kafka.utils.Implicits.MapExtensionMethods
+import kafka.utils.Scheduler
+import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.{Map, Set, mutable}
+
+class RaftReplicaManager(config: KafkaConfig,
+ metrics: Metrics,
+ time: Time,
+ scheduler: Scheduler,
+ logManager: LogManager,
+ isShuttingDown: AtomicBoolean,
+ quotaManagers: QuotaManagers,
+ brokerTopicStats: BrokerTopicStats,
+ metadataCache: MetadataCache,
+ logDirFailureChannel: LogDirFailureChannel,
+ delayedProducePurgatory: 
DelayedOperationPurgatory[DelayedProduce],
+ delayedFetchPurgatory: 
DelayedOperationPurgatory[DelayedFetch],
+ delayedDeleteRecordsPurgatory: 
DelayedOperationPurgatory[DelayedDeleteRecords],
+ delayedElectLeaderPurgatory: 
DelayedOperationPurgatory[DelayedElectLeader],
+ threadNamePrefix: Option[String],
+ configRepository: ConfigRepository,
+ alterIsrManager: AlterIsrManager) extends 
ReplicaManager(
+  config, metrics, time, None, scheduler, logManager, isShuttingDown, 
quotaManagers,
+  brokerTopicStats, metadataCache, logDirFailureChannel, 
delayedProducePurgatory, delayedFetchPurgatory,
+  delayedDeleteRecordsPurgatory, delayedElectLeaderPurgatory, 
threadNamePrefix, configRepository, alterIsrManager) {
+
+  if (config.requiresZookeeper) {
+throw new IllegalStateException(s"Cannot use ${getClass.getSimpleName} 
when using ZooKeeper")
+  }
+
+  // Changes are initially deferred when using a Raft-based metadata quorum, 
and they may flip-flop to not
+  // being deferred and being deferred again thereafter as the broker 
(re)acquires/loses its lease.
+  // Changes are never deferred when using ZooKeeper.  When true, this 
indicates that we should transition
+  // online partitions to the deferred state if we see a metadata update for 
that partition.
+  private var deferringMetadataChanges: Boolean = true
+  stateChangeLogger.debug(s"Metadata changes are initially being deferred")
+
+  def beginMetadataChangeDeferral(): Unit = {
+replicaStateChangeLock synchronized {
+  deferringMetadataChanges = true
+  stateChangeLogger.info(s"Metadata changes are now being deferred")
+}
+  }
+
+  def endMetadataChangeDeferral(): Unit = {
+val startMs = time.milliseconds()
+replicaStateChangeLock synchronized {
+  stateChangeLogger.info(s"Applying deferred metadata changes")
+  val highWatermarkCheckpoints = new 
LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
+  val partitionsMadeFollower = mutable.Set[Partition]()
+  val partitionsMadeLeader = mutable.Set[Partition]()
+  val leadershipChangeCallbacks =
+mutable.Map[(Iterable[Partition], Iterable[Partition]) => Unit, 
(mutable.Set[Partition], mutable.Set[Partition])]()
+  try {
+val leaderPartitionStates = mutable.Map[Partition, MetadataPartition]()
+val followerPartitionStates = mutable.Map[Partition, 
MetadataPartition]()
+val partitionsAlreadyExisting = mutable.Set[MetadataPartition]()
+val 

[jira] [Assigned] (KAFKA-9195) Use Randomized State Directory Names in Streams System Tests

2021-02-05 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-9195:
--

Assignee: Matthias J. Sax

> Use Randomized State Directory Names in Streams System Tests 
> -
>
> Key: KAFKA-9195
> URL: https://issues.apache.org/jira/browse/KAFKA-9195
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, system tests
>Reporter: Bruno Cadonna
>Assignee: Matthias J. Sax
>Priority: Major
>
> Currently, the state directory property in streams' system tests is set to 
> the {{PERSISTENT_ROOT}} variable. Since Streams applications in different 
> tests have the same application ID and the state directory path consists of 
> state directory property + application ID + task ID, it might happen that a 
> dirty state directory of one test is re-used by another test if the state 
> directory is not properly cleaned up. This may lead to unexpected results and 
> false positive and/or flaky failures.
> The state directory property shall be set to a randomized path inside 
> {{PERSISTENT_ROOT}} to ensure that tests may not interfere with each other in 
> the case of missing state clean-ups.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12213) Kafka Streams aggregation Initializer to accept record key

2021-02-05 Thread Piotr Fras (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17280034#comment-17280034
 ] 

Piotr Fras edited comment on KAFKA-12213 at 2/5/21, 11:09 PM:
--

I am going to break down KIP-149 into five streams of work and submit an 
individual pull request for each of the following interfaces, for both the Java 
and Scala APIs:

- ValueTransformer

- ValueMapper

- ValueJoiner

- Initializer (attempt made to address in 
[https://github.com/apache/kafka/pull/9908])

- Reducer

Let me know if that works for you.


was (Author: moncalamari):
I am going to break down KIP-149 into five streams of work and submit 
individual pull request for each of the following interfaces, for both Java and 
Scala API:

- ValueTransformer

- ValueMapper

- ValueJoiner

- Initializer (already addressed in [https://github.com/apache/kafka/pull/9908])

- Reducer

Let me know if that works for you.

> Kafka Streams aggregation Initializer to accept record key
> --
>
> Key: KAFKA-12213
> URL: https://issues.apache.org/jira/browse/KAFKA-12213
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Piotr Fras
>Assignee: Piotr Fras
>Priority: Minor
>  Labels: needs-kip
>
> Sometimes Kafka record key contains useful information for creating a zero 
> object in aggregation Initializer. This feature is to add kafka record key to 
> Initializer.
> There were two approaches I considered to implement this feature, one 
> respecting backwards compatibility for internal and external APIs and the 
> other one which is not. I chose the latter one as it was more straightforward. 
> We may want to validate this approach though.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12213) Kafka Streams aggregation Initializer to accept record key

2021-02-05 Thread Piotr Fras (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17280034#comment-17280034
 ] 

Piotr Fras commented on KAFKA-12213:


I am going to break down KIP-149 into five streams of work and submit an 
individual pull request for each of the following interfaces:

- ValueTransformer

- ValueMapper

- ValueJoiner

- Initializer (already addressed in [https://github.com/apache/kafka/pull/9908])

- Reducer

Let me know if that works for you.

> Kafka Streams aggregation Initializer to accept record key
> --
>
> Key: KAFKA-12213
> URL: https://issues.apache.org/jira/browse/KAFKA-12213
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Piotr Fras
>Assignee: Piotr Fras
>Priority: Minor
>  Labels: needs-kip
>
> Sometimes Kafka record key contains useful information for creating a zero 
> object in aggregation Initializer. This feature is to add kafka record key to 
> Initializer.
> There were two approaches I considered to implement this feature, one 
> respecting backwards compatibility for internal and external APIs and the 
> other one which is not. I chose the latter one as it was more straightforward. 
> We may want to validate this approach though.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12213) Kafka Streams aggregation Initializer to accept record key

2021-02-05 Thread Piotr Fras (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17280034#comment-17280034
 ] 

Piotr Fras edited comment on KAFKA-12213 at 2/5/21, 11:02 PM:
--

I am going to break down KIP-149 into five streams of work and submit an 
individual pull request for each of the following interfaces, for both the Java 
and Scala APIs:

- ValueTransformer

- ValueMapper

- ValueJoiner

- Initializer (already addressed in [https://github.com/apache/kafka/pull/9908])

- Reducer

Let me know if that works for you.


was (Author: moncalamari):
I am going to break down KIP-149 into five streams of work and submit 
individual pull request for each of the following interfaces:

- ValueTransformer

- ValueMapper

- ValueJoiner

- Initializer (already addressed in [https://github.com/apache/kafka/pull/9908])

- Reducer

Let me know if that works for you.

> Kafka Streams aggregation Initializer to accept record key
> --
>
> Key: KAFKA-12213
> URL: https://issues.apache.org/jira/browse/KAFKA-12213
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Piotr Fras
>Assignee: Piotr Fras
>Priority: Minor
>  Labels: needs-kip
>
> Sometimes Kafka record key contains useful information for creating a zero 
> object in aggregation Initializer. This feature is to add kafka record key to 
> Initializer.
> There were two approaches I considered to implement this feature, one 
> respecting backwards compatibility for internal and external APIs and the 
> other one which is not. I chose the latter one as it was more straightforward. 
> We may want to validate this approach though.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10678) Re-deploying Streams app causes rebalance and task migration

2021-02-05 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-10678.

  Assignee: A. Sophie Blee-Goldman
Resolution: Fixed

Resolved via [https://github.com/apache/kafka/pull/9978]

[~thebearmayor] this should be fixed in the upcoming 2.8.0 and 2.6.2 releases, 
which are currently in progress (and in 2.7.1, but I'm not sure of the 
schedule for that yet). If/when you're able to upgrade to one of these, please 
verify that the task shuffling due to redeployment has been mitigated. And 
obviously, reopen this ticket if not – thanks!

> Re-deploying Streams app causes rebalance and task migration
> 
>
> Key: KAFKA-10678
> URL: https://issues.apache.org/jira/browse/KAFKA-10678
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0, 2.6.1
>Reporter: Bradley Peterson
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.8.0, 2.7.1, 2.6.2
>
> Attachments: after, before, broker
>
>
> Re-deploying our Streams app causes a rebalance, even when using static group 
> membership. Worse, the rebalance creates standby tasks, even when the 
> previous task assignment was balanced and stable.
> Our app is currently using Streams 2.6.1-SNAPSHOT (due to [KAFKA-10633]) but 
> we saw the same behavior in 2.6.0. The app runs on 4 EC2 instances, each with 
> 4 streams threads, and data stored on persistent EBS volumes. During a 
> redeploy, all EC2 instances are stopped, new instances are launched, and the 
> EBS volumes are attached to the new instances. We do not use interactive 
> queries. {{session.timeout.ms}} is set to 30 minutes, and the deployment 
> finishes well under that. {{num.standby.replicas}} is 0.
> h2. Expected Behavior
> Given a stable and balanced task assignment prior to deploying, we expect to 
> see the same task assignment after deploying. Even if a rebalance is 
> triggered, we do not expect to see new standby tasks.
> h2. Observed Behavior
> Attached are the "Assigned tasks to clients" log lines from before and after 
> deploying. The "before" is from over 24 hours ago, the task assignment is 
> well balanced and "Finished stable assignment of tasks, no followup 
> rebalances required." is logged. The "after" log lines show the same 
> assignment of active tasks, but some additional standby tasks. There are 
> additional log lines about adding and removing active tasks, which I don't 
> quite understand.
> I've also included logs from the broker showing the rebalance was triggered 
> for "Updating metadata".



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10678) Re-deploying Streams app causes rebalance and task migration

2021-02-05 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-10678:
---
Fix Version/s: 2.6.2
   2.7.1
   2.8.0

> Re-deploying Streams app causes rebalance and task migration
> 
>
> Key: KAFKA-10678
> URL: https://issues.apache.org/jira/browse/KAFKA-10678
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0, 2.6.1
>Reporter: Bradley Peterson
>Priority: Major
> Fix For: 2.8.0, 2.7.1, 2.6.2
>
> Attachments: after, before, broker
>
>
> Re-deploying our Streams app causes a rebalance, even when using static group 
> membership. Worse, the rebalance creates standby tasks, even when the 
> previous task assignment was balanced and stable.
> Our app is currently using Streams 2.6.1-SNAPSHOT (due to [KAFKA-10633]) but 
> we saw the same behavior in 2.6.0. The app runs on 4 EC2 instances, each with 
> 4 streams threads, and data stored on persistent EBS volumes. During a 
> redeploy, all EC2 instances are stopped, new instances are launched, and the 
> EBS volumes are attached to the new instances. We do not use interactive 
> queries. {{session.timeout.ms}} is set to 30 minutes, and the deployment 
> finishes well under that. {{num.standby.replicas}} is 0.
> h2. Expected Behavior
> Given a stable and balanced task assignment prior to deploying, we expect to 
> see the same task assignment after deploying. Even if a rebalance is 
> triggered, we do not expect to see new standby tasks.
> h2. Observed Behavior
> Attached are the "Assigned tasks to clients" log lines from before and after 
> deploying. The "before" is from over 24 hours ago, the task assignment is 
> well balanced and "Finished stable assignment of tasks, no followup 
> rebalances required." is logged. The "after" log lines show the same 
> assignment of active tasks, but some additional standby tasks. There are 
> additional log lines about adding and removing active tasks, which I don't 
> quite understand.
> I've also included logs from the broker showing the rebalance was triggered 
> for "Updating metadata".



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12213) Kafka Streams aggregation Initializer to accept record key

2021-02-05 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17280030#comment-17280030
 ] 

Matthias J. Sax commented on KAFKA-12213:
-

It's totally fine to implement it incrementally across multiple releases.

IIRC, when KIP-149 was done, we did not have a Scala API yet. If there are 
issues, we should discuss solutions on the dev mailing list as follow up, and 
update the KIP accordingly.

> Kafka Streams aggregation Initializer to accept record key
> --
>
> Key: KAFKA-12213
> URL: https://issues.apache.org/jira/browse/KAFKA-12213
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Piotr Fras
>Assignee: Piotr Fras
>Priority: Minor
>  Labels: needs-kip
>
> Sometimes Kafka record key contains useful information for creating a zero 
> object in aggregation Initializer. This feature is to add kafka record key to 
> Initializer.
> There were two approaches I considered to implement this feature, one 
> respecting backwards compatibility for internal and external APIs and the 
> other one which is not. I chose the latter one as it was more straightforward. 
> We may want to validate this approach though.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12288) Remove task-level filesystem locks

2021-02-05 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12288:

Component/s: streams

> Remove task-level filesystem locks
> --
>
> Key: KAFKA-12288
> URL: https://issues.apache.org/jira/browse/KAFKA-12288
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
>
> Since you actually can't run multiple instances of a Kafka Streams 
> application on the same physical state directory, there's really no benefit 
> to the file locks we obtain as part of the task directory locking. The only 
> safety measures we need are to protect between threads within a process, not 
> across processes. The in-memory map of locks should be sufficient for the 
> StateDirectory to be safe.
> So, we should take out all of the file-based locking of task directories. 
> This should allow us to greatly simplify the StateDirectory code, and 
> eliminate the source of some problems we've faced in the past, particularly 
> those finicky FS/OS-dependent issues.
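A minimal sketch of the in-memory locking idea (illustrative names, not the actual StateDirectory code): a per-process map from task directory to owning thread replaces the file-based lock.

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class InMemoryTaskLocks {
    // Which thread owns each task directory within this process; no filesystem locks.
    private final Map<String, Thread> lockedTasks = new ConcurrentHashMap<>();

    // Returns true if the calling thread now holds (or already held) the lock.
    public boolean lock(final String taskDirName) {
        final Thread owner = lockedTasks.putIfAbsent(taskDirName, Thread.currentThread());
        return owner == null || owner == Thread.currentThread();
    }

    // Releases the lock only if the calling thread is the owner.
    public void unlock(final String taskDirName) {
        lockedTasks.remove(taskDirName, Thread.currentThread());
    }
}
{code}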



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-02-05 Thread GitBox


hachikuji commented on pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#issuecomment-774326674


   @abbccdda Thanks for the updates. I opened a PR with a few fixes to speed 
this along since we're trying to get it checked in today: 
https://github.com/abbccdda/kafka/pull/6. The tests that were previously 
failing now seem to be passing (at least when testing locally).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #10039: MINOR: Defer log recovery until LogManager startup

2021-02-05 Thread GitBox


rondagostino commented on a change in pull request #10039:
URL: https://github.com/apache/kafka/pull/10039#discussion_r571283864



##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -352,13 +348,15 @@ class LogManager(logDirs: Seq[File],
 val numLogsLoaded = new AtomicInteger(0)
 numTotalLogs += logsToLoad.length
 
-val jobsForDir = logsToLoad.map { logDir =>
+val jobsForDir = logsToLoad
+  .filter(logDir => Log.parseTopicPartitionName(logDir).topic != 
KafkaRaftServer.MetadataTopic)

Review comment:
   We could get rid of the parameter in `startup()` and create a 
`TopicLogConfigOverrideRetriever` class that held the `ConfigRepository` and 
would retrieve any topic configs in a thread-safe manner -- then we just pass 
that to `loadLog()` and it asks for the overrides, and the class would retrieve 
iff they haven't been retrieved before.  WDYT?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #10039: MINOR: Defer log recovery until LogManager startup

2021-02-05 Thread GitBox


rondagostino commented on a change in pull request #10039:
URL: https://github.com/apache/kafka/pull/10039#discussion_r571278884



##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -352,13 +348,15 @@ class LogManager(logDirs: Seq[File],
 val numLogsLoaded = new AtomicInteger(0)
 numTotalLogs += logsToLoad.length
 
-val jobsForDir = logsToLoad.map { logDir =>
+val jobsForDir = logsToLoad
+  .filter(logDir => Log.parseTopicPartitionName(logDir).topic != 
KafkaRaftServer.MetadataTopic)

Review comment:
   Yeah, we use the list of topics just to know what to check for log 
config overrides.  This will see anything in the log dir.  Now that I realize 
it, we could probably use this to know what topics we need to look at for log 
config overrides.  But I don't want to change too much at this point.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #10039: MINOR: Defer log recovery until LogManager startup

2021-02-05 Thread GitBox


rondagostino commented on a change in pull request #10039:
URL: https://github.com/apache/kafka/pull/10039#discussion_r571276941



##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -403,7 +401,30 @@ class LogManager(logDirs: Seq[File],
   /**
*  Start the background threads to flush logs and do log cleanup
*/
-  def startup(): Unit = {
+  def startup(retrieveTopicNames: => Set[String]): Unit = {

Review comment:
   Actually, this isn't the list of topics to load -- it's the list of 
topics for which we will look for log config overrides.  
`retrieveTopicsForLogConfigOverrideCheck`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #10039: MINOR: Defer log recovery until LogManager startup

2021-02-05 Thread GitBox


rondagostino commented on a change in pull request #10039:
URL: https://github.com/apache/kafka/pull/10039#discussion_r571275654



##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -403,7 +401,30 @@ class LogManager(logDirs: Seq[File],
   /**
*  Start the background threads to flush logs and do log cleanup
*/
-  def startup(): Unit = {
+  def startup(retrieveTopicNames: => Set[String]): Unit = {

Review comment:
   Would like to keep it as a verb: `retrieveTopicsToLoad`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #10039: MINOR: Defer log recovery until LogManager startup

2021-02-05 Thread GitBox


rondagostino commented on a change in pull request #10039:
URL: https://github.com/apache/kafka/pull/10039#discussion_r571274481



##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -403,7 +401,30 @@ class LogManager(logDirs: Seq[File],
   /**
*  Start the background threads to flush logs and do log cleanup
*/
-  def startup(): Unit = {
+  def startup(retrieveTopicNames: => Set[String]): Unit = {
+
startupWithTopicLogConfigOverrides(generateTopicLogConfigs(retrieveTopicNames))

Review comment:
   Yes, `startup()` is invoked immediately after the instance is 
constructed when we are using ZooKeeper, and we can get the configs at that 
point since we can just query ZK.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10039: MINOR: Defer log recovery until LogManager startup

2021-02-05 Thread GitBox


mumrah commented on a change in pull request #10039:
URL: https://github.com/apache/kafka/pull/10039#discussion_r571268386



##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -403,7 +401,30 @@ class LogManager(logDirs: Seq[File],
   /**
*  Start the background threads to flush logs and do log cleanup
*/
-  def startup(): Unit = {
+  def startup(retrieveTopicNames: => Set[String]): Unit = {
+
startupWithTopicLogConfigOverrides(generateTopicLogConfigs(retrieveTopicNames))

Review comment:
   We are able to load the topic configs right away because they are coming 
from ZK, right?
   
   

##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -403,7 +401,30 @@ class LogManager(logDirs: Seq[File],
   /**
*  Start the background threads to flush logs and do log cleanup
*/
-  def startup(): Unit = {
+  def startup(retrieveTopicNames: => Set[String]): Unit = {
+
startupWithTopicLogConfigOverrides(generateTopicLogConfigs(retrieveTopicNames))
+  }
+
+  // visible for testing
+  private[log] def generateTopicLogConfigs(topicNames: Set[String]): 
Map[String, LogConfig] = {
+val topicLogConfigs: mutable.Map[String, LogConfig] = mutable.Map()

Review comment:
   nit: can move type info to right hand side and just have `val 
topicLogConfigs = ...`

##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -403,7 +401,30 @@ class LogManager(logDirs: Seq[File],
   /**
*  Start the background threads to flush logs and do log cleanup
*/
-  def startup(): Unit = {
+  def startup(retrieveTopicNames: => Set[String]): Unit = {

Review comment:
   Feels slightly odd to pass in the set of topics to load here, but I 
can't think of a good way to avoid it. Perhaps we could pass MetadataCache into 
LogManager and let startup call MetadataCache#getAllTopics? That might be more 
risky though since it changes the startup order in KafkaServer; maybe we can 
look into this as a follow-up.
   
   Besides that, the name here seems strange. Maybe something like 
"topicsToLoad"?

##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -352,13 +348,15 @@ class LogManager(logDirs: Seq[File],
 val numLogsLoaded = new AtomicInteger(0)
 numTotalLogs += logsToLoad.length
 
-val jobsForDir = logsToLoad.map { logDir =>
+val jobsForDir = logsToLoad
+  .filter(logDir => Log.parseTopicPartitionName(logDir).topic != 
KafkaRaftServer.MetadataTopic)

Review comment:
   Will the metadata topic get passed into LogManager? I would guess not 
since it's a Raft topic and not a regular Kafka topic. 
   
   Also style nit: you can do `logsToLoad.filter { logDir => ... }` similar to 
the `map` below





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12302) Flaky Test SaslApiVersionsRequestTest#testApiVersionsRequestWithUnsupportedVersion

2021-02-05 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-12302:
---

 Summary: Flaky Test 
SaslApiVersionsRequestTest#testApiVersionsRequestWithUnsupportedVersion
 Key: KAFKA-12302
 URL: https://issues.apache.org/jira/browse/KAFKA-12302
 Project: Kafka
  Issue Type: Test
  Components: core, unit tests
Reporter: Matthias J. Sax


[https://github.com/apache/kafka/pull/1/checks?check_run_id=1836684124] 
{quote}{{org.opentest4j.AssertionFailedError: Topic [__consumer_offsets] 
metadata not propagated after 6 ms
at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:39)
at org.junit.jupiter.api.Assertions.fail(Assertions.java:117)
at 
kafka.utils.TestUtils$.waitForAllPartitionsMetadata(TestUtils.scala:852)
at kafka.utils.TestUtils$.createTopic(TestUtils.scala:367)
at kafka.utils.TestUtils$.createOffsetsTopic(TestUtils.scala:429)
at 
kafka.api.IntegrationTestHarness.doSetup(IntegrationTestHarness.scala:109)
at 
kafka.api.IntegrationTestHarness.setUp(IntegrationTestHarness.scala:84)
at 
kafka.server.SaslApiVersionsRequestTest.setUp(SaslApiVersionsRequestTest.scala:41)}}
 
{quote}

STDOUT
{quote}{{[2021-02-05 05ː26ː04,270] WARN SASL configuration failed: 
javax.security.auth.login.LoginException: No JAAS configuration section named 
'Client' was found in specified JAAS configuration file: 
'/tmp/kafka2215259725436410019.tmp'. Will continue connection to Zookeeper 
server without SASL authentication, if Zookeeper server allows it. 
(org.apache.zookeeper.ClientCnxn:1094)
[2021-02-05 05ː26ː04,280] ERROR [ZooKeeperClient] Auth failed. 
(kafka.zookeeper.ZooKeeperClient:74)
[2021-02-05 05ː26ː04,304] WARN SASL configuration failed: 
javax.security.auth.login.LoginException: No JAAS configuration section named 
'Client' was found in specified JAAS configuration file: 
'/tmp/kafka2215259725436410019.tmp'. Will continue connection to Zookeeper 
server without SASL authentication, if Zookeeper server allows it. 
(org.apache.zookeeper.ClientCnxn:1094)
[2021-02-05 05ː26ː04,331] ERROR [ZooKeeperClient Kafka server] Auth failed. 
(kafka.zookeeper.ZooKeeperClient:74)
[2021-02-05 05ː26ː05,337] ERROR [RequestSendThread controllerId=0] Controller 0 
fails to send a request to broker localhost:36921 (id: 0 rack: null) 
(kafka.controller.RequestSendThread:76)
java.lang.InterruptedException
at 
java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1367)
at 
java.base/java.util.concurrent.CountDownLatch.await(CountDownLatch.java:278)
at kafka.utils.ShutdownableThread.pause(ShutdownableThread.scala:82)
at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:234)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2021-02-05 05ː27ː09,650] WARN SASL configuration failed: 
javax.security.auth.login.LoginException: No JAAS configuration section named 
'Client' was found in specified JAAS configuration file: 
'/tmp/kafka12601197135790806695.tmp'. Will continue connection to Zookeeper 
server without SASL authentication, if Zookeeper server allows it. 
(org.apache.zookeeper.ClientCnxn:1094)
[2021-02-05 05ː27ː09,650] ERROR [ZooKeeperClient] Auth failed. 
(kafka.zookeeper.ZooKeeperClient:74)
[2021-02-05 05ː27ː09,654] WARN SASL configuration failed: 
javax.security.auth.login.LoginException: No JAAS configuration section named 
'Client' was found in specified JAAS configuration file: 
'/tmp/kafka12601197135790806695.tmp'. Will continue connection to Zookeeper 
server without SASL authentication, if Zookeeper server allows it. 
(org.apache.zookeeper.ClientCnxn:1094)
[2021-02-05 05ː27ː09,654] ERROR [ZooKeeperClient Kafka server] Auth failed. 
(kafka.zookeeper.ZooKeeperClient:74)
[2021-02-05 05ː27ː12,865] WARN SASL configuration failed: 
javax.security.auth.login.LoginException: No JAAS configuration section named 
'Client' was found in specified JAAS configuration file: 
'/tmp/kafka15543931612251532592.tmp'. Will continue connection to Zookeeper 
server without SASL authentication, if Zookeeper server allows it. 
(org.apache.zookeeper.ClientCnxn:1094)
[2021-02-05 05ː27ː12,865] ERROR [ZooKeeperClient] Auth failed. 
(kafka.zookeeper.ZooKeeperClient:74)
[2021-02-05 05ː27ː12,868] WARN SASL configuration failed: 
javax.security.auth.login.LoginException: No JAAS configuration section named 
'Client' was found in specified JAAS configuration file: 
'/tmp/kafka15543931612251532592.tmp'. Will continue connection to Zookeeper 
server without SASL authentication, if Zookeeper server allows it. 
(org.apache.zookeeper.ClientCnxn:1094)
[2021-02-05 05ː27ː12,869] ERROR [ZooKeeperClient Kafka server] Auth failed. 
(kafka.zookeeper.ZooKeeperClient:74)
[2021-02-05 06ː07ː42,597] WARN SASL configuration failed: 
javax.security.auth.login.Log

[jira] [Commented] (KAFKA-12284) Flaky Test MirrorConnectorsIntegrationSSLTest#testOneWayReplicationWithAutoOffsetSync

2021-02-05 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17280008#comment-17280008
 ] 

Matthias J. Sax commented on KAFKA-12284:
-

Failed again: 
https://github.com/apache/kafka/pull/1/checks?check_run_id=1836563325

> Flaky Test 
> MirrorConnectorsIntegrationSSLTest#testOneWayReplicationWithAutoOffsetSync
> -
>
> Key: KAFKA-12284
> URL: https://issues.apache.org/jira/browse/KAFKA-12284
> Project: Kafka
>  Issue Type: Test
>  Components: mirrormaker, unit tests
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> [https://github.com/apache/kafka/pull/9997/checks?check_run_id=1820178470]
> {quote} {{java.lang.RuntimeException: 
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'primary.test-topic-2' already exists.
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:366)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:341)
>   at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testOneWayReplicationWithAutoOffsetSync(MirrorConnectorsIntegrationBaseTest.java:419)}}
> [...]
>  
> {{Caused by: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'primary.test-topic-2' already exists.
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:364)
>   ... 92 more
> Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'primary.test-topic-2' already exists.}}
> {quote}
> STDOUT
> {quote} {{2021-02-03 04:19:15,975] ERROR [MirrorHeartbeatConnector|task-0] 
> WorkerSourceTask\{id=MirrorHeartbeatConnector-0} failed to send record to 
> heartbeats:  (org.apache.kafka.connect.runtime.WorkerSourceTask:354)
> org.apache.kafka.common.KafkaException: Producer is closed forcefully.
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:750)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:737)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:282)
>   at java.lang.Thread.run(Thread.java:748)}}{quote}
> {quote} {{[2021-02-03 04:19:36,767] ERROR Could not check connector state 
> info. 
> (org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions:420)
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not 
> read connector state. Error response: \{"error_code":404,"message":"No status 
> found for connector MirrorSourceConnector"}
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.connectorStatus(EmbeddedConnectCluster.java:466)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.checkConnectorState(EmbeddedConnectClusterAssertions.java:413)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.lambda$assertConnectorAndAtLeastNumTasksAreRunning$16(EmbeddedConnectClusterAssertions.java:286)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.assertConnectorAndAtLeastNumTasksAreRunning(EmbeddedConnectClusterAssertions.java:285)
>   at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(MirrorConnectorsIntegrationBaseTest.java:458)
>   at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:225)}}
> {{}}
> {quote}
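
Not the actual fix, but for context: the usual way to make this kind of topic-creation race benign in test helpers is to treat TopicExistsException as success. A minimal sketch (helper name and wiring are illustrative, not the MirrorMaker test code):

{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

import java.util.Collections;
import java.util.concurrent.ExecutionException;

public class TopicCreationHelper {
    // Create the topic, treating "already exists" as success so that a retried
    // call racing an earlier attempt does not fail the test.
    public static void createTopicIdempotently(Admin admin, String name, int partitions, short replicationFactor)
            throws InterruptedException, ExecutionException {
        NewTopic topic = new NewTopic(name, partitions, replicationFactor);
        try {
            admin.createTopics(Collections.singleton(topic)).all().get();
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof TopicExistsException))
                throw e; // only swallow the benign "already exists" case
        }
    }
}
{code}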



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mumrah commented on a change in pull request #9986: JUnit extensions for integration tests

2021-02-05 Thread GitBox


mumrah commented on a change in pull request #9986:
URL: https://github.com/apache/kafka/pull/9986#discussion_r571258754



##
File path: core/src/test/java/kafka/test/ClusterInstance.java
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import kafka.network.SocketServer;
+import kafka.test.annotation.ClusterTest;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.network.ListenerName;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+public interface ClusterInstance {
+
+enum ClusterType {
+Zk,
+// Raft
+}
+
+/**
+ * Cluster type. For now, only ZK is supported.
+ */
+ClusterType clusterType();
+
+/**
+ * The cluster configuration used to create this cluster. Changing data in 
this instance through this accessor will
+ * have no effect on the cluster since it is already provisioned.
+ */
+ClusterConfig config();
+
+/**
+ * The listener for this cluster as configured by {@link ClusterTest} or 
by {@link ClusterConfig}. If
+ * unspecified by those sources, this will return the listener for the 
default security protocol PLAINTEXT
+ */
+ListenerName listener();
+
+/**
+ * The broker connect string which can be used by clients for bootstrapping
+ */
+String brokerList();
+
+/**
+ * A collection of all brokers in the cluster. In ZK-based clusters this 
will also include the broker which is
+ * acting as the controller (since ZK controllers serve both broker and 
controller roles).
+ */
+Collection<SocketServer> brokers();
+
+/**
+ * A collection of all controllers in the cluster. For ZK-based clusters, 
this will return the broker which is also
+ * currently the active controller. For Raft-based clusters, this will 
return all controller servers.
+ */
+Collection<SocketServer> controllers();
+
+/**
+ * Any one of the broker servers.
+ */
+Optional<SocketServer> anyBroker();
+
+/**
+ * Any one of the controller servers.
+ */
+Optional<SocketServer> anyController();
+
+/**
+ * The underlying object which is responsible for setting up and tearing 
down the cluster.
+ */
+Object getUnderlying();
+
+default <T> T getUnderlying(Class<T> asClass) {
+return asClass.cast(getUnderlying());
+}
+
+Admin createAdminClient(Properties configOverrides);
+
+default Admin createAdminClient() {
+return createAdminClient(new Properties());
+}
+
+void start();
+
+void stop();

Review comment:
   Originally I left these out of the interface because the framework 
handles starting and stopping the cluster. However, after @ijuma's feedback, I 
added `boolean autoStart` to ClusterConfig along with these methods so that a 
test could control cluster startup/shutdown explicitly. 
   
   If a test does not call `stop`, the framework will stop the cluster. 
Stopping it a second time does nothing (per the implementation).
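   
   For readers following along, the explicit-lifecycle path then looks roughly 
like the sketch below (the wrapper class and test body are illustrative; only 
`start`, `stop` and `brokerList` come from the interface quoted above):
   
   ```
   import kafka.test.ClusterInstance;

   // Sketch only: assumes the cluster was requested with autoStart=false,
   // as described above; the test body is illustrative.
   public class ManualLifecycleExample {
       public void testManualStartStop(ClusterInstance cluster) {
           cluster.start();            // bring the cluster up explicitly
           try {
               String bootstrap = cluster.brokerList();
               // ... exercise the cluster via the bootstrap servers ...
           } finally {
               cluster.stop();         // idempotent, so the framework's own
                                       // teardown stop() is a harmless no-op
           }
       }
   }
   ```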





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10069: MINOR: Add RaftReplicaManager

2021-02-05 Thread GitBox


ijuma commented on a change in pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#discussion_r571257684



##
File path: core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
##
@@ -84,7 +84,7 @@ class DelayedDeleteRecords(delayMs: Long,
 (false, Errors.NOT_LEADER_OR_FOLLOWER, 
DeleteRecordsResponse.INVALID_LOW_WATERMARK)
 }
 
-  case HostedPartition.Deferred(_) =>
+  case HostedPartition.Deferred(_, _, _, _, _) =>

Review comment:
   It's cleaner to use `case _: HostedPartition.Deferred`. Otherwise, every 
time a parameter is added, you have to update this for no good reason.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9986: JUnit extensions for integration tests

2021-02-05 Thread GitBox


mumrah commented on a change in pull request #9986:
URL: https://github.com/apache/kafka/pull/9986#discussion_r571257075



##
File path: core/src/test/java/kafka/test/ClusterInstance.java
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import kafka.network.SocketServer;
+import kafka.test.annotation.ClusterTest;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.network.ListenerName;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+public interface ClusterInstance {
+
+enum ClusterType {
+Zk,
+// Raft
+}
+
+/**
+ * Cluster type. For now, only ZK is supported.
+ */
+ClusterType clusterType();
+
+/**
+ * The cluster configuration used to create this cluster. Changing data in 
this instance through this accessor will
+ * have no effect on the cluster since it is already provisioned.
+ */
+ClusterConfig config();
+
+/**
+ * The listener for this cluster as configured by {@link ClusterTest} or 
by {@link ClusterConfig}. If
+ * unspecified by those sources, this will return the listener for the 
default security protocol PLAINTEXT
+ */

Review comment:
   Yes, multiple listeners can be defined. This is something to solve/fix 
in this new framework. Only one security protocol and listener are exposed here 
in order to stay similar to how KafkaServerTestHarness works. It defines one 
security protocol and listener as members:
   
   ```
   protected def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
   protected def listenerName: ListenerName = 
ListenerName.forSecurityProtocol(securityProtocol)
   ``` 
   
   Of course, more listeners can be defined through configuration, but this one 
is exposed by the test harness and used by many tests.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571253465



##
File path: core/src/main/scala/kafka/server/metadata/RaftMetadataCache.scala
##
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.api.LeaderAndIsr
+import kafka.controller.StateChangeLogger
+import kafka.server.MetadataCache
+import kafka.utils.CoreUtils.inLock
+import kafka.utils.Logging
+import org.apache.kafka.common.internals.Topic
+import 
org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition,
 MetadataResponseTopic}
+import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, 
Uuid}
+import 
org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker,
 UpdateMetadataPartitionState}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{MetadataResponse, 
UpdateMetadataRequest}
+
+import java.util
+import java.util.Collections
+import java.util.concurrent.locks.ReentrantLock
+import scala.collection.{Seq, Set, mutable}
+import scala.jdk.CollectionConverters._
+
+object RaftMetadataCache {
+  def removePartitionInfo(partitionStates: mutable.AnyRefMap[String, 
mutable.LongMap[UpdateMetadataPartitionState]],
+  topic: String, partitionId: Int): Boolean = {
+partitionStates.get(topic).exists { infos =>
+  infos.remove(partitionId)
+  if (infos.isEmpty) partitionStates.remove(topic)
+  true
+}
+  }
+
+  def addOrUpdatePartitionInfo(partitionStates: mutable.AnyRefMap[String, 
mutable.LongMap[UpdateMetadataPartitionState]],
+   topic: String,
+   partitionId: Int,
+   stateInfo: UpdateMetadataPartitionState): Unit 
= {
+val infos = partitionStates.getOrElseUpdate(topic, mutable.LongMap.empty)
+infos(partitionId) = stateInfo
+  }
+}
+
+
+class RaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging {
+  this.logIdent = s"[MetadataCache brokerId=$brokerId] "
+
+  private val lock = new ReentrantLock()
+
+  //this is the cache state. every MetadataImage instance is immutable, and 
updates (performed under a lock)
+  //replace the value with a completely new one. this means reads (which are 
not under any lock) need to grab
+  //the value of this var (into a val) ONCE and retain that read copy for the 
duration of their operation.
+  //multiple reads of this value risk getting different snapshots.
+  @volatile private var _currentImage: MetadataImage = new MetadataImage()
+
+  private val stateChangeLogger = new StateChangeLogger(brokerId, 
inControllerContext = false, None)
+
+  // This method is the main hotspot when it comes to the performance of 
metadata requests,
+  // we should be careful about adding additional logic here. Relatedly, 
`brokers` is
+  // `List[Integer]` instead of `List[Int]` to avoid a collection copy.
+  // filterUnavailableEndpoints exists to support v0 MetadataResponses
+  private def maybeFilterAliveReplicas(image: MetadataImage,
+   brokers: java.util.List[Integer],
+   listenerName: ListenerName,
+   filterUnavailableEndpoints: Boolean): 
java.util.List[Integer] = {
+if (!filterUnavailableEndpoints) {
+  brokers
+} else {
+  val res = new 
util.ArrayList[Integer](math.min(image.brokers.aliveBrokers().size, 
brokers.size))
+  for (brokerId <- brokers.asScala) {
+if (hasAliveEndpoint(image, brokerId, listenerName))
+  res.add(brokerId)
+  }
+  res
+}
+  }
+
+  def currentImage(): MetadataImage = _currentImage
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  // If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener 
is missing on the broker.
+  // Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing 
listener (Metadata response v5 and below).
+  private def getPartitionMetadata(image: MetadataImage, topic: St

[jira] [Commented] (KAFKA-9527) Application Reset Tool Returns NPE when --to-datetime or --by-duration are run on --input-topics with empty partitions

2021-02-05 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1727#comment-1727
 ] 

Matthias J. Sax commented on KAFKA-9527:


Thanks everyone. I'll try to review the PR soon – but we have the 2.8.0 deadline 
and we want to do a 2.6.2 release that I need to take care of first. – We can 
discuss more details on the PR. And yes, we have flaky tests, so don't worry 
about it.

> Application Reset Tool Returns NPE when --to-datetime or --by-duration are 
> run on --input-topics with empty partitions 
> ---
>
> Key: KAFKA-9527
> URL: https://issues.apache.org/jira/browse/KAFKA-9527
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 2.3.0
>Reporter: jbfletch
>Assignee: Marco Lotz
>Priority: Minor
>
> When running the streams application reset tool with --by-duration or 
> --to-datetime if any partitions for a given input topic are empty a NPE is 
> thrown.  I tested this with a topic with 3 partitions, I received a NPE until 
> all 3 partitions had at least one message.  The behavior was the same for 
> both --to-datetime and --by-duration. 
> Error below:
> Reset-offsets for input topics [sample-cdc-topic]Reset-offsets for input 
> topics [sample-cdc-topic]Following input topics offsets will be reset to (for 
> consumer group des-demo-stream)ERROR: 
> java.lang.NullPointerExceptionjava.lang.NullPointerException at 
> kafka.tools.StreamsResetter.resetToDatetime(StreamsResetter.java:496) at 
> kafka.tools.StreamsResetter.maybeReset(StreamsResetter.java:426) at 
> kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:374)
>  at kafka.tools.StreamsResetter.run(StreamsResetter.java:164) at 
> kafka.tools.StreamsResetter.run(StreamsResetter.java:131) at 
> kafka.tools.StreamsResetter.main(StreamsResetter.java:678)
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests

2021-02-05 Thread GitBox


cmccabe commented on a change in pull request #9986:
URL: https://github.com/apache/kafka/pull/9986#discussion_r571246502



##
File path: core/src/test/java/kafka/test/ClusterInstance.java
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import kafka.network.SocketServer;
+import kafka.test.annotation.ClusterTest;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.network.ListenerName;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+public interface ClusterInstance {
+
+enum ClusterType {
+Zk,
+// Raft
+}
+
+/**
+ * Cluster type. For now, only ZK is supported.
+ */
+ClusterType clusterType();
+
+/**
+ * The cluster configuration used to create this cluster. Changing data in 
this instance through this accessor will
+ * have no effect on the cluster since it is already provisioned.
+ */
+ClusterConfig config();
+
+/**
+ * The listener for this cluster as configured by {@link ClusterTest} or 
by {@link ClusterConfig}. If
+ * unspecified by those sources, this will return the listener for the 
default security protocol PLAINTEXT
+ */
+ListenerName listener();
+
+/**
+ * The broker connect string which can be used by clients for bootstrapping
+ */
+String brokerList();
+
+/**
+ * A collection of all brokers in the cluster. In ZK-based clusters this 
will also include the broker which is
+ * acting as the controller (since ZK controllers serve both broker and 
controller roles).
+ */
+Collection<SocketServer> brokers();
+
+/**
+ * A collection of all controllers in the cluster. For ZK-based clusters, 
this will return the broker which is also
+ * currently the active controller. For Raft-based clusters, this will 
return all controller servers.
+ */
+Collection<SocketServer> controllers();
+
+/**
+ * Any one of the broker servers.
+ */
+Optional<SocketServer> anyBroker();
+
+/**
+ * Any one of the controller servers.
+ */
+Optional<SocketServer> anyController();
+
+/**
+ * The underlying object which is responsible for setting up and tearing 
down the cluster.
+ */
+Object getUnderlying();
+
+default <T> T getUnderlying(Class<T> asClass) {
+return asClass.cast(getUnderlying());
+}
+
+Admin createAdminClient(Properties configOverrides);
+
+default Admin createAdminClient() {
+return createAdminClient(new Properties());
+}
+
+void start();
+
+void stop();

Review comment:
   should this implement `AutoCloseable`?  Does code need to remember to 
close this cluster?
   
   Or is that handled automatically by the framework...
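   
   One way this could look, sketched against the interface as quoted (not what 
the PR actually does): extend `AutoCloseable` with a default `close()` that 
delegates to `stop()`, so try-with-resources works and a forgotten `stop()` is 
still covered by the framework teardown.
   
   ```
   // Sketch only: one possible AutoCloseable wiring for the quoted interface.
   public interface CloseableClusterInstance extends AutoCloseable {
       void start();

       void stop();

       @Override
       default void close() {
           stop(); // stop() is idempotent, so closing twice is harmless
       }
   }
   ```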





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji merged pull request #10045: MINOR: Allow KafkaApis to be configured for Raft controller quorums

2021-02-05 Thread GitBox


hachikuji merged pull request #10045:
URL: https://github.com/apache/kafka/pull/10045


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests

2021-02-05 Thread GitBox


cmccabe commented on a change in pull request #9986:
URL: https://github.com/apache/kafka/pull/9986#discussion_r571245971



##
File path: core/src/test/java/kafka/test/ClusterInstance.java
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import kafka.network.SocketServer;
+import kafka.test.annotation.ClusterTest;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.network.ListenerName;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+public interface ClusterInstance {
+
+enum ClusterType {
+Zk,
+// Raft
+}
+
+/**
+ * Cluster type. For now, only ZK is supported.
+ */
+ClusterType clusterType();
+
+/**
+ * The cluster configuration used to create this cluster. Changing data in 
this instance through this accessor will
+ * have no effect on the cluster since it is already provisioned.
+ */
+ClusterConfig config();
+
+/**
+ * The listener for this cluster as configured by {@link ClusterTest} or 
by {@link ClusterConfig}. If
+ * unspecified by those sources, this will return the listener for the 
default security protocol PLAINTEXT
+ */
+ListenerName listener();
+
+/**
+ * The broker connect string which can be used by clients for bootstrapping
+ */
+String brokerList();
+
+/**
+ * A collection of all brokers in the cluster. In ZK-based clusters this 
will also include the broker which is
+ * acting as the controller (since ZK controllers serve both broker and 
controller roles).
+ */
+Collection<SocketServer> brokers();

Review comment:
   I would prefer to call this method something like `brokerSocketServers`. 
 After all, the `SocketServer` is just one small part of the broker, not the 
broker itself.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12301) Support for enum validation in configuration

2021-02-05 Thread Jeremy Custenborder (Jira)
Jeremy Custenborder created KAFKA-12301:
---

 Summary: Support for enum validation in configuration  
 Key: KAFKA-12301
 URL: https://issues.apache.org/jira/browse/KAFKA-12301
 Project: Kafka
  Issue Type: Improvement
  Components: config
Reporter: Jeremy Custenborder
Assignee: Jeremy Custenborder


Several configuration elements are mapped to internal enums. A typo in the 
configuration yields an error message that is not descriptive and forces the 
user to go look up the valid values on their own. 

For example:
{code:java}
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to 
create new KafkaAdminClient
at 
org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:479)
at org.apache.kafka.clients.admin.Admin.create(Admin.java:61)
at 
org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:39)
...
Caused by: java.lang.IllegalArgumentException: No enum constant 
org.apache.kafka.common.security.auth.SecurityProtocol.SASL_PLAINTEXTA
at java.lang.Enum.valueOf(Enum.java:238)
at 
org.apache.kafka.common.security.auth.SecurityProtocol.valueOf(SecurityProtocol.java:26)
at 
org.apache.kafka.common.security.auth.SecurityProtocol.forName(SecurityProtocol.java:72)
at 
org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:103)
at 
org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:454)
... 7 more {code}
This is easier to troubleshoot.
{code:java}
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to 
create new KafkaAdminClient
at 
org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:479)
at org.apache.kafka.clients.admin.Admin.create(Admin.java:61)
at 
org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:39)
...
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
SASL_PLAINTEXTA for security.protocol. Enum value not found. Valid values are: 
PLAINTEXT, SASL_PLAINTEXT, SASL_SSL, SSL
at java.lang.Enum.valueOf(Enum.java:238)
at 
org.apache.kafka.common.security.auth.SecurityProtocol.valueOf(SecurityProtocol.java:26)
at 
org.apache.kafka.common.security.auth.SecurityProtocol.forName(SecurityProtocol.java:72)
at 
org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:103)
at 
org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:454)
... 7 more {code}
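
One possible shape for this, sketched on top of the existing ConfigDef.Validator hook (class name and wiring here are illustrative; ConfigDef.ValidString.in(...) already produces a similar message for plain string-valued configs):

{code:java}
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

import java.util.Arrays;

// Illustrative sketch: a generic validator that rejects typos up front and
// lists the allowed enum constants in the error message.
public class EnumValidator<E extends Enum<E>> implements ConfigDef.Validator {

    private final Class<E> enumClass;

    public EnumValidator(Class<E> enumClass) {
        this.enumClass = enumClass;
    }

    @Override
    public void ensureValid(String name, Object value) {
        String s = value == null ? null : value.toString();
        boolean valid = s != null && Arrays.stream(enumClass.getEnumConstants())
                .anyMatch(c -> c.name().equalsIgnoreCase(s));
        if (!valid) {
            throw new ConfigException(name, value, "Enum value not found. Valid values are: "
                    + Arrays.toString(enumClass.getEnumConstants()));
        }
    }
}
{code}

A validator along these lines could then be passed to ConfigDef.define(...) for configs such as security.protocol.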
 

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests

2021-02-05 Thread GitBox


cmccabe commented on a change in pull request #9986:
URL: https://github.com/apache/kafka/pull/9986#discussion_r571245403



##
File path: core/src/test/java/kafka/test/ClusterInstance.java
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import kafka.network.SocketServer;
+import kafka.test.annotation.ClusterTest;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.network.ListenerName;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+public interface ClusterInstance {
+
+enum ClusterType {
+Zk,
+// Raft
+}
+
+/**
+ * Cluster type. For now, only ZK is supported.
+ */
+ClusterType clusterType();
+
+/**
+ * The cluster configuration used to create this cluster. Changing data in 
this instance through this accessor will
+ * have no effect on the cluster since it is already provisioned.
+ */
+ClusterConfig config();
+
+/**
+ * The listener for this cluster as configured by {@link ClusterTest} or 
by {@link ClusterConfig}. If
+ * unspecified by those sources, this will return the listener for the 
default security protocol PLAINTEXT
+ */

Review comment:
   A cluster can have multiple listeners, right?  Is this intended to be 
the listener that should be used by clients?  If so, calling it the client 
listener makes sense.
   
   (Technically we could have multiple client listeners too, but that's much 
less common)
   
   Many clusters run with 3 listeners:
   * client
   * inter-broker NON-REPLICATION
   * replication
   
   and we are going to add a 4th:
   * controller





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan opened a new pull request #10071: KAFKA-12298: Create LeaderAndIsrRequestBenchmark

2021-02-05 Thread GitBox


jolshan opened a new pull request #10071:
URL: https://github.com/apache/kafka/pull/10071


   I thought it would be useful to check how the changes to 
LeaderAndIsrRequests (for KIP-516) affect the handling of the request. This 
benchmark builds a LeaderAndIsrRequest and calls  
`kafkaApis.handleLeaderAndIsrRequest`. Other benchmarks for this type of 
request could be added here as well.
   
   I also slightly changed the MetadataRequestBenchmark since I noticed that 
the MetadataCache used in that benchmark did not add topic IDs.
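   
   For anyone who has not looked at the jmh-benchmarks module before, the 
benchmark follows the usual JMH shape, roughly like the sketch below (setup 
details are elided and the class name is a placeholder; the real class builds 
the request and a KafkaApis instance as described above):
   
   ```
   import org.openjdk.jmh.annotations.Benchmark;
   import org.openjdk.jmh.annotations.Fork;
   import org.openjdk.jmh.annotations.Level;
   import org.openjdk.jmh.annotations.Measurement;
   import org.openjdk.jmh.annotations.Scope;
   import org.openjdk.jmh.annotations.Setup;
   import org.openjdk.jmh.annotations.State;
   import org.openjdk.jmh.annotations.TearDown;
   import org.openjdk.jmh.annotations.Warmup;

   @State(Scope.Benchmark)
   @Fork(1)
   @Warmup(iterations = 5)
   @Measurement(iterations = 15)
   public class LeaderAndIsrRequestBenchmarkSketch {

       @Setup(Level.Trial)
       public void setup() {
           // Build the LeaderAndIsrRequest and the KafkaApis instance once
           // per trial (elided here; see the PR for the actual wiring).
       }

       @Benchmark
       public void testHandleLeaderAndIsrRequest() {
           // kafkaApis.handleLeaderAndIsrRequest(request) in the real benchmark.
       }

       @TearDown(Level.Trial)
       public void tearDown() {
           // Release any brokers/metrics created in setup().
       }
   }
   ```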
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests

2021-02-05 Thread GitBox


cmccabe commented on a change in pull request #9986:
URL: https://github.com/apache/kafka/pull/9986#discussion_r571244427



##
File path: core/src/test/java/kafka/test/ClusterInstance.java
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import kafka.network.SocketServer;
+import kafka.test.annotation.ClusterTest;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.network.ListenerName;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+public interface ClusterInstance {
+
+enum ClusterType {

Review comment:
   Enum values should be capitalized





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests

2021-02-05 Thread GitBox


cmccabe commented on a change in pull request #9986:
URL: https://github.com/apache/kafka/pull/9986#discussion_r571242927



##
File path: core/src/test/java/kafka/test/ClusterConfig.java
##
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import kafka.test.annotation.Type;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+
+import java.io.File;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Represents a requested configuration of a Kafka cluster for integration 
testing
+ */
+public class ClusterConfig {
+
+private final Type type;
+private final int brokers;
+private final int controllers;
+private final String name;
+private final boolean autoStart;
+
+private final String securityProtocol;

Review comment:
   What's the advantage of making this a string rather than using the 
SecurityProtocol enum?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9986: JUnit extensions for integration tests

2021-02-05 Thread GitBox


cmccabe commented on a change in pull request #9986:
URL: https://github.com/apache/kafka/pull/9986#discussion_r571241978



##
File path: core/src/test/java/kafka/test/annotation/Type.java
##
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.annotation;
+
+public enum Type {

Review comment:
   Java Enums should be capitalized.  Also can we add a JavaDoc for each 
value describing what it is?
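   
   Concretely, something along these lines (the constant set here is 
illustrative, not the final enum):
   
   ```
   /**
    * The kind of cluster a test asks for.
    */
   public enum Type {
       /** A classic ZooKeeper-backed cluster. */
       ZK,

       /** A self-managed (Raft-based, KIP-500) cluster. */
       RAFT
   }
   ```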





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (KAFKA-10716) Streams processId is unstable across restarts resulting in task mass migration

2021-02-05 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman closed KAFKA-10716.
--

> Streams processId is unstable across restarts resulting in task mass migration
> --
>
> Key: KAFKA-10716
> URL: https://issues.apache.org/jira/browse/KAFKA-10716
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Critical
> Fix For: 2.8.0, 2.7.1, 2.6.2
>
>
> The new high availability feature of KIP-441 relies on deterministic 
> assignment to produce an eventually-stable assignment. The 
> HighAvailabilityTaskAssignor assigns tasks based on the unique processId 
> assigned to each client, so if the same set of Kafka Streams applications 
> participate in a rebalance it should generate the same task assignment every 
> time.
> Unfortunately the processIds aren't stable across restarts. We generate a 
> random UUID in the KafkaStreams constructor, so each time the process starts 
> up it would be assigned a completely different processId. Unless this new 
> processId happens to be in exactly the same order as the previous one, a 
> single bounce or crash/restart can result in a large scale shuffling of tasks 
> based on a completely different eventual assignment.
> Ultimately we should fix this via KAFKA-10121, but that's a nontrivial 
> undertaking and this bug merits some immediate relief if we don't intend to 
> tackle the larger problem in the upcoming releases 
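
The linked PRs address this by persisting the processId in the state directory and reading it back on startup instead of generating a fresh UUID each time. A minimal sketch of that read-or-create pattern (file name and helper class are illustrative, not the actual Streams code):

{code:java}
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.UUID;

public class ProcessIdStore {
    // Read the persisted processId from the state directory, or create and
    // persist a new one on first start so that later restarts reuse it.
    public static UUID readOrCreateProcessId(File stateDir) throws IOException {
        File idFile = new File(stateDir, "process.id");
        if (idFile.exists()) {
            String stored = new String(Files.readAllBytes(idFile.toPath()), StandardCharsets.UTF_8).trim();
            return UUID.fromString(stored);
        }
        UUID fresh = UUID.randomUUID();
        Files.write(idFile.toPath(), fresh.toString().getBytes(StandardCharsets.UTF_8));
        return fresh;
    }
}
{code}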



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on pull request #10060: KAFKA-10716: persist UUID in state directory for stable processId across restarts - 2.7

2021-02-05 Thread GitBox


ableegoldman commented on pull request #10060:
URL: https://github.com/apache/kafka/pull/10060#issuecomment-774266721


   Merged to 2.7 and cherrypicked to 2.6



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


cmccabe commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571208259



##
File path: core/src/main/scala/kafka/server/metadata/MetadataBrokers.scala
##
@@ -37,7 +36,15 @@ object MetadataBroker {
 endPoint.name() ->
   new Node(record.brokerId, endPoint.host, endPoint.port, record.rack)
   }.toMap,
-  true)
+  fenced = true)
+  }
+
+  def apply(broker: Broker): MetadataBroker = {
+new MetadataBroker(broker.id, broker.rack.orNull,
+  broker.endPoints.map { endpoint =>
+endpoint.listenerName.value -> new Node(broker.id, endpoint.host, 
endpoint.port, broker.rack.orNull)
+  }.toMap,
+  fenced = false)

Review comment:
   as long as this is just used by the ZK code path there is no harm





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12300) Log source broker when handling metadata response

2021-02-05 Thread alex wang (Jira)
alex wang created KAFKA-12300:
-

 Summary: Log source broker when handling metadata response
 Key: KAFKA-12300
 URL: https://issues.apache.org/jira/browse/KAFKA-12300
 Project: Kafka
  Issue Type: Improvement
Reporter: alex wang


Sometimes it's helpful to log the broker id when an error occurs during a 
metadata request; this can yield insight into the possible cause of the metadata 
request error, such as a metadata propagation delay.
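
The change itself is small; the idea is simply to carry the responding broker into the log line, roughly like this (class and method are placeholders, not the actual client code):

{code:java}
import org.apache.kafka.common.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetadataResponseLogging {
    private static final Logger log = LoggerFactory.getLogger(MetadataResponseLogging.class);

    // Illustrative only: include the source broker so that a propagation delay
    // on a specific broker is easier to spot.
    public static void logMetadataError(Node source, String topic, Throwable error) {
        log.warn("Error in metadata response from broker {} (id {}) for topic {}",
                source.host() + ":" + source.port(), source.id(), topic, error);
    }
}
{code}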



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9527) Application Reset Tool Returns NPE when --to-datetime or --by-duration are run on --input-topics with empty partitions

2021-02-05 Thread jbfletch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279941#comment-17279941
 ] 

jbfletch commented on KAFKA-9527:
-

Thanks for taking this on [~marcolotz]

> Application Reset Tool Returns NPE when --to-datetime or --by-duration are 
> run on --input-topics with empty partitions 
> ---
>
> Key: KAFKA-9527
> URL: https://issues.apache.org/jira/browse/KAFKA-9527
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 2.3.0
>Reporter: jbfletch
>Assignee: Marco Lotz
>Priority: Minor
>
> When running the streams application reset tool with --by-duration or 
> --to-datetime if any partitions for a given input topic are empty a NPE is 
> thrown.  I tested this with a topic with 3 partitions, I received a NPE until 
> all 3 partitions had at least one message.  The behavior was the same for 
> both --to-datetime and --by-duration. 
> Error below:
> Reset-offsets for input topics [sample-cdc-topic]Reset-offsets for input 
> topics [sample-cdc-topic]Following input topics offsets will be reset to (for 
> consumer group des-demo-stream)ERROR: 
> java.lang.NullPointerExceptionjava.lang.NullPointerException at 
> kafka.tools.StreamsResetter.resetToDatetime(StreamsResetter.java:496) at 
> kafka.tools.StreamsResetter.maybeReset(StreamsResetter.java:426) at 
> kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:374)
>  at kafka.tools.StreamsResetter.run(StreamsResetter.java:164) at 
> kafka.tools.StreamsResetter.run(StreamsResetter.java:131) at 
> kafka.tools.StreamsResetter.main(StreamsResetter.java:678)
>  
>  
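
For context on where the NPE comes from: Consumer#offsetsForTimes returns a null OffsetAndTimestamp for any partition that has no record at or after the requested timestamp, which is exactly the empty-partition case, so the reset path has to guard against null before calling offset(). A hedged sketch of that guard (the fallback to the end offset is illustrative, not necessarily what the PR does):

{code:java}
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;

public class ResetToDatetimeSketch {
    // Seek to the offset matching the requested timestamp, falling back to the
    // end offset for partitions where offsetsForTimes returned null (e.g. empty ones).
    public static void seekToDatetime(Consumer<byte[], byte[]> consumer,
                                      Map<TopicPartition, Long> timestamps) {
        Map<TopicPartition, OffsetAndTimestamp> found = consumer.offsetsForTimes(timestamps);
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(timestamps.keySet());
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : found.entrySet()) {
            OffsetAndTimestamp offsetAndTimestamp = entry.getValue();
            long target = offsetAndTimestamp != null ? offsetAndTimestamp.offset()
                    : endOffsets.get(entry.getKey());
            consumer.seek(entry.getKey(), target);
        }
    }
}
{code}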



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman merged pull request #10060: KAFKA-10716: persist UUID in state directory for stable processId across restarts - 2.7

2021-02-05 Thread GitBox


ableegoldman merged pull request #10060:
URL: https://github.com/apache/kafka/pull/10060


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-12279) Kafka 2.7 stream app issue

2021-02-05 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-12279.

Resolution: Not A Bug

> Kafka 2.7 stream app issue
> --
>
> Key: KAFKA-12279
> URL: https://issues.apache.org/jira/browse/KAFKA-12279
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: prabhu biradar
>Priority: Critical
>
> After starting the stream application below exception is thrown and threads 
> stop processing. 
>   2021-02-02T22:29:10.416::AccountId ::Partition ::Offset 
> ::[com.xx.xx.xx.xx.xx-client-StreamThread-1]::ERROR::o.a.k.s.processor.internals.StreamThread::stream-thread
>  [com.xx.xx.xx.xx.xx-client-StreamThread-1] Encountered the following 
> exception during processing and the thread is going to shut down:   
> 2021-02-02T22:29:10.416::AccountId ::Partition ::Offset 
> ::[com.xx.xx.xx.xx.xx-client-StreamThread-1]::ERROR::o.a.k.s.processor.internals.StreamThread::stream-thread
>  [com.xx.xx.xx.xx.xx-client-StreamThread-1] Encountered the following 
> exception during processing and the thread is going to shut down: 
> java.lang.IllegalStateException: Tried to lookup lag for unknown task 0_13 at 
> org.apache.kafka.streams.processor.internals.assignment.ClientState.lagFor(ClientState.java:318)
>  at java.util.Comparator.lambda$comparingLong$6043328a$1(Comparator.java:511) 
> at java.util.Comparator.lambda$thenComparing$36697e65$1(Comparator.java:216) 
> at java.util.TreeMap.put(TreeMap.java:552) at 
> java.util.TreeSet.add(TreeSet.java:255) at 
> java.util.AbstractCollection.addAll(AbstractCollection.java:344) at 
> java.util.TreeSet.addAll(TreeSet.java:312) at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.getPreviousTasksByLag(StreamsPartitionAssignor.java:1299)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads(StreamsPartitionAssignor.java:1213)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.computeNewAssignment(StreamsPartitionAssignor.java:939)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:393)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:589)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:689)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:111)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:599)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:562)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1164)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1139)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1292)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:801)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:763)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:614)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)2021-02-02T22:29:10.416::AccountId
>  ::Partition ::Offset 
> ::[com.xx.xx.xx.xx.xxx-client-StreamThread-

[jira] [Commented] (KAFKA-12279) Kafka 2.7 stream app issue

2021-02-05 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279938#comment-17279938
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12279:


Root cause: the source topic was deleted and recreated with a smaller number of 
partitions. However, the app was not reset and the local state was not cleared, 
which left the task directory for 0_13 on disk. Since the new input topics had 
fewer than 13 partitions, this task was no longer recognized.

Since decreasing the partition number on-the-fly without resetting the 
application is not supported, I'm going to close this as "Not a Bug"
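
For anyone who lands here with the same symptom: after recreating input topics with fewer partitions, the application needs to be reset and its local state cleared before restarting. Programmatically the local-state part is KafkaStreams#cleanUp(), which must be called while the instance is not running, e.g. (sketch only):

{code:java}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;

import java.util.Properties;

public class ResetLocalStateExample {
    // Sketch: wipe stale task directories (such as 0_13 above) before starting
    // a Streams app whose input topics were recreated with fewer partitions.
    public static KafkaStreams startFresh(Topology topology, Properties props) {
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.cleanUp();   // only valid while the instance is not running
        streams.start();
        return streams;
    }
}
{code}

The committed offsets and internal topics still need to be reset separately; the kafka-streams-application-reset tool covers that part.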

> Kafka 2.7 stream app issue
> --
>
> Key: KAFKA-12279
> URL: https://issues.apache.org/jira/browse/KAFKA-12279
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: prabhu biradar
>Priority: Critical
>
> After starting the stream application below exception is thrown and threads 
> stop processing. 
>   2021-02-02T22:29:10.416::AccountId ::Partition ::Offset 
> ::[com.xx.xx.xx.xx.xx-client-StreamThread-1]::ERROR::o.a.k.s.processor.internals.StreamThread::stream-thread
>  [com.xx.xx.xx.xx.xx-client-StreamThread-1] Encountered the following 
> exception during processing and the thread is going to shut down:   
> 2021-02-02T22:29:10.416::AccountId ::Partition ::Offset 
> ::[com.xx.xx.xx.xx.xx-client-StreamThread-1]::ERROR::o.a.k.s.processor.internals.StreamThread::stream-thread
>  [com.xx.xx.xx.xx.xx-client-StreamThread-1] Encountered the following 
> exception during processing and the thread is going to shut down: 
> java.lang.IllegalStateException: Tried to lookup lag for unknown task 0_13 at 
> org.apache.kafka.streams.processor.internals.assignment.ClientState.lagFor(ClientState.java:318)
>  at java.util.Comparator.lambda$comparingLong$6043328a$1(Comparator.java:511) 
> at java.util.Comparator.lambda$thenComparing$36697e65$1(Comparator.java:216) 
> at java.util.TreeMap.put(TreeMap.java:552) at 
> java.util.TreeSet.add(TreeSet.java:255) at 
> java.util.AbstractCollection.addAll(AbstractCollection.java:344) at 
> java.util.TreeSet.addAll(TreeSet.java:312) at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.getPreviousTasksByLag(StreamsPartitionAssignor.java:1299)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads(StreamsPartitionAssignor.java:1213)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.computeNewAssignment(StreamsPartitionAssignor.java:939)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:393)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:589)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:689)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:111)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:599)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:562)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1164)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1139)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1292)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:801)
>  at 
> org.apach

[GitHub] [kafka] rondagostino commented on a change in pull request #10045: MINOR: Allow KafkaApis to be configured for Raft controller quorums

2021-02-05 Thread GitBox


rondagostino commented on a change in pull request #10045:
URL: https://github.com/apache/kafka/pull/10045#discussion_r571174825



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -3511,4 +3501,14 @@ object KafkaApis {
 FetchResponse.sizeOf(versionId, unconvertedResponse.responseData.entrySet
   .iterator.asScala.filter(element => 
quota.isThrottled(element.getKey)).asJava)
   }
+
+  // visible for testing
+  private[server] def shouldNeverReceive(request: RequestChannel.Request): 
Exception = {

Review comment:
   Ah, I hadn't tested for those; I moved these based on the tests I was 
rewriting.  Will move.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10045: MINOR: Allow KafkaApis to be configured for Raft controller quorums

2021-02-05 Thread GitBox


hachikuji commented on a change in pull request #10045:
URL: https://github.com/apache/kafka/pull/10045#discussion_r571174163



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -3511,4 +3501,14 @@ object KafkaApis {
 FetchResponse.sizeOf(versionId, unconvertedResponse.responseData.entrySet
   .iterator.asScala.filter(element => 
quota.isThrottled(element.getKey)).asJava)
   }
+
+  // visible for testing
+  private[server] def shouldNeverReceive(request: RequestChannel.Request): 
Exception = {

Review comment:
   Why move these and not `unsupported`/`notYetSupported`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571173703



##
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##
@@ -34,16 +34,69 @@ import 
org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, 
Uuid}
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, 
UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, 
UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(
+topics: collection.Set[String],
+listenerName: ListenerName,
+errorUnavailableEndpoints: Boolean = false,
+errorUnavailableListeners: Boolean = false): 
collection.Seq[MetadataResponseData.MetadataResponseTopic]
+
+  def getAllTopics(): collection.Set[String]
+
+  def getAllPartitions(): collection.Set[TopicPartition]
+
+  def getNonExistingTopics(topics: collection.Set[String]): 
collection.Set[String]
+
+  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+
+  def getAliveBrokers: collection.Seq[MetadataBroker]
+
+  def getPartitionInfo(topic: String, partitionId: Int): 
Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
+
+  def numPartitions(topic: String): Option[Int]
+
+  // if the leader is not known, return None;
+  // if the leader is known and corresponding node is available, return 
Some(node)
+  // if the leader is known but corresponding node with the listener name is 
not available, return Some(NO_NODE)
+  def getPartitionLeaderEndpoint(topic: String, partitionId: Int, 
listenerName: ListenerName): Option[Node]
+
+  def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: 
ListenerName): Map[Int, Node]
+
+  def getControllerId: Option[Int]
+
+  def getClusterMetadata(clusterId: String, listenerName: ListenerName): 
Cluster
+
+  // This method returns the deleted TopicPartitions received from 
UpdateMetadataRequest
+  def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): 
collection.Seq[TopicPartition]
+
+  def contains(topic: String): Boolean
+
+  def contains(tp: TopicPartition): Boolean
+}
+
+object MetadataCache {
+  def zkMetadataCache(brokerId: Int): ZkMetadataCache = {
+new ZkMetadataCache(brokerId)
+  }
+
+  def raftMetadataCache(brokerId: Int): RaftMetadataCache = {
+new RaftMetadataCache(brokerId)
+  }
+}
+
 /**
  *  A cache for the state (e.g., current leader) of each partition. This cache 
is updated through
  *  UpdateMetadataRequest from the controller. Every broker maintains the same 
cache, asynchronously.
  */
-class MetadataCache(brokerId: Int) extends Logging {
+class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging {

Review comment:
   KAFKA-12299





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12299) Follow ups from MetadataCache changes

2021-02-05 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-12299:
-
Description: 
When merging the Raft MetadataCache changes for 2.8, we identified a few follow 
up items during the code review. 

See: 
* https://github.com/apache/kafka/pull/10049
* https://github.com/apache/kafka/pull/10018

  was:
When merging the Raft MetadataCache changes for 2.8, we identified a few follow 
up items during the code review. 

See: https://github.com/apache/kafka/pull/10049


> Follow ups from MetadataCache changes
> -
>
> Key: KAFKA-12299
> URL: https://issues.apache.org/jira/browse/KAFKA-12299
> Project: Kafka
>  Issue Type: Task
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Minor
>
> When merging the Raft MetadataCache changes for 2.8, we identified a few 
> follow up items during the code review. 
> See: 
> * https://github.com/apache/kafka/pull/10049
> * https://github.com/apache/kafka/pull/10018



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571173506



##
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##
@@ -34,16 +34,69 @@ import 
org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, 
Uuid}
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, 
UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, 
UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(
+topics: collection.Set[String],
+listenerName: ListenerName,
+errorUnavailableEndpoints: Boolean = false,
+errorUnavailableListeners: Boolean = false): 
collection.Seq[MetadataResponseData.MetadataResponseTopic]
+
+  def getAllTopics(): collection.Set[String]

Review comment:
   KAFKA-12299





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12299) Follow ups from MetadataCache changes

2021-02-05 Thread David Arthur (Jira)
David Arthur created KAFKA-12299:


 Summary: Follow ups from MetadataCache changes
 Key: KAFKA-12299
 URL: https://issues.apache.org/jira/browse/KAFKA-12299
 Project: Kafka
  Issue Type: Task
Reporter: David Arthur
Assignee: David Arthur


When merging the Raft MetadataCache changes for 2.8, we identified a few follow 
up items during the code review. 

See: https://github.com/apache/kafka/pull/10049



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe opened a new pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-05 Thread GitBox


cmccabe opened a new pull request #10070:
URL: https://github.com/apache/kafka/pull/10070


   The quorum controller stores metadata in the KIP-500 metadata log, not in 
Apache ZooKeeper.  Each controller node is a voter in the metadata quorum.  The 
leader of the quorum is the active controller, which processes write requests.  
The followers are standby controllers, which replay the operations written to 
the log.  If the active controller goes away, a standby controller can take its 
place.
   
   Like the ZooKeeper-based controller, the quorum controller is based on an 
event queue backed by a single-threaded executor.  However, unlike the ZK-based 
controller, the quorum controller can have multiple operations in flight; it 
does not need to wait for one operation to finish before starting another.  
Therefore, calls into the QuorumController return CompletableFuture objects 
which are completed with either a result or an error when the operation is 
done.  The QuorumController will also time out operations that have been 
sitting on the queue too long without being processed.  In this case, the 
future is completed with a TimeoutException.
   
   The controller uses timeline data structures to store multiple "versions" of 
its in-memory state simultaneously.  "Read operations" read the latest 
committed state; essentially, they are reading from a slightly older 
in-memory snapshot of the state.  "Write operations" read and write the latest 
state.  However, we cannot return a successful result for a write operation 
until its state has been committed to the log.  Therefore, if a client receives 
an RPC response, it knows that the requested operation has been performed and 
cannot be undone by a controller failover.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


hachikuji commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571169638



##
File path: core/src/main/scala/kafka/server/metadata/RaftMetadataCache.scala
##
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.api.LeaderAndIsr
+import kafka.controller.StateChangeLogger
+import kafka.server.MetadataCache
+import kafka.utils.CoreUtils.inLock
+import kafka.utils.Logging
+import org.apache.kafka.common.internals.Topic
+import 
org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition,
 MetadataResponseTopic}
+import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, 
Uuid}
+import 
org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker,
 UpdateMetadataPartitionState}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{MetadataResponse, 
UpdateMetadataRequest}
+
+import java.util
+import java.util.Collections
+import java.util.concurrent.locks.ReentrantLock
+import scala.collection.{Seq, Set, mutable}
+import scala.jdk.CollectionConverters._
+
+object RaftMetadataCache {
+  def removePartitionInfo(partitionStates: mutable.AnyRefMap[String, 
mutable.LongMap[UpdateMetadataPartitionState]],
+  topic: String, partitionId: Int): Boolean = {
+partitionStates.get(topic).exists { infos =>
+  infos.remove(partitionId)
+  if (infos.isEmpty) partitionStates.remove(topic)
+  true
+}
+  }
+
+  def addOrUpdatePartitionInfo(partitionStates: mutable.AnyRefMap[String, 
mutable.LongMap[UpdateMetadataPartitionState]],
+   topic: String,
+   partitionId: Int,
+   stateInfo: UpdateMetadataPartitionState): Unit 
= {
+val infos = partitionStates.getOrElseUpdate(topic, mutable.LongMap.empty)
+infos(partitionId) = stateInfo
+  }
+}
+
+
+class RaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging {
+  this.logIdent = s"[MetadataCache brokerId=${brokerId}] "
+
+  private val lock = new ReentrantLock()
+
+  //this is the cache state. every MetadataImage instance is immutable, and 
updates (performed under a lock)
+  //replace the value with a completely new one. this means reads (which are 
not under any lock) need to grab
+  //the value of this var (into a val) ONCE and retain that read copy for the 
duration of their operation.
+  //multiple reads of this value risk getting different snapshots.
+  @volatile private var _currentImage: MetadataImage = new MetadataImage()
+
+  private val stateChangeLogger = new StateChangeLogger(brokerId, 
inControllerContext = false, None)
+
+  // This method is the main hotspot when it comes to the performance of 
metadata requests,
+  // we should be careful about adding additional logic here. Relatedly, 
`brokers` is
+  // `List[Integer]` instead of `List[Int]` to avoid a collection copy.
+  // filterUnavailableEndpoints exists to support v0 MetadataResponses
+  private def maybeFilterAliveReplicas(image: MetadataImage,
+   brokers: java.util.List[Integer],
+   listenerName: ListenerName,
+   filterUnavailableEndpoints: Boolean): 
java.util.List[Integer] = {
+if (!filterUnavailableEndpoints) {
+  brokers
+} else {
+  val res = new 
util.ArrayList[Integer](math.min(image.brokers.aliveBrokers().size, 
brokers.size))
+  for (brokerId <- brokers.asScala) {
+if (hasAliveEndpoint(image, brokerId, listenerName))
+  res.add(brokerId)
+  }
+  res
+}
+  }
+
+  def currentImage(): MetadataImage = _currentImage
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  // If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener 
is missing on the broker.
+  // Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing 
listener (Metadata response v5 and below).
+  private def getPartitionMetadata(image: MetadataImage, topi
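
   As an aside, here is a self-contained sketch of the snapshot-read pattern that 
the `_currentImage` comment above describes, using hypothetical stand-in types 
rather than the real `MetadataImage`: readers copy the volatile reference into a 
local val exactly once and use that immutable snapshot for the whole operation.

```scala
import java.util.concurrent.locks.ReentrantLock

// Stand-in for an immutable metadata image; illustrative only.
final case class ImageSketch(topics: Set[String])

class SnapshotCacheSketch {
  private val lock = new ReentrantLock()

  // Every update replaces the image with a completely new immutable instance.
  @volatile private var current: ImageSketch = ImageSketch(Set.empty)

  // Write path: build the new image under the lock, then publish it.
  def addTopic(topic: String): Unit = {
    lock.lock()
    try current = ImageSketch(current.topics + topic)
    finally lock.unlock()
  }

  // Read path: grab the volatile reference once; re-reading `current` later
  // could observe a different snapshot.
  def containsAndCount(topic: String): (Boolean, Int) = {
    val image = current
    (image.topics.contains(topic), image.topics.size)
  }
}
```
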

[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571163776



##
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##
@@ -485,7 +494,7 @@ class MetadataCacheTest {
 
   @Test

Review comment:
   This test is failing in Raft mode. I'll investigate





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571163226



##
File path: core/src/main/scala/kafka/server/metadata/MetadataBrokers.scala
##
@@ -37,7 +36,15 @@ object MetadataBroker {
 endPoint.name() ->
   new Node(record.brokerId, endPoint.host, endPoint.port, record.rack)
   }.toMap,
-  true)
+  fenced = true)
+  }
+
+  def apply(broker: Broker): MetadataBroker = {
+new MetadataBroker(broker.id, broker.rack.orNull,
+  broker.endPoints.map { endpoint =>
+endpoint.listenerName.value -> new Node(broker.id, endpoint.host, 
endpoint.port, broker.rack.orNull)
+  }.toMap,
+  fenced = false)

Review comment:
   I wanted to go ahead and conform to MetadataBroker for both implementations. 
One side-effect is that we are exposing the `fenced` flag to ZK-based clusters. 
I've set it to false here since we don't have any notion of broker fencing in 
the ZK-based metadata. Is there any harm in this?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


hachikuji commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571162641



##
File path: core/src/main/scala/kafka/server/metadata/RaftMetadataCache.scala
##
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.api.LeaderAndIsr
+import kafka.controller.StateChangeLogger
+import kafka.server.MetadataCache
+import kafka.utils.CoreUtils.inLock
+import kafka.utils.Logging
+import org.apache.kafka.common.internals.Topic
+import 
org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition,
 MetadataResponseTopic}
+import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, 
Uuid}
+import 
org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker,
 UpdateMetadataPartitionState}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{MetadataResponse, 
UpdateMetadataRequest}
+
+import java.util
+import java.util.Collections
+import java.util.concurrent.locks.ReentrantLock
+import scala.collection.{Seq, Set, mutable}
+import scala.jdk.CollectionConverters._
+
+object RaftMetadataCache {
+  def removePartitionInfo(partitionStates: mutable.AnyRefMap[String, 
mutable.LongMap[UpdateMetadataPartitionState]],
+  topic: String, partitionId: Int): Boolean = {
+partitionStates.get(topic).exists { infos =>
+  infos.remove(partitionId)
+  if (infos.isEmpty) partitionStates.remove(topic)
+  true
+}
+  }
+
+  def addOrUpdatePartitionInfo(partitionStates: mutable.AnyRefMap[String, 
mutable.LongMap[UpdateMetadataPartitionState]],
+   topic: String,
+   partitionId: Int,
+   stateInfo: UpdateMetadataPartitionState): Unit 
= {
+val infos = partitionStates.getOrElseUpdate(topic, mutable.LongMap.empty)
+infos(partitionId) = stateInfo
+  }
+}
+
+
+class RaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging {
+  this.logIdent = s"[MetadataCache brokerId=${brokerId}] "
+
+  private val lock = new ReentrantLock()
+
+  //this is the cache state. every MetadataImage instance is immutable, and 
updates (performed under a lock)
+  //replace the value with a completely new one. this means reads (which are 
not under any lock) need to grab
+  //the value of this var (into a val) ONCE and retain that read copy for the 
duration of their operation.
+  //multiple reads of this value risk getting different snapshots.
+  @volatile private var _currentImage: MetadataImage = new MetadataImage()
+
+  private val stateChangeLogger = new StateChangeLogger(brokerId, 
inControllerContext = false, None)
+
+  // This method is the main hotspot when it comes to the performance of 
metadata requests,
+  // we should be careful about adding additional logic here. Relatedly, 
`brokers` is
+  // `List[Integer]` instead of `List[Int]` to avoid a collection copy.
+  // filterUnavailableEndpoints exists to support v0 MetadataResponses
+  private def maybeFilterAliveReplicas(image: MetadataImage,
+   brokers: java.util.List[Integer],
+   listenerName: ListenerName,
+   filterUnavailableEndpoints: Boolean): 
java.util.List[Integer] = {
+if (!filterUnavailableEndpoints) {
+  brokers
+} else {
+  val res = new 
util.ArrayList[Integer](math.min(image.brokers.aliveBrokers().size, 
brokers.size))
+  for (brokerId <- brokers.asScala) {
+if (hasAliveEndpoint(image, brokerId, listenerName))
+  res.add(brokerId)
+  }
+  res
+}
+  }
+
+  def currentImage(): MetadataImage = _currentImage
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  // If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener 
is missing on the broker.
+  // Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing 
listener (Metadata response v5 and below).
+  private def getPartitionMetadata(image: MetadataImage, topi

[GitHub] [kafka] hachikuji commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


hachikuji commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571151499



##
File path: core/src/main/scala/kafka/server/metadata/MetadataImage.scala
##
@@ -118,5 +118,10 @@ case class MetadataImage(partitions: MetadataPartitions,
   def topicIdToName(uuid: Uuid): Option[String] = {
 partitions.topicIdToName(uuid)
   }
+
+  def topicNameToId(name: String): Option[Uuid] = {
+

Review comment:
   nit: remove newline

##
File path: core/src/main/scala/kafka/server/metadata/RaftMetadataCache.scala
##
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.api.LeaderAndIsr
+import kafka.controller.StateChangeLogger
+import kafka.server.MetadataCache
+import kafka.utils.CoreUtils.inLock
+import kafka.utils.Logging
+import org.apache.kafka.common.internals.Topic
+import 
org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition,
 MetadataResponseTopic}
+import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, 
Uuid}
+import 
org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker,
 UpdateMetadataPartitionState}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{MetadataResponse, 
UpdateMetadataRequest}
+
+import java.util
+import java.util.Collections
+import java.util.concurrent.locks.ReentrantLock
+import scala.collection.{Seq, Set, mutable}
+import scala.jdk.CollectionConverters._
+
+object RaftMetadataCache {
+  def removePartitionInfo(partitionStates: mutable.AnyRefMap[String, 
mutable.LongMap[UpdateMetadataPartitionState]],
+  topic: String, partitionId: Int): Boolean = {
+partitionStates.get(topic).exists { infos =>
+  infos.remove(partitionId)
+  if (infos.isEmpty) partitionStates.remove(topic)
+  true
+}
+  }
+
+  def addOrUpdatePartitionInfo(partitionStates: mutable.AnyRefMap[String, 
mutable.LongMap[UpdateMetadataPartitionState]],
+   topic: String,
+   partitionId: Int,
+   stateInfo: UpdateMetadataPartitionState): Unit 
= {
+val infos = partitionStates.getOrElseUpdate(topic, mutable.LongMap.empty)
+infos(partitionId) = stateInfo
+  }
+}
+
+
+class RaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging {
+  this.logIdent = s"[MetadataCache brokerId=${brokerId}] "

Review comment:
   nit: unneeded braces

##
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##
@@ -34,16 +34,69 @@ import 
org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, 
Uuid}
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, 
UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, 
UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(
+topics: collection.Set[String],
+listenerName: ListenerName,
+errorUnavailableEndpoints: Boolean = false,
+errorUnavailableListeners: Boolean = false): 
collection.Seq[MetadataResponseData.MetadataResponseTopic]
+
+  def getAllTopics(): collection.Set[String]

Review comment:
   nit: We can save it for a follow-up, but it would be nice to drop all 
the `get` prefixes here

##
File path: core/src/main/scala/kafka/server/metadata/RaftMetadataCache.scala
##
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. 

[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571161027



##
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##
@@ -16,20 +16,20 @@
   */
 package kafka.server
 
+import org.apache.kafka.common.{Node, TopicPartition, Uuid}
+
 import java.util
-import java.util.Collections
 import util.Arrays.asList
-
-import org.apache.kafka.common.{Node, TopicPartition, Uuid}
 import 
org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker,
 UpdateMetadataEndpoint, UpdateMetadataPartitionState}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.UpdateMetadataRequest
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
 
+import java.util.Collections
 import scala.jdk.CollectionConverters._
 
 class MetadataCacheTest {

Review comment:
   Yeah, easy enough. I added `@ParameterizedTest` to MetadataCacheTest. One test 
is failing with the Raft metadata cache, so I left that one as ZK-only for now.
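   
   For reference, a minimal sketch of what such a parameterization could look 
like, assuming the `MetadataCache.zkMetadataCache`/`MetadataCache.raftMetadataCache` 
factories shown in the diff; this is illustrative only, not the exact change made 
in the PR.
   
```scala
import kafka.server.MetadataCache
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

class MetadataCacheParameterizedSketch {

  // Pick the implementation under test from the parameter value.
  private def cacheFor(kind: String): MetadataCache = kind match {
    case "zk"   => MetadataCache.zkMetadataCache(brokerId = 1)
    case "raft" => MetadataCache.raftMetadataCache(brokerId = 1)
  }

  @ParameterizedTest
  @ValueSource(strings = Array("zk", "raft"))
  def testNoTopicsBeforeAnyMetadata(kind: String): Unit = {
    val cache = cacheFor(kind)
    // Both implementations should report no topics before any metadata is applied.
    assertTrue(cache.getAllTopics().isEmpty)
  }
}
```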





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


hachikuji commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571149211



##
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##
@@ -34,16 +34,69 @@ import 
org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, 
Uuid}
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, 
UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, 
UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(
+topics: collection.Set[String],
+listenerName: ListenerName,
+errorUnavailableEndpoints: Boolean = false,
+errorUnavailableListeners: Boolean = false): 
collection.Seq[MetadataResponseData.MetadataResponseTopic]
+
+  def getAllTopics(): collection.Set[String]
+
+  def getAllPartitions(): collection.Set[TopicPartition]
+
+  def getNonExistingTopics(topics: collection.Set[String]): 
collection.Set[String]
+
+  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+
+  def getAliveBrokers: collection.Seq[MetadataBroker]
+
+  def getPartitionInfo(topic: String, partitionId: Int): 
Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
+
+  def numPartitions(topic: String): Option[Int]
+
+  // if the leader is not known, return None;
+  // if the leader is known and corresponding node is available, return 
Some(node)
+  // if the leader is known but corresponding node with the listener name is 
not available, return Some(NO_NODE)
+  def getPartitionLeaderEndpoint(topic: String, partitionId: Int, 
listenerName: ListenerName): Option[Node]
+
+  def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: 
ListenerName): Map[Int, Node]
+
+  def getControllerId: Option[Int]
+
+  def getClusterMetadata(clusterId: String, listenerName: ListenerName): 
Cluster
+
+  // This method returns the deleted TopicPartitions received from 
UpdateMetadataRequest
+  def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): 
collection.Seq[TopicPartition]
+
+  def contains(topic: String): Boolean
+
+  def contains(tp: TopicPartition): Boolean
+}
+
+object MetadataCache {
+  def zkMetadataCache(brokerId: Int): ZkMetadataCache = {

Review comment:
   Ok. Probably not a ton of work either way, but will leave it up to you.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


hachikuji commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571148172



##
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##
@@ -16,20 +16,20 @@
   */
 package kafka.server
 
+import org.apache.kafka.common.{Node, TopicPartition, Uuid}
+
 import java.util
-import java.util.Collections
 import util.Arrays.asList
-
-import org.apache.kafka.common.{Node, TopicPartition, Uuid}
 import 
org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker,
 UpdateMetadataEndpoint, UpdateMetadataPartitionState}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.UpdateMetadataRequest
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
 
+import java.util.Collections
 import scala.jdk.CollectionConverters._
 
 class MetadataCacheTest {

Review comment:
   Should be easy to do with all the fancy JUnit 5 stuff. Since there are 
no separate tests for `RaftMetadataCache` (unless I'm missing some), I think we 
should do it here.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571145320



##
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##
@@ -34,16 +34,69 @@ import 
org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, 
Uuid}
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, 
UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, 
UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(
+topics: collection.Set[String],
+listenerName: ListenerName,
+errorUnavailableEndpoints: Boolean = false,
+errorUnavailableListeners: Boolean = false): 
collection.Seq[MetadataResponseData.MetadataResponseTopic]
+
+  def getAllTopics(): collection.Set[String]
+
+  def getAllPartitions(): collection.Set[TopicPartition]
+
+  def getNonExistingTopics(topics: collection.Set[String]): 
collection.Set[String]
+
+  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+
+  def getAliveBrokers: collection.Seq[MetadataBroker]
+
+  def getPartitionInfo(topic: String, partitionId: Int): 
Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
+
+  def numPartitions(topic: String): Option[Int]
+
+  // if the leader is not known, return None;
+  // if the leader is known and corresponding node is available, return 
Some(node)
+  // if the leader is known but corresponding node with the listener name is 
not available, return Some(NO_NODE)
+  def getPartitionLeaderEndpoint(topic: String, partitionId: Int, 
listenerName: ListenerName): Option[Node]
+
+  def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: 
ListenerName): Map[Int, Node]
+
+  def getControllerId: Option[Int]
+
+  def getClusterMetadata(clusterId: String, listenerName: ListenerName): 
Cluster
+
+  // This method returns the deleted TopicPartitions received from 
UpdateMetadataRequest
+  def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): 
collection.Seq[TopicPartition]
+
+  def contains(topic: String): Boolean
+
+  def contains(tp: TopicPartition): Boolean
+}
+
+object MetadataCache {
+  def zkMetadataCache(brokerId: Int): ZkMetadataCache = {

Review comment:
   I was thinking it might be easier to refactor in the future if we only 
need to rename the factory method rather than changing all of the `new Class` 
call sites.




This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


hachikuji commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571142951



##
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##
@@ -34,16 +34,69 @@ import 
org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, 
Uuid}
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, 
UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, 
UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(
+topics: collection.Set[String],
+listenerName: ListenerName,
+errorUnavailableEndpoints: Boolean = false,
+errorUnavailableListeners: Boolean = false): 
collection.Seq[MetadataResponseData.MetadataResponseTopic]
+
+  def getAllTopics(): collection.Set[String]
+
+  def getAllPartitions(): collection.Set[TopicPartition]
+
+  def getNonExistingTopics(topics: collection.Set[String]): 
collection.Set[String]
+
+  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+
+  def getAliveBrokers: collection.Seq[MetadataBroker]
+
+  def getPartitionInfo(topic: String, partitionId: Int): 
Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
+
+  def numPartitions(topic: String): Option[Int]
+
+  // if the leader is not known, return None;
+  // if the leader is known and corresponding node is available, return 
Some(node)
+  // if the leader is known but corresponding node with the listener name is 
not available, return Some(NO_NODE)
+  def getPartitionLeaderEndpoint(topic: String, partitionId: Int, 
listenerName: ListenerName): Option[Node]
+
+  def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: 
ListenerName): Map[Int, Node]
+
+  def getControllerId: Option[Int]
+
+  def getClusterMetadata(clusterId: String, listenerName: ListenerName): 
Cluster
+
+  // This method returns the deleted TopicPartitions received from 
UpdateMetadataRequest
+  def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): 
collection.Seq[TopicPartition]
+
+  def contains(topic: String): Boolean
+
+  def contains(tp: TopicPartition): Boolean
+}
+
+object MetadataCache {
+  def zkMetadataCache(brokerId: Int): ZkMetadataCache = {
+new ZkMetadataCache(brokerId)
+  }
+
+  def raftMetadataCache(brokerId: Int): RaftMetadataCache = {
+new RaftMetadataCache(brokerId)
+  }
+}
+
 /**
  *  A cache for the state (e.g., current leader) of each partition. This cache 
is updated through
  *  UpdateMetadataRequest from the controller. Every broker maintains the same 
cache, asynchronously.
  */
-class MetadataCache(brokerId: Int) extends Logging {
+class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging {

Review comment:
   Perhaps we can do this as a follow-up? It is nice at the moment to see 
the diffs clearly.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571140218



##
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##
@@ -34,16 +34,69 @@ import 
org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, 
Uuid}
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, 
UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, 
UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(
+topics: collection.Set[String],
+listenerName: ListenerName,
+errorUnavailableEndpoints: Boolean = false,
+errorUnavailableListeners: Boolean = false): 
collection.Seq[MetadataResponseData.MetadataResponseTopic]
+
+  def getAllTopics(): collection.Set[String]
+
+  def getAllPartitions(): collection.Set[TopicPartition]
+
+  def getNonExistingTopics(topics: collection.Set[String]): 
collection.Set[String]
+
+  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+
+  def getAliveBrokers: collection.Seq[MetadataBroker]
+
+  def getPartitionInfo(topic: String, partitionId: Int): 
Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
+
+  def numPartitions(topic: String): Option[Int]
+
+  // if the leader is not known, return None;
+  // if the leader is known and corresponding node is available, return 
Some(node)
+  // if the leader is known but corresponding node with the listener name is 
not available, return Some(NO_NODE)
+  def getPartitionLeaderEndpoint(topic: String, partitionId: Int, 
listenerName: ListenerName): Option[Node]

Review comment:
   Yea, this just got pulled up from the class when I extracted the trait. 
I'll fix up these comments





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571139784



##
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##
@@ -34,16 +34,69 @@ import 
org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, 
Uuid}
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, 
UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, 
UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(
+topics: collection.Set[String],
+listenerName: ListenerName,
+errorUnavailableEndpoints: Boolean = false,
+errorUnavailableListeners: Boolean = false): 
collection.Seq[MetadataResponseData.MetadataResponseTopic]
+
+  def getAllTopics(): collection.Set[String]
+
+  def getAllPartitions(): collection.Set[TopicPartition]
+
+  def getNonExistingTopics(topics: collection.Set[String]): 
collection.Set[String]
+
+  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+
+  def getAliveBrokers: collection.Seq[MetadataBroker]
+
+  def getPartitionInfo(topic: String, partitionId: Int): 
Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
+
+  def numPartitions(topic: String): Option[Int]
+
+  // if the leader is not known, return None;
+  // if the leader is known and corresponding node is available, return 
Some(node)
+  // if the leader is known but corresponding node with the listener name is 
not available, return Some(NO_NODE)
+  def getPartitionLeaderEndpoint(topic: String, partitionId: Int, 
listenerName: ListenerName): Option[Node]
+
+  def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: 
ListenerName): Map[Int, Node]
+
+  def getControllerId: Option[Int]
+
+  def getClusterMetadata(clusterId: String, listenerName: ListenerName): 
Cluster
+
+  // This method returns the deleted TopicPartitions received from 
UpdateMetadataRequest
+  def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): 
collection.Seq[TopicPartition]
+
+  def contains(topic: String): Boolean
+
+  def contains(tp: TopicPartition): Boolean
+}
+
+object MetadataCache {
+  def zkMetadataCache(brokerId: Int): ZkMetadataCache = {
+new ZkMetadataCache(brokerId)
+  }
+
+  def raftMetadataCache(brokerId: Int): RaftMetadataCache = {
+new RaftMetadataCache(brokerId)
+  }
+}
+
 /**
  *  A cache for the state (e.g., current leader) of each partition. This cache 
is updated through
  *  UpdateMetadataRequest from the controller. Every broker maintains the same 
cache, asynchronously.
  */
-class MetadataCache(brokerId: Int) extends Logging {
+class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging {

Review comment:
   I left the ZK implementation in place since it's really the only 
production implementation for now. It also reduces the size of the diff for 
this change. I don't feel very strongly about it either way, so I'm happy to 
relocate it to a separate file.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


hachikuji commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571139392



##
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##
@@ -34,16 +34,69 @@ import 
org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, 
Uuid}
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, 
UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, 
UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(
+topics: collection.Set[String],
+listenerName: ListenerName,
+errorUnavailableEndpoints: Boolean = false,
+errorUnavailableListeners: Boolean = false): 
collection.Seq[MetadataResponseData.MetadataResponseTopic]
+
+  def getAllTopics(): collection.Set[String]
+
+  def getAllPartitions(): collection.Set[TopicPartition]
+
+  def getNonExistingTopics(topics: collection.Set[String]): 
collection.Set[String]
+
+  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+
+  def getAliveBrokers: collection.Seq[MetadataBroker]
+
+  def getPartitionInfo(topic: String, partitionId: Int): 
Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
+
+  def numPartitions(topic: String): Option[Int]
+
+  // if the leader is not known, return None;
+  // if the leader is known and corresponding node is available, return 
Some(node)
+  // if the leader is known but corresponding node with the listener name is 
not available, return Some(NO_NODE)
+  def getPartitionLeaderEndpoint(topic: String, partitionId: Int, 
listenerName: ListenerName): Option[Node]
+
+  def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: 
ListenerName): Map[Int, Node]
+
+  def getControllerId: Option[Int]
+
+  def getClusterMetadata(clusterId: String, listenerName: ListenerName): 
Cluster
+
+  // This method returns the deleted TopicPartitions received from 
UpdateMetadataRequest
+  def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): 
collection.Seq[TopicPartition]
+
+  def contains(topic: String): Boolean
+
+  def contains(tp: TopicPartition): Boolean
+}
+
+object MetadataCache {
+  def zkMetadataCache(brokerId: Int): ZkMetadataCache = {

Review comment:
   nit: I don't think these factories are providing much over the 
constructors: `MetadataCache.zkMetadataCache` vs `new ZkMetadataCache`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571138962



##
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##
@@ -16,20 +16,20 @@
   */
 package kafka.server
 
+import org.apache.kafka.common.{Node, TopicPartition, Uuid}
+
 import java.util
-import java.util.Collections
 import util.Arrays.asList
-
-import org.apache.kafka.common.{Node, TopicPartition, Uuid}
 import 
org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker,
 UpdateMetadataEndpoint, UpdateMetadataPartitionState}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.UpdateMetadataRequest
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
 
+import java.util.Collections
 import scala.jdk.CollectionConverters._
 
 class MetadataCacheTest {

Review comment:
   Yeah, I think we can probably use some form of parameterization, or possibly 
even test templating similar to #9986.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy commented on a change in pull request #10002: MINOR: remove the indent in security doc

2021-02-05 Thread GitBox


omkreddy commented on a change in pull request #10002:
URL: https://github.com/apache/kafka/pull/10002#discussion_r571127207



##
File path: docs/security.html
##
@@ -649,22 +649,22 @@ Configuring Kafka Brokers
 
 Add a suitably modified JAAS file similar to the one below to 
each Kafka broker's config directory, let's call it kafka_server_jaas.conf for 
this example:
-
KafkaServer {
-org.apache.kafka.common.security.plain.PlainLoginModule required
-username="admin"
-password="admin-secret"
-user_admin="admin-secret"
-user_alice="alice-secret";
-};
+KafkaServer {
+org.apache.kafka.common.security.plain.PlainLoginModule required
+username="admin"
+password="admin-secret"
+user_admin="admin-secret"
+user_alice="alice-secret";
+};
 This configuration defines two users (admin and 
alice). The properties username and password
 in the KafkaServer section are used by the broker to 
initiate connections to other brokers. In this example,
 admin is the user for inter-broker communication. The 
set of properties user_userName defines
 the passwords for all users that connect to the broker and the 
broker validates all client connections including
 those from other brokers using these properties.
 Pass the JAAS config file location as JVM parameter to each 
Kafka broker:
-
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
+
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
 Configure SASL port and SASL mechanisms in server.properties 
as described here. For example:
-listeners=SASL_SSL://host.name:port
+listeners=SASL_SSL://host.name:port

Review comment:
   @showuon Thanks for the updates. Can you update the lines below as well? 
Thanks.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #10045: MINOR: Allow KafkaApis to be configured for Raft controller quorums

2021-02-05 Thread GitBox


rondagostino commented on a change in pull request #10045:
URL: https://github.com/apache/kafka/pull/10045#discussion_r571124965



##
File path: core/src/main/scala/kafka/server/MetadataSupport.scala
##
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.controller.KafkaController
+import kafka.network.RequestChannel
+import kafka.zk.{AdminZkClient, KafkaZkClient}
+import org.apache.kafka.common.requests.AbstractResponse
+
+sealed trait MetadataSupport {
+  /**
+   * Provide a uniform way of getting to the ForwardingManager, which is a 
shared concept
+   * despite being optional when using ZooKeeper and required when using Raft
+   */
+  val forwardingManager: Option[ForwardingManager]
+
+  /**
+   * Return this instance downcast for use with ZooKeeper
+   *
+   * @param createException function to create an exception to throw
+   * @return this instance downcast for use with ZooKeeper
+   * @throws Exception if this instance is not for ZooKeeper
+   */
+  def requireZkOrThrow(createException: => Exception): ZkSupport

Review comment:
   Yeah, I prefer it here only because this has worked out so well that I 
would not be surprised if we tried to use the same approach somewhere else.
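   
   For what it's worth, a self-contained sketch of the downcast-or-throw pattern 
this trait encodes, using simplified stand-in types rather than the real 
`kafka.server` classes:
   
```scala
sealed trait SupportSketch {
  // Mirrors requireZkOrThrow: return the ZooKeeper-backed support object or
  // throw the lazily-constructed exception supplied by the caller.
  def requireZkOrThrow(createException: => Exception): ZkSupportSketch = this match {
    case zk: ZkSupportSketch => zk
    case _                   => throw createException
  }
}
final case class ZkSupportSketch(clusterId: String) extends SupportSketch
final case class RaftSupportSketch(clusterId: String) extends SupportSketch

object SupportSketchDemo extends App {
  val support: SupportSketch = ZkSupportSketch("demo-cluster")
  // Succeeds here; for a RaftSupportSketch it would throw the supplied exception,
  // which is how ZK-only request paths can fail fast in Raft mode.
  val zk = support.requireZkOrThrow(new IllegalStateException("ZK-only request in Raft mode"))
  println(zk.clusterId)
}
```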





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9986: JUnit extensions for integration tests

2021-02-05 Thread GitBox


mumrah commented on a change in pull request #9986:
URL: https://github.com/apache/kafka/pull/9986#discussion_r571122210



##
File path: 
core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
##
@@ -16,83 +16,94 @@
   */
 package kafka.server
 
+import integration.kafka.server.IntegrationTestHelper
+
 import java.net.Socket
 import java.util.Collections
 import kafka.api.{KafkaSasl, SaslSetup}
 import kafka.utils.JaasTestUtils
 import org.apache.kafka.common.message.SaslHandshakeRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{ApiVersionsRequest, 
ApiVersionsResponse, SaslHandshakeRequest, SaslHandshakeResponse}
-import org.apache.kafka.common.security.auth.SecurityProtocol
+import kafka.test.annotation.{ClusterTest, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.test.{ClusterConfig, ClusterInstance}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.extension.ExtendWith
 
-class SaslApiVersionsRequestTest extends AbstractApiVersionsRequestTest with 
SaslSetup {
-  override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
-  private val kafkaClientSaslMechanism = "PLAIN"
-  private val kafkaServerSaslMechanisms = List("PLAIN")
-  protected override val serverSaslProperties = 
Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, 
kafkaClientSaslMechanism))
-  protected override val clientSaslProperties = 
Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
-  override def brokerCount = 1
+import scala.jdk.CollectionConverters._
 
-  @BeforeEach
-  override def setUp(): Unit = {
-startSasl(jaasSections(kafkaServerSaslMechanisms, 
Some(kafkaClientSaslMechanism), KafkaSasl, 
JaasTestUtils.KafkaServerContextName))
-super.setUp()
-  }
 
-  @AfterEach
-  override def tearDown(): Unit = {
-super.tearDown()
-closeSasl()
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+class SaslApiVersionsRequestTest(helper: IntegrationTestHelper,
+ cluster: ClusterInstance) extends 
AbstractApiVersionsRequestTest(helper, cluster) {
+
+  val kafkaClientSaslMechanism = "PLAIN"
+  val kafkaServerSaslMechanisms = List("PLAIN")
+
+  private var sasl: SaslSetup = _
+
+  @BeforeEach
+  def setupSasl(config: ClusterConfig): Unit = {
+sasl = new SaslSetup() {}
+sasl.startSasl(sasl.jaasSections(kafkaServerSaslMechanisms, 
Some(kafkaClientSaslMechanism), KafkaSasl, 
JaasTestUtils.KafkaServerContextName))
+
config.saslServerProperties().putAll(sasl.kafkaServerSaslProperties(kafkaServerSaslMechanisms,
 kafkaClientSaslMechanism))
+
config.saslClientProperties().putAll(sasl.kafkaClientSaslProperties(kafkaClientSaslMechanism))
+super.brokerPropertyOverrides(config.serverProperties())
   }
 
-  @Test
+  @ClusterTest(securityProtocol = "SASL_PLAINTEXT", clusterType = Type.Zk)
   def testApiVersionsRequestBeforeSaslHandshakeRequest(): Unit = {
-val socket = connect()
+val socket = helper.connect(cluster.brokers().asScala.head, 
cluster.listener())
 try {
-  val apiVersionsResponse = sendAndReceive[ApiVersionsResponse](
+  val apiVersionsResponse = helper.sendAndReceive[ApiVersionsResponse](
 new ApiVersionsRequest.Builder().build(0), socket)
-  validateApiVersionsResponse(apiVersionsResponse)
+  validateApiVersionsResponse(apiVersionsResponse, cluster.listener())
   sendSaslHandshakeRequestValidateResponse(socket)
 } finally {
   socket.close()
 }
   }
 
-  @Test
+  @ClusterTest(securityProtocol = "SASL_PLAINTEXT", clusterType = Type.Zk)

Review comment:
   We could do it in a method specified by `@ClusterTemplate`, or we could add `securityProtocol` to the `@ClusterTestDefaults` class-level annotation. I'll leave this for a follow-up PR.
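   For reference, if `securityProtocol` were added to the class-level defaults, usage could look roughly like this (hypothetical attribute, assuming `ClusterTestDefaults` lives alongside `ClusterTest`; not part of this PR):

```scala
import integration.kafka.server.IntegrationTestHelper
import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
import kafka.test.junit.ClusterTestExtensions
import org.junit.jupiter.api.extension.ExtendWith

// Hypothetical: a securityProtocol attribute on @ClusterTestDefaults would let the
// per-test annotations drop the repeated securityProtocol/clusterType arguments.
@ClusterTestDefaults(clusterType = Type.Zk, securityProtocol = "SASL_PLAINTEXT")
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class SaslApiVersionsRequestTest(helper: IntegrationTestHelper,
                                 cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(helper, cluster) {

  @ClusterTest // inherits the class-level defaults
  def testApiVersionsRequestBeforeSaslHandshakeRequest(): Unit = {
    // ... test body unchanged ...
  }
}
```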





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


cmccabe commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571119868



##
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##
@@ -16,20 +16,20 @@
   */
 package kafka.server
 
+import org.apache.kafka.common.{Node, TopicPartition, Uuid}
+
 import java.util
-import java.util.Collections
 import util.Arrays.asList
-
-import org.apache.kafka.common.{Node, TopicPartition, Uuid}
 import 
org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker,
 UpdateMetadataEndpoint, UpdateMetadataPartitionState}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.UpdateMetadataRequest
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
 
+import java.util.Collections
 import scala.jdk.CollectionConverters._
 
 class MetadataCacheTest {

Review comment:
   We should have some way of running these tests on the Raft metadata cache as well as the ZK metadata cache. I guess we can do that in a follow-up PR, though.
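   A minimal sketch of what that could look like with a JUnit parameterized test, assuming a freshly constructed cache of either kind starts out empty (illustrative, not part of this PR):

```scala
import java.util.stream.Stream
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource

object MetadataCacheTest {
  // Supplies both implementations via the factories added in this PR.
  def cacheProvider(): Stream[MetadataCache] =
    Stream.of[MetadataCache](MetadataCache.zkMetadataCache(1), MetadataCache.raftMetadataCache(1))
}

class MetadataCacheTest {
  @ParameterizedTest
  @MethodSource(Array("cacheProvider"))
  def testEmptyCacheHasNoTopics(cache: MetadataCache): Unit = {
    // Both implementations should report no topics before any metadata is applied.
    assertTrue(cache.getAllTopics().isEmpty)
  }
}
```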





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


cmccabe commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571118214



##
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##
@@ -34,16 +34,69 @@ import 
org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, 
Uuid}
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, 
UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, 
UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(
+topics: collection.Set[String],
+listenerName: ListenerName,
+errorUnavailableEndpoints: Boolean = false,
+errorUnavailableListeners: Boolean = false): 
collection.Seq[MetadataResponseData.MetadataResponseTopic]
+
+  def getAllTopics(): collection.Set[String]
+
+  def getAllPartitions(): collection.Set[TopicPartition]
+
+  def getNonExistingTopics(topics: collection.Set[String]): 
collection.Set[String]
+
+  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+
+  def getAliveBrokers: collection.Seq[MetadataBroker]
+
+  def getPartitionInfo(topic: String, partitionId: Int): 
Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
+
+  def numPartitions(topic: String): Option[Int]
+
+  // if the leader is not known, return None;
+  // if the leader is known and corresponding node is available, return 
Some(node)
+  // if the leader is known but corresponding node with the listener name is 
not available, return Some(NO_NODE)
+  def getPartitionLeaderEndpoint(topic: String, partitionId: Int, 
listenerName: ListenerName): Option[Node]
+
+  def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: 
ListenerName): Map[Int, Node]
+
+  def getControllerId: Option[Int]
+
+  def getClusterMetadata(clusterId: String, listenerName: ListenerName): 
Cluster
+
+  // This method returns the deleted TopicPartitions received from 
UpdateMetadataRequest
+  def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): 
collection.Seq[TopicPartition]
+
+  def contains(topic: String): Boolean
+
+  def contains(tp: TopicPartition): Boolean
+}
+
+object MetadataCache {
+  def zkMetadataCache(brokerId: Int): ZkMetadataCache = {
+new ZkMetadataCache(brokerId)
+  }
+
+  def raftMetadataCache(brokerId: Int): RaftMetadataCache = {
+new RaftMetadataCache(brokerId)
+  }
+}
+
 /**
  *  A cache for the state (e.g., current leader) of each partition. This cache 
is updated through
  *  UpdateMetadataRequest from the controller. Every broker maintains the same 
cache, asynchronously.
  */
-class MetadataCache(brokerId: Int) extends Logging {
+class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging {

Review comment:
   This is kind of a lot of code, so why not have it in its own file?
   
   The JavaDoc should also explain that this is for brokers using ZK and not 
for brokers using the metadata quorum.
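   One possible wording for that doc comment (illustrative only, not taken from the PR), placed directly above the existing `class ZkMetadataCache` declaration:

```scala
/**
 * A cache of partition state (e.g. the current leader of each partition) for
 * brokers that use ZooKeeper-based metadata. It is updated through
 * UpdateMetadataRequest from the controller, and every broker maintains the same
 * cache asynchronously. Brokers running against the Raft metadata quorum use
 * RaftMetadataCache instead, not this class.
 */
```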





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


cmccabe commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571116863



##
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##
@@ -34,16 +34,69 @@ import 
org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, 
Uuid}
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, 
UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, 
UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(
+topics: collection.Set[String],
+listenerName: ListenerName,
+errorUnavailableEndpoints: Boolean = false,
+errorUnavailableListeners: Boolean = false): 
collection.Seq[MetadataResponseData.MetadataResponseTopic]
+
+  def getAllTopics(): collection.Set[String]
+
+  def getAllPartitions(): collection.Set[TopicPartition]
+
+  def getNonExistingTopics(topics: collection.Set[String]): 
collection.Set[String]
+
+  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+
+  def getAliveBrokers: collection.Seq[MetadataBroker]
+
+  def getPartitionInfo(topic: String, partitionId: Int): 
Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
+
+  def numPartitions(topic: String): Option[Int]
+
+  // if the leader is not known, return None;
+  // if the leader is known and corresponding node is available, return 
Some(node)
+  // if the leader is known but corresponding node with the listener name is 
not available, return Some(NO_NODE)
+  def getPartitionLeaderEndpoint(topic: String, partitionId: Int, 
listenerName: ListenerName): Option[Node]

Review comment:
   Can you put this in javadoc format? i.e. `@return`
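   For example, something along these lines (wording illustrative; only the tag structure matters):

```scala
/**
 * Get the endpoint of the leader of the given partition, as seen on the given listener.
 *
 * @return None if the leader is not known; Some(node) if the leader is known and the
 *         corresponding node is available; Some(Node.noNode) if the leader is known
 *         but no node with the given listener name is available.
 */
def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node]
```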





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


cmccabe commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571116386



##
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##
@@ -34,16 +34,69 @@ import 
org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, 
Uuid}
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, 
UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, 
UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(

Review comment:
   The formatting is kind of weird here.  Why the blank line after 
`getTopicMetadata(`?
   
   Also, it would be good to have Javadoc for this function.
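   For instance, the declaration could be tidied up like this, with the existing inline comment folded into a doc tag (doc wording illustrative):

```scala
/**
 * Get metadata for the given topics as seen through the given listener.
 *
 * @param errorUnavailableEndpoints exists to support v0 MetadataResponses
 */
def getTopicMetadata(topics: collection.Set[String],
                     listenerName: ListenerName,
                     errorUnavailableEndpoints: Boolean = false,
                     errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic]
```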





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #10008: MINOR: Remove ZK dependency for coordinator topics' partition counts

2021-02-05 Thread GitBox


cmccabe merged pull request #10008:
URL: https://github.com/apache/kafka/pull/10008


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 closed pull request #10050: MINOR: Don't assume number of dns results (stabilize ClusterConnectio…

2021-02-05 Thread GitBox


chia7712 closed pull request #10050:
URL: https://github.com/apache/kafka/pull/10050


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #10050: MINOR: Don't assume number of dns results (stabilize ClusterConnectio…

2021-02-05 Thread GitBox


chia7712 commented on pull request #10050:
URL: https://github.com/apache/kafka/pull/10050#issuecomment-774149259


   @mumrah Thanks for the kind reminder. Closing this now.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12298) Create LeaderAndIsrRequestBenchmark

2021-02-05 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-12298:
--

 Summary: Create LeaderAndIsrRequestBenchmark
 Key: KAFKA-12298
 URL: https://issues.apache.org/jira/browse/KAFKA-12298
 Project: Kafka
  Issue Type: Task
Reporter: Justine Olshan
Assignee: Justine Olshan


Since KIP-516 is making some changes to LeaderAndIsrRequests, I thought it 
would be useful to have a benchmark to compare implementations.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rondagostino opened a new pull request #10069: MINOR: Add RaftReplicaManager

2021-02-05 Thread GitBox


rondagostino opened a new pull request #10069:
URL: https://github.com/apache/kafka/pull/10069


   This adds the logic to apply partition metadata when consuming from the 
Raft-based metadata log.
   
   `RaftReplicaManager` extends `ReplicaManager` for now to minimize changes to 
existing code for the 2.8 release.  We will likely adjust this hierarchy at a 
later time (e.g. introducing a trait and adding a helper to refactor common 
code).  For now, we expose the necessary fields and methods in `ReplicaManager` 
by changing their scope from `private` to `protected`, and we refactor out a 
couple of pieces of logic that are shared between the two implementations 
(stopping replicas and adding log dir fetchers).
   
   Existing tests are sufficient to expose regressions in the current 
`ReplicaManager`.
   
   We intend to exercise the new `RaftReplicaManager` code via system tests and 
unit/integration tests (both to come in later PRs).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #10068: MINOR: StopReplicaResp and StopReplicaReq Test should cover all available version

2021-02-05 Thread GitBox


dajac commented on pull request #10068:
URL: https://github.com/apache/kafka/pull/10068#issuecomment-774140916


   I've re-triggered Jenkins. I will merge once we have a clean run.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #10008: MINOR: Remove ZK dependency for coordinator topics' partition counts

2021-02-05 Thread GitBox


cmccabe commented on pull request #10008:
URL: https://github.com/apache/kafka/pull/10008#issuecomment-774136720


   Failing tests are `org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining` and `org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testOneWayReplicationWithAutoOffsetSync()`, which are not related.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12193) Re-resolve IPs when a client is disconnected

2021-02-05 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-12193:

Fix Version/s: 2.6.2

> Re-resolve IPs when a client is disconnected
> 
>
> Key: KAFKA-12193
> URL: https://issues.apache.org/jira/browse/KAFKA-12193
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.7.0
>Reporter: Bob Barrett
>Assignee: Bob Barrett
>Priority: Major
> Fix For: 2.8.0, 2.7.1, 2.6.2
>
>
> If `client.dns.lookup` is set to `use_all_dns_ips` or 
> `resolve_canonical_bootstrap_servers_only`, the NetworkClient can store 
> multiple IPs for each node, and currently it tries each IP in the list when 
> connecting before re-resolving the IPs. This is useful when first 
> establishing a connection because it ensures that the client exhausts all 
> possible IPs. However, in the case where the IPs changed after a connection 
> was already established, this would cause a reconnecting client to try 
> several invalid IPs before re-resolving and trying a valid one. Instead, we 
> should re-resolve DNS when a client disconnects from an established 
> connection, rather than assuming that all previously-resolved IPs are still 
> valid.
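
A minimal sketch of the intended client-side behavior (illustrative only; this is not the actual NetworkClient code): once an established connection drops, the cached addresses are discarded so the next attempt triggers a fresh DNS lookup.

{code}
import java.net.InetAddress

// Illustrative only: cache resolved addresses per host and drop them when an
// established connection is lost, so reconnects re-resolve DNS.
class ResolvedAddresses(host: String) {
  private var addresses: Seq[InetAddress] = Seq.empty

  def currentOrResolve(): Seq[InetAddress] = {
    if (addresses.isEmpty)
      addresses = InetAddress.getAllByName(host).toSeq
    addresses
  }

  // Called when a previously established connection drops: forget possibly stale
  // IPs instead of cycling through every previously resolved address first.
  def onDisconnect(): Unit = {
    addresses = Seq.empty
  }
}
{code}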



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac commented on pull request #10061: KAFKA-12193: Re-resolve IPs after a client disconnects

2021-02-05 Thread GitBox


dajac commented on pull request #10061:
URL: https://github.com/apache/kafka/pull/10061#issuecomment-774097673


   @ableegoldman FYI - We have backported this fix to 2.6.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac merged pull request #10061: KAFKA-12193: Re-resolve IPs after a client disconnects

2021-02-05 Thread GitBox


dajac merged pull request #10061:
URL: https://github.com/apache/kafka/pull/10061


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12274) Transactional operation fails when broker is replaced using the same broker ID.

2021-02-05 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-12274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michał Łukowicz updated KAFKA-12274:

Description: 
Hello Team!

One of our clusters is being used to:
 * process transactional writes
 * has acks set to all

We are using the Java client and followed all recommendations regarding avoiding 
dead fencing issues, etc.

We spotted the problem while upgrading Kafka hosts to stronger machines:
 * stop old broker
 * start a new clean broker node (a different hostname) reusing the same 
broker.id 

During the operation we found that, while Kafka normally replicates partitions 
and recovers after a very short period of time (1-3 mins), we start to see 
errors on the Kafka broker:
{code:java}
broker=13] Error processing append operation on partition 
 org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order 
sequence number for producerId 51119 at offset 16878080903 in partition 
: 2930356 (incoming seq. number), 2930213 (current end sequence 
number){code}
We also start to see records buffered on the producer side, and eventually the 
producer send requests fail with:
{code:java}
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 2 
record(s) for :120892 ms has passed since batch creation{code}
The only additional thing we observed is that, for some reason, the ISR of a 
couple of partitions had been reduced to 1 and then went back to 3 when the 
broker finished replication.

The same situation can be observed when adding new brokers to the cluster and 
performing rebalancing (using Kafka Cruise Control) with concurrent partition 
and leader movements set to a higher value.

This does not happen if the broker is just stopped (even for a longer period of time) 
or restarted; it only happens during host replacement.

Can you please let me know if this is a bug, or whether we are doing something wrong?

Kafka 2.6.0

min.insync.replicas for topics is set to 1 (also tested with it set to 2; no change)

replication.factor is 3

all transaction settings are currently default.

 

  was:
Hello Team!

One of our clusters is being used to:
 * process transactional writes
 * had ack set to all

We are using java client and followed all recommendation regarding avoiding 
dead fencing issues, etc.

We spotted the problem during upgrading kafka hosts to stronger machines:
 * stop old broker
 * start a new clean broker node (a different hostname) reusing the same 
broker.id 

During the operation we found that while kafka is normally replicating 
partitions to recover after very short period of time (1 - 3 mins) we start to 
see error on kafka broker:
{code:java}
broker=13] Error processing append operation on partition 
 org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order 
sequence number for producerId 51119 at offset 16878080903 in partition 
: 2930356 (incoming seq. number), 2930213 (current end sequence 
number){code}
And we are starting to see records buffered on the Producer side, and 
eventually, the producer send requests failed with::
{code:java}
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 2 
record(s) for :120892 ms has passed since batch creation{code}
The only additional thing we observed is that for some reason couple of 
paritions ISR had been reduced to 1 and then back to 3 when broker finished up 
replication.

The same situation can be observed when adding new brokers to cluster and 
performing rebalacing (using kafka cruise control) and setting concurrent 
partition and leader movements to higher value.

This does not happen if broker is just stopped - even for longer period of time 
or restarted - this only happens during host replace.

Can you please let me know if this is a bug ... or we are doing something wrong?

Kafka 2.6.0

min.insync.replica for topics is set to 1

replication.factor is 3

all transaction settings are currently default.

 


> Transactional operation fails when broker is replaced using the same broker 
> ID.
> ---
>
> Key: KAFKA-12274
> URL: https://issues.apache.org/jira/browse/KAFKA-12274
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, producer 
>Affects Versions: 2.6.0
>Reporter: Michał Łukowicz
>Priority: Critical
>
> Hello Team!
> One of our clusters is being used to:
>  * process transactional writes
>  * had ack set to all
> We are using java client and followed all recommendation regarding avoiding 
> dead fencing issues, etc.
> We spotted the problem during upgrading kafka hosts to stronger machines:
>  * stop old broker
>  * start a new clean broker node (a different hostname) reusing the same 
> broker.id 
> During the operation we found that while kafka is normally replicating 
> partitions to recover after very short period of time (1 - 3 mins

[GitHub] [kafka] mumrah commented on pull request #10050: MINOR: Don't assume number of dns results (stabilize ClusterConnectio…

2021-02-05 Thread GitBox


mumrah commented on pull request #10050:
URL: https://github.com/apache/kafka/pull/10050#issuecomment-774082101


   @chia7712 I think this got fixed yesterday on trunk with the addition of a 
new HostResolver interface. Now we can precisely control what IPs are resolved 
during testing. 
https://github.com/apache/kafka/commit/131d4753cfed65ed6dee0a8c754765c97c3d513f



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-02-05 Thread GitBox


dajac commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r571004759



##
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+topicNames: Set[CreatableTopic],
+controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}
+
+  def shutdown(): Unit = {}
+}
+
+object AutoTopicCreationManager {
+
+  def apply(
+config: KafkaConfig,
+metadataCache: MetadataCache,
+time: Time,
+metrics: Metrics,
+threadNamePrefix: Option[String],
+adminManager: ZkAdminManager,
+controller: KafkaController,
+enableForwarding: Boolean
+  ): AutoTopicCreationManager = {
+
+val channelManager =
+  if (enableForwarding)
+Some(new BrokerToControllerChannelManager(
+  controllerNodeProvider = MetadataCacheControllerNodeProvider(
+config, metadataCache),
+  time = time,
+  metrics = metrics,
+  config = config,
+  channelName = "autoTopicCreationChannel",
+  threadNamePrefix = threadNamePrefix,
+  retryTimeoutMs = config.requestTimeoutMs.longValue
+))
+  else
+None
+new AutoTopicCreationManagerImpl(channelManager, adminManager, controller, 
config.requestTimeoutMs)
+  }
+}
+
+class AutoTopicCreationManagerImpl(
+  channelManager: Option[BrokerToControllerChannelManager],
+  adminManager: ZkAdminManager,
+  controller: KafkaController,
+  requestTimeout: Int
+) extends AutoTopicCreationManager with Logging {
+
+  private val inflightTopics = new ConcurrentHashMap[String, CreatableTopic]
+
+  override def start(): Unit = {
+channelManager.foreach(_.start())
+  }
+
+  override def shutdown(): Unit = {
+channelManager.foreach(_.shutdown())
+  }
+
+  override def createTopics(topics: Set[CreatableTopic],
+controllerMutationQuota: ControllerMutationQuota): 
Unit = {

Review comment:
   @hachikuji Sorry for my late reply. I've missed the notification. We 
have to enforce the quota on the controller exclusively. It is a global quota 
and we can't really distribute it fairly in the cluster. In this case, it would 
be great if we could propagate the principal and clientId to the controller to 
enforce the quota. However, I wonder how we could propagate the error and the 
delay to the client if the topic creation is throttled. Perhaps we could reply 
with `UNKNOWN_TOPIC_OR_PARTITION` until the topic can be created.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



