[GitHub] [kafka] chia7712 commented on pull request #8783: KAFKA-10063 UnsupportedOperation when querying cleaner metrics after …

2020-06-03 Thread GitBox


chia7712 commented on pull request #8783:
URL: https://github.com/apache/kafka/pull/8783#issuecomment-638001504


   @hachikuji Could you take a look?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9887) failed-task-count JMX metric not updated if task fails during startup

2020-06-03 Thread Michael Carter (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17124683#comment-17124683
 ] 

Michael Carter commented on KAFKA-9887:
---

This similarly applies to connectors that fail during startup. The 
connector-startup-failure-total metric is not updated because [this code 
section|https://github.com/apache/kafka/blob/00a59b392d92b0d6d3a321ef9a53dae4b3a9d030/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L103-L121]
 doesn't propagate the exception back up to the code that would increment the 
metric. In fact, it registers as a successful startup instead.
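
To make the failure mode concrete, here is a minimal, runnable sketch 
(stand-in names, not the actual WorkerConnector code) of why a swallowed 
startup exception ends up recorded as a success:

{code:java}
// Illustrative only: doStart() stands in for the code section linked above,
// startConnector() for the caller that owns the metrics.
public class StartupMetricDemo {
    static int startupSuccesses = 0;
    static int startupFailures = 0; // stand-in for connector-startup-failure-total

    // The startup path catches internally instead of rethrowing.
    static void doStart() {
        try {
            throw new RuntimeException("connector failed to start");
        } catch (RuntimeException e) {
            // logged but not propagated: the caller cannot tell anything went wrong
        }
    }

    static void startConnector() {
        try {
            doStart();
            startupSuccesses++; // reached even though startup actually failed
        } catch (RuntimeException e) {
            startupFailures++;  // never reached
        }
    }

    public static void main(String[] args) {
        startConnector();
        // prints: successes=1 failures=0
        System.out.println("successes=" + startupSuccesses + " failures=" + startupFailures);
    }
}
{code}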

> failed-task-count JMX metric not updated if task fails during startup
> -
>
> Key: KAFKA-9887
> URL: https://issues.apache.org/jira/browse/KAFKA-9887
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
>Reporter: Chris Egerton
>Priority: Major
>
> If a task fails on startup (specifically, during [this code 
> section|https://github.com/apache/kafka/blob/00a59b392d92b0d6d3a321ef9a53dae4b3a9d030/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L427-L468]),
>  the {{failed-task-count}} JMX metric is not updated to reflect the task 
> failure, even though the status endpoints in the REST API do report the task 
> as failed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9887) failed-task-count JMX metric not updated if task fails during startup

2020-06-03 Thread Michael Carter (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17124683#comment-17124683
 ] 

Michael Carter edited comment on KAFKA-9887 at 6/3/20, 7:19 AM:


This similarly applies to connectors that fail during startup. The 
connector-startup-failure-total metric is not updated because [this code 
section|https://github.com/apache/kafka/blob/00a59b392d92b0d6d3a321ef9a53dae4b3a9d030/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L103-L121]
 doesn't propagate the exception back up to the code that would increment the 
metric. In fact, it registers as a successful startup instead.


was (Author: michael_carter):
This similarly applies to connectors that fail during startup. The 
connector-startup-failure-total metric is not updated because [this code 
section|[https://github.com/apache/kafka/blob/00a59b392d92b0d6d3a321ef9a53dae4b3a9d030/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L103-L121]]
 doesn't propagate the exception back up to the code that would increment the 
metric. In fact, it registers as a successful startup instead.

> failed-task-count JMX metric not updated if task fails during startup
> -
>
> Key: KAFKA-9887
> URL: https://issues.apache.org/jira/browse/KAFKA-9887
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
>Reporter: Chris Egerton
>Priority: Major
>
> If a task fails on startup (specifically, during [this code 
> section|https://github.com/apache/kafka/blob/00a59b392d92b0d6d3a321ef9a53dae4b3a9d030/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L427-L468]),
>  the {{failed-task-count}} JMX metric is not updated to reflect the task 
> failure, even though the status endpoints in the REST API do report the task 
> as failed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9320) Enable TLSv1.3 by default and disable some of the older protocols

2020-06-03 Thread Nikolay Izhikov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nikolay Izhikov reassigned KAFKA-9320:
--

Assignee: Nikolay Izhikov

> Enable TLSv1.3 by default and disable some of the older protocols
> -
>
> Key: KAFKA-9320
> URL: https://issues.apache.org/jira/browse/KAFKA-9320
> Project: Kafka
>  Issue Type: New Feature
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Nikolay Izhikov
>Priority: Major
>  Labels: needs-kip
> Attachments: report.txt
>
>
> KAFKA-7251 added support for TLSv1.3. We should include this in the list of 
> protocols that are enabled by default. We should also disable some of the 
> older protocols that are not secure. This change requires a KIP.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9320) Enable TLSv1.3 by default and disable some of the older protocols

2020-06-03 Thread Nikolay Izhikov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nikolay Izhikov resolved KAFKA-9320.

Resolution: Fixed

Fixed by commit 
https://github.com/apache/kafka/commit/8b22b8159673bfe22d8ac5dcd4e4312d4f2c863c
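
For clients that prefer to pin the protocol list explicitly rather than rely 
on the new defaults, a minimal sketch (assuming the new defaults of TLSv1.2 
and TLSv1.3 on JVMs that support TLSv1.3; the broker address is illustrative):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;

public class TlsAdminClientSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9093"); // illustrative address
        props.put("security.protocol", "SSL");
        // Pinning the list keeps behavior stable across upgrades; omitting it
        // picks up the defaults, which now include TLSv1.3 and drop TLSv1/TLSv1.1.
        props.put("ssl.enabled.protocols", "TLSv1.2,TLSv1.3");

        try (Admin admin = Admin.create(props)) {
            System.out.println(admin.describeCluster().nodes().get());
        }
    }
}
{code}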

> Enable TLSv1.3 by default and disable some of the older protocols
> -
>
> Key: KAFKA-9320
> URL: https://issues.apache.org/jira/browse/KAFKA-9320
> Project: Kafka
>  Issue Type: New Feature
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Nikolay Izhikov
>Priority: Major
>  Labels: needs-kip
> Attachments: report.txt
>
>
> KAFKA-7251 added support for TLSv1.3. We should include this in the list of 
> protocols that are enabled by default. We should also disable some of the 
> older protocols that are not secure. This change requires a KIP.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac commented on a change in pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

2020-06-03 Thread GitBox


dajac commented on a change in pull request #8724:
URL: https://github.com/apache/kafka/pull/8724#discussion_r434360661



##
File path: core/src/main/scala/kafka/controller/ControllerContext.scala
##
@@ -391,6 +404,54 @@ class ControllerContext {
 partitionsForTopic(topic).filter { partition => states.contains(partitionState(partition)) }.toSet
   }
 
+  def putLeadershipInfo(partition: TopicPartition, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Unit = {
+    val previous = partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+    val replicaAssignment = partitionFullReplicaAssignment(partition)
+    updatePreferredReplicaImbalanceMetric(partition, Some(replicaAssignment), previous,
+      Some(replicaAssignment), Some(leaderIsrAndControllerEpoch))
+  }
+
+  private def updatePreferredReplicaImbalanceMetric(partition: TopicPartition,
+                                                    oldReplicaAssignment: Option[ReplicaAssignment],
+                                                    oldLeadershipInfo: Option[LeaderIsrAndControllerEpoch],
+                                                    newReplicaAssignment: Option[ReplicaAssignment],
Review comment:
   Actually, in `updatePartitionFullReplicaAssignment`, 
`partitionLeadershipInfo.get(topicPartition)` returns an `Option` so it is more 
convenient to keep it as an `Option`. Ok for you?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

2020-06-03 Thread GitBox


dajac commented on a change in pull request #8724:
URL: https://github.com/apache/kafka/pull/8724#discussion_r434360795



##
File path: core/src/main/scala/kafka/controller/ControllerContext.scala
##
@@ -391,6 +404,54 @@ class ControllerContext {
 partitionsForTopic(topic).filter { partition => states.contains(partitionState(partition)) }.toSet
   }
 
+  def putLeadershipInfo(partition: TopicPartition, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Unit = {
+    val previous = partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+    val replicaAssignment = partitionFullReplicaAssignment(partition)
+    updatePreferredReplicaImbalanceMetric(partition, Some(replicaAssignment), previous,
+      Some(replicaAssignment), Some(leaderIsrAndControllerEpoch))
+  }
+
+  private def updatePreferredReplicaImbalanceMetric(partition: TopicPartition,
+                                                    oldReplicaAssignment: Option[ReplicaAssignment],
+                                                    oldLeadershipInfo: Option[LeaderIsrAndControllerEpoch],
+                                                    newReplicaAssignment: Option[ReplicaAssignment],
+                                                    newLeadershipInfo: Option[LeaderIsrAndControllerEpoch]): Unit = {
+    if (!isTopicQueuedUpForDeletion(partition.topic)) {
+      oldReplicaAssignment.foreach { replicaAssignment =>
+        oldLeadershipInfo.foreach { leadershipInfo =>
+          if (isReplicaImbalance(replicaAssignment, leadershipInfo))
+            preferredReplicaImbalanceCount -= 1
+        }
+      }
+
+      newReplicaAssignment.foreach { replicaAssignment =>
+        newLeadershipInfo.foreach { leadershipInfo =>
+          if (isReplicaImbalance(replicaAssignment, leadershipInfo))
+            preferredReplicaImbalanceCount += 1
+        }
+      }
+    }
+  }
+
+  private def cleanPreferredReplicaImbalanceMetric(topic: String): Unit = {
+    partitionAssignments.getOrElse(topic, mutable.Map.empty).foreach { case (partition, replicaAssignment) =>
+      partitionLeadershipInfo.get(new TopicPartition(topic, partition)).foreach { leadershipInfo =>
+        if (isReplicaImbalance(replicaAssignment, leadershipInfo))
+          preferredReplicaImbalanceCount -= 1
+      }
+    }
+  }
+
+  private def isReplicaImbalance(replicaAssignment: ReplicaAssignment,

Review comment:
   Yes, that is fine. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-9887) failed-task-count JMX metric not updated if task fails during startup

2020-06-03 Thread Michael Carter (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17124683#comment-17124683
 ] 

Michael Carter edited comment on KAFKA-9887 at 6/3/20, 7:25 AM:


This similarly applies to connectors that fail during startup. The 
connector-startup-failure-total metric is not updated because [this code 
section|https://github.com/apache/kafka/blob/00a59b392d92b0d6d3a321ef9a53dae4b3a9d030/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L103-L121]
 doesn't propagate the exception back up to the code that would increment the 
metric. In fact, since it never receives the exception, that code registers a 
successful startup instead.


was (Author: michael_carter):
This similarly applies to connectors that fail during startup. The 
connector-startup-failure-total metric is not updated because [this code 
section|https://github.com/apache/kafka/blob/00a59b392d92b0d6d3a321ef9a53dae4b3a9d030/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L103-L121]
 doesn't propagate the exception back up to the code that would increment the 
metric. In fact, it registers as a successful startup instead.

> failed-task-count JMX metric not updated if task fails during startup
> -
>
> Key: KAFKA-9887
> URL: https://issues.apache.org/jira/browse/KAFKA-9887
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
>Reporter: Chris Egerton
>Priority: Major
>
> If a task fails on startup (specifically, during [this code 
> section|https://github.com/apache/kafka/blob/00a59b392d92b0d6d3a321ef9a53dae4b3a9d030/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L427-L468]),
>  the {{failed-task-count}} JMX metric is not updated to reflect the task 
> failure, even though the status endpoints in the REST API do report the task 
> as failed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac commented on a change in pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

2020-06-03 Thread GitBox


dajac commented on a change in pull request #8724:
URL: https://github.com/apache/kafka/pull/8724#discussion_r434361213



##
File path: core/src/main/scala/kafka/controller/ControllerContext.scala
##
@@ -391,6 +404,54 @@ class ControllerContext {
 partitionsForTopic(topic).filter { partition => states.contains(partitionState(partition)) }.toSet
   }
 
+  def putLeadershipInfo(partition: TopicPartition, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Unit = {
+    val previous = partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+    val replicaAssignment = partitionFullReplicaAssignment(partition)
+    updatePreferredReplicaImbalanceMetric(partition, Some(replicaAssignment), previous,
+      Some(replicaAssignment), Some(leaderIsrAndControllerEpoch))
+  }
+
+  private def updatePreferredReplicaImbalanceMetric(partition: TopicPartition,
+                                                    oldReplicaAssignment: Option[ReplicaAssignment],
+                                                    oldLeadershipInfo: Option[LeaderIsrAndControllerEpoch],
+                                                    newReplicaAssignment: Option[ReplicaAssignment],
+                                                    newLeadershipInfo: Option[LeaderIsrAndControllerEpoch]): Unit = {
+    if (!isTopicQueuedUpForDeletion(partition.topic)) {
+      oldReplicaAssignment.foreach { replicaAssignment =>
+        oldLeadershipInfo.foreach { leadershipInfo =>
+          if (isReplicaImbalance(replicaAssignment, leadershipInfo))
+            preferredReplicaImbalanceCount -= 1
+        }
+      }
+
+      newReplicaAssignment.foreach { replicaAssignment =>
+        newLeadershipInfo.foreach { leadershipInfo =>
+          if (isReplicaImbalance(replicaAssignment, leadershipInfo))
+            preferredReplicaImbalanceCount += 1
+        }
+      }
+    }
+  }
+
+  private def cleanPreferredReplicaImbalanceMetric(topic: String): Unit = {
+    partitionAssignments.getOrElse(topic, mutable.Map.empty).foreach { case (partition, replicaAssignment) =>
+      partitionLeadershipInfo.get(new TopicPartition(topic, partition)).foreach { leadershipInfo =>
+        if (isReplicaImbalance(replicaAssignment, leadershipInfo))
+          preferredReplicaImbalanceCount -= 1
+      }
+    }
+  }
+
+  private def isReplicaImbalance(replicaAssignment: ReplicaAssignment,
+                                 leadershipInfo: LeaderIsrAndControllerEpoch): Boolean = {
+    val preferredReplica = replicaAssignment.replicas.head
+    if (replicaAssignment.isBeingReassigned && replicaAssignment.addingReplicas.contains(preferredReplica))
+      // reassigning partitions are not counted as imbalanced until the new replica joins the ISR (completes reassignment)

Review comment:
   I agree with you.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10089) The stale ssl engine factory is not closed after reconfigure

2020-06-03 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-10089:
--

 Summary: The stale ssl engine factory is not closed after 
reconfigure
 Key: KAFKA-10089
 URL: https://issues.apache.org/jira/browse/KAFKA-10089
 Project: Kafka
  Issue Type: Bug
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


{code}
@Override
public void reconfigure(Map<String, ?> newConfigs) throws KafkaException {
    SslEngineFactory newSslEngineFactory = createNewSslEngineFactory(newConfigs);
    if (newSslEngineFactory != this.sslEngineFactory) {
        this.sslEngineFactory = newSslEngineFactory; // we should close the older one
        log.info("Created new {} SSL engine builder with keystore {} truststore {}", mode,
                newSslEngineFactory.keystore(), newSslEngineFactory.truststore());
    }
}
{code}
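
A minimal sketch of the fix the comment above hints at: close the stale 
factory before dropping the reference (assuming the factory is closeable, as 
the public SslEngineFactory interface is, and using Utils.closeQuietly as one 
way to do it):

{code:java}
@Override
public void reconfigure(Map<String, ?> newConfigs) throws KafkaException {
    SslEngineFactory newSslEngineFactory = createNewSslEngineFactory(newConfigs);
    if (newSslEngineFactory != this.sslEngineFactory) {
        // Release the stale factory's resources before swapping in the new one.
        Utils.closeQuietly(this.sslEngineFactory, "close stale ssl engine factory");
        this.sslEngineFactory = newSslEngineFactory;
        log.info("Created new {} SSL engine builder with keystore {} truststore {}", mode,
                newSslEngineFactory.keystore(), newSslEngineFactory.truststore());
    }
}
{code}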



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 opened a new pull request #8792: KAFKA-10089 The stale ssl engine factory is not closed after reconfigure

2020-06-03 Thread GitBox


chia7712 opened a new pull request #8792:
URL: https://github.com/apache/kafka/pull/8792


   https://issues.apache.org/jira/browse/KAFKA-10089
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9891) Invalid state store content after task migration with exactly_once and standby replicas

2020-06-03 Thread Mateusz Jadczyk (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17124787#comment-17124787
 ] 

Mateusz Jadczyk commented on KAFKA-9891:


[~bchen225242] well, it shouldn't occur for either of the keys, but this will 
pinpoint the poisonKey.
Btw, we're currently not using standby replicas because of this bug, and we 
could probably wait until 2.6. Not sure about all other Kafka Streams users 
though.
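
For context, the deduplication pattern described in the report quoted below, 
as a minimal illustrative sketch (the transformer itself is hypothetical; 
COMMAND_ID_STORE matches the store name in the report): the command id is 
looked up in a persistent window store covering the past hour, and only 
previously unseen ids are passed downstream.

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

public class CommandIdDeduplicator implements ValueTransformerWithKey<String, String, String> {
    private static final long WINDOW_MS = Duration.ofHours(1).toMillis();
    private ProcessorContext context;
    private WindowStore<String, Long> store;

    @SuppressWarnings("unchecked")
    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
        this.store = (WindowStore<String, Long>) context.getStateStore("COMMAND_ID_STORE");
    }

    @Override
    public String transform(final String key, final String commandId) {
        final long now = context.timestamp();
        // Any occurrence of this command id within the past hour is a duplicate.
        try (WindowStoreIterator<Long> hits = store.fetch(commandId, now - WINDOW_MS, now)) {
            if (hits.hasNext()) {
                return null; // duplicate: drop the command
            }
        }
        // Remember the id; this write is what the changelog topic (and hence
        // a standby replica's local copy) is supposed to reflect.
        store.put(commandId, now, now);
        return commandId;
    }

    @Override
    public void close() {}
}
{code}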

> Invalid state store content after task migration with exactly_once and 
> standby replicas
> ---
>
> Key: KAFKA-9891
> URL: https://issues.apache.org/jira/browse/KAFKA-9891
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.1, 2.4.1
>Reporter: Mateusz Jadczyk
>Assignee: Boyang Chen
>Priority: Blocker
> Attachments: failedtest, failedtest2, failedtest3, failedtest3_bug, 
> state_store_operations.txt, tasks_assignment.txt
>
>
> We have a simple command id deduplication mechanism (very similar to the one 
> from Kafka Streams examples) based on Kafka Streams State Stores. It stores 
> command ids from the past hour in _persistentWindowStore_. We encountered a 
> problem with the store if there's an exception thrown later in that topology.
>  We run 3 nodes using docker, each with multiple threads set for this 
> particular Streams Application.
> The business flow is as follows (performed within a single subtopology):
>  *  a valid command is sent with command id 
> (_mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_). NODE 1 is running an active 
> task 1_2. First node in the topology analyses if this is a duplicate by 
> checking in the state store (_COMMAND_ID_STORE_), if not puts the command id 
> in the state store and processes the command properly.
>  * an invalid command is sent with the same key but new command id 
> (_mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc_). Again, check for the 
> duplicated command id is performed, it's not a duplicate, command id is put 
> into the state store. Next node in the topology throws an exception which 
> causes an error on NODE 1 for task 1_2. As a result, transaction is aborted, 
> offsets are not committed. I double checked for the changelog topic - 
> relevant messages are not committed. Therefore, the changelog topic contains 
> only the first command id _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f,_ and 
> not the one which caused a failure.
>  * in the meantime a standby task 1_2 running on NODE 3 replicated 
> _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_ command id into a local 
> _COMMAND_ID_STORE_
>  * standby task 1_2 on NODE 3 Thread-2 takes over the task as an active one. 
> It checks if this command id is a duplicate - no, it isn't - tries to process 
> the faulty command and throws an exception. Again, transaction aborted, all 
> looks fine.
>  * NODE 3 Thread-1 takes over. It checks for the duplicate. To our surprise, 
> *it is a duplicate!* Even though the transaction has been aborted and the 
> changelog doesn't contain this command id: 
> _mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc._
>  
> After digging into the Streams logs and some discussion on ([Stack 
> Overflow|https://stackoverflow.com/questions/61247789/invalid-state-store-content-after-aborted-transaction-with-exactly-once-and-stan])
>  we concluded it has something to do with checkpoint files. Here are the 
> detailed logs relevant to checkpoint files.
>  
> {code:java}
> NODE_3 2020-04-15 21:06:14.470 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:19.413 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Restoring state store COMMAND_ID_STORE from changelog topic 
> Processor-COMMAND_ID_STORE-changelog at checkpoint null
> NODE_3 2020-04-15 21:06:28.470 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
> standby-task [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:29.634 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:06:29.640 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp 
> /tmp/kafka-streams/Processor/1_2/.checkpoint
> NODE_3 2020-04-15 21:11:15.909 TRACE 1 ---

[jira] [Created] (KAFKA-10090) Misleading warnings: The configuration was supplied but isn't a known config

2020-06-03 Thread Robert Wruck (Jira)
Robert Wruck created KAFKA-10090:


 Summary: Misleading warnings: The configuration was supplied but 
isn't a known config
 Key: KAFKA-10090
 URL: https://issues.apache.org/jira/browse/KAFKA-10090
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 2.5.0
Reporter: Robert Wruck


In our setup (using the Spring Cloud Stream Kafka binder), we see log messages 
like:

{{The configuration 'ssl.keystore.password' was supplied but isn't a known 
config}}

logged by org.apache.kafka.clients.admin.AdminClientConfig. The Kafka binder 
actually uses SSL and security.protocol is set to SSL.

Looking through the code, a few things seem odd:
 * The log message says "isn't a known config", but that's not true. It is 
*known*, i.e. defined in ConfigDef, but not *used*.
 * The method for detecting whether a config is actually *used* is incomplete. 
ChannelBuilders.channelBuilderConfigs(), for example, extracts the configs to 
use for the created channel builder using *new HashMap<>(config.values())*, so 
*get()* on the copy no longer marks a config as used (see the sketch below).
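
A minimal, runnable sketch of the tracking mechanism being described (the 
RecordingMap here is an illustrative stand-in for the usage-recording map 
AbstractConfig hands out, not Kafka's class): once the recording map is copied 
into a plain HashMap, later get() calls no longer register usage, so the key 
is reported as unused and triggers the warning above.

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Stand-in for the usage-recording map: get() remembers which keys were read.
class RecordingMap<V> extends HashMap<String, V> {
    private final Set<String> used = new HashSet<>();

    RecordingMap(Map<String, V> original) {
        super(original);
    }

    @Override
    public V get(Object key) {
        if (key instanceof String) {
            used.add((String) key); // mark the config as used
        }
        return super.get(key);
    }

    Set<String> unused() {
        Set<String> keys = new HashSet<>(keySet());
        keys.removeAll(used);
        return keys;
    }
}

public class UnusedConfigDemo {
    public static void main(String[] args) {
        RecordingMap<String> configs = new RecordingMap<>(Map.of(
                "ssl.keystore.password", "secret",
                "bootstrap.servers", "broker:9092"));

        // Reading through the recording map marks the key as used...
        configs.get("bootstrap.servers");

        // ...but copying into a plain HashMap (as channelBuilderConfigs does)
        // bypasses the recording, so this access goes unnoticed:
        Map<String, String> copy = new HashMap<>(configs);
        copy.get("ssl.keystore.password");

        // Prints [ssl.keystore.password]: the key behind the misleading warning.
        System.out.println(configs.unused());
    }
}
{code}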

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on a change in pull request #8783: KAFKA-10063 UnsupportedOperation when querying cleaner metrics after …

2020-06-03 Thread GitBox


chia7712 commented on a change in pull request #8783:
URL: https://github.com/apache/kafka/pull/8783#discussion_r434456891



##
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##
@@ -112,9 +112,19 @@ class LogCleaner(initialConfig: CleanerConfig,
 
   private[log] val cleaners = mutable.ArrayBuffer[CleanerThread]()
 
+  /**
+   * Scala 2.12 does not support maxOption, so we handle the empty case manually.
+   * @param f to compute the result
+   * @return the computed value or 0 if there is no cleaner
+   */
+  private def computeCleanerThread(f: Seq[CleanerThread] => Int): Int =

Review comment:
   a workaround for scala 2.12





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #8680: KAFKA-10027: Implement read path for feature versioning system (KIP-584)

2020-06-03 Thread GitBox


kowshik commented on a change in pull request #8680:
URL: https://github.com/apache/kafka/pull/8680#discussion_r434344257



##
File path: clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Represents an immutable basic version range using 2 attributes: min and max, each of type long.
+ * The min and max attributes need to satisfy 2 rules:
+ *  - they are each expected to be >= 1, as we only consider positive version values to be valid.
+ *  - max should be >= min.
+ *
+ * The class also provides API to serialize/deserialize the version range to/from a map.
+ * The class allows for configurable labels for the min/max attributes, which can be specialized by
+ * sub-classes (if needed).
+ */
+class BaseVersionRange {
+    // Non-empty label for the min version key, that's used only for serialization/deserialization purposes.
+    private final String minKeyLabel;
+
+    // The value of the minimum version.
+    private final long minValue;

Review comment:
   Done. I have made it `int16` now. Great point.

##
File path: core/src/main/scala/kafka/zk/ZkData.scala
##
@@ -22,7 +22,7 @@ import java.util.Properties
 
 import com.fasterxml.jackson.annotation.JsonProperty
 import com.fasterxml.jackson.core.JsonProcessingException
-import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
+import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, KAFKA_2_6_IV1, LeaderAndIsr}

Review comment:
   Done. Made it KAFKA_2_7_IV0.

##
File path: clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Represents an immutable basic version range using 2 attributes: min and max, each of type long.
+ * The min and max attributes need to satisfy 2 rules:
+ *  - they are each expected to be >= 1, as we only consider positive version values to be valid.
+ *  - max should be >= min.
+ *
+ * The class also provides API to serialize/deserialize the version range to/from a map.
+ * The class allows for configurable labels for the min/max attributes, which can be specialized by
+ * sub-classes (if needed).
+ */
+class BaseVersionRange {
+    // Non-empty label for the min version key, that's used only for serialization/deserialization purposes.
+    private final String minKeyLabel;
+
+    // The value of the minimum version.
+    private final long minValue;
+
+    // Non-empty label for the max version key, that's used only for serialization/deserialization purposes.
+    private final String maxKeyLabel;
+
+    // The value of the maximum version.
+    private final long maxValue;
+
+    /**
+     * Raises an exception unless the following condition is met:
+     * minValue >= 1 and maxValue >= 1 and maxValue >= minValue.
+     *
+     * @param minKeyLabel   Label for the min version key, that's used only for
+     *                      serialization/deserialization purposes.
+     * @param minValue      The minimum version value.
+     * @param maxKeyLabel   Label for the max version key, that's used only for
+     *

[jira] [Commented] (KAFKA-10090) Misleading warnings: The configuration was supplied but isn't a known config

2020-06-03 Thread Robert Wruck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17124823#comment-17124823
 ] 

Robert Wruck commented on KAFKA-10090:
--

The "new HashMap" leak was introduced by KAFKA-7588.

> Misleading warnings: The configuration was supplied but isn't a known config
> 
>
> Key: KAFKA-10090
> URL: https://issues.apache.org/jira/browse/KAFKA-10090
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Robert Wruck
>Priority: Major
>
> In our setup (using Spring cloud stream Kafka binder), we see log messages 
> like:
>  
> {{The configuration 'ssl.keystore.password' was supplied but isn't a known 
> config}}
>  
> logged by org.apache.kafka.clients.admin.AdminClientConfig. The Kafka binder 
> actually uses SSL and security.protocol is set to SSL.
> Looking through the code, a few things seem odd:
>  * The log message says "isn't a known config" but that's not true. It is 
> *known*, i.e. defined in ConfigDef, but not *used*.
>  * The method for detecting whether a config is actually *used* is not 
> complete. ChannelBuilders.channelBuilderConfigs() for example extracts the 
> configs to use for the created channel builder using *new 
> HashMap(config.values())* thus *get()* won't mark a config as used anymore.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] astubbs commented on pull request #8771: MINOR: Add explanation for disabling forwarding from value transformers

2020-06-03 Thread GitBox


astubbs commented on pull request #8771:
URL: https://github.com/apache/kafka/pull/8771#issuecomment-638105979


   > Checkstyle error:
   
   ah dag nammit, sorry guys



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #8666: KAFKA-9479: Describe consumer group --state --all-groups show header once

2020-06-03 Thread GitBox


dajac commented on a change in pull request #8666:
URL: https://github.com/apache/kafka/pull/8666#discussion_r434554562



##
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##
@@ -291,14 +291,21 @@ object ConsumerGroupCommand extends Logging {
 }
 
 private def printStates(states: Map[String, GroupState]): Unit = {
-  for ((groupId, state) <- states) {
-    if (shouldPrintMemberState(groupId, Some(state.state), Some(1))) {
+  val stateProps =  states.filter{case(groupId,state)=>shouldPrintMemberState(groupId, Some(state.state), Some(1))}
+    .map{case (_,state)=>
   val coordinator = s"${state.coordinator.host}:${state.coordinator.port} (${state.coordinator.idString})"

Review comment:
   I suggest to extract this as a method `coordinatorString` in the 
`GroupState` case class or in an inner function.

##
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##
@@ -291,14 +291,21 @@ object ConsumerGroupCommand extends Logging {
 }
 
 private def printStates(states: Map[String, GroupState]): Unit = {
-  for ((groupId, state) <- states) {
-    if (shouldPrintMemberState(groupId, Some(state.state), Some(1))) {
+  val stateProps =  states.filter{case(groupId,state)=>shouldPrintMemberState(groupId, Some(state.state), Some(1))}
+    .map{case (_,state)=>
   val coordinator = s"${state.coordinator.host}:${state.coordinator.port} (${state.coordinator.idString})"
-      val coordinatorColLen = Math.max(25, coordinator.length)
-      print(s"\n%${-coordinatorColLen}s %-25s %-20s %-15s %s".format("GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS"))
-      print(s"\n%${-coordinatorColLen}s %-25s %-20s %-15s %s".format(state.group, coordinator, state.assignmentStrategy, state.state, state.numMembers))
-      println()
+      (state.group,coordinator,state.assignmentStrategy,state.state,state.numMembers)
 }
+  val hasAllGroups = opts.options.has(opts.allGroupsOpt)
+  if(stateProps.nonEmpty && hasAllGroups){
+    val headerLengthOffset = Math.max(25,stateProps.maxBy{_._2.length}._2.length)
+    print(s"\n%${-headerLengthOffset}s %-25s %-20s %-15s %s".format("GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS"))
+  }
+  stateProps.foreach{ case(group,coordinator,assignmentStrategy,state,numMembers)=>
+    val offset = -Math.max(25,coordinator.length)

Review comment:
   The computation of the offset is not consistent here. For all groups, it 
will result in misaligning the group information. We should compute it once and 
reuse it all the time.

##
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##
@@ -291,14 +291,21 @@ object ConsumerGroupCommand extends Logging {
 }
 
 private def printStates(states: Map[String, GroupState]): Unit = {
-  for ((groupId, state) <- states) {
-    if (shouldPrintMemberState(groupId, Some(state.state), Some(1))) {
+  val stateProps =  states.filter{case(groupId,state)=>shouldPrintMemberState(groupId, Some(state.state), Some(1))}
+    .map{case (_,state)=>
   val coordinator = s"${state.coordinator.host}:${state.coordinator.port} (${state.coordinator.idString})"
-      val coordinatorColLen = Math.max(25, coordinator.length)
-      print(s"\n%${-coordinatorColLen}s %-25s %-20s %-15s %s".format("GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS"))
-      print(s"\n%${-coordinatorColLen}s %-25s %-20s %-15s %s".format(state.group, coordinator, state.assignmentStrategy, state.state, state.numMembers))
-      println()
+      (state.group,coordinator,state.assignmentStrategy,state.state,state.numMembers)
 }
+  val hasAllGroups = opts.options.has(opts.allGroupsOpt)
+  if(stateProps.nonEmpty && hasAllGroups){

Review comment:
Why do you distinguish the all-groups case here? It seems that was not the 
case before.

##
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##
@@ -291,14 +291,21 @@ object ConsumerGroupCommand extends Logging {
 }
 
 private def printStates(states: Map[String, GroupState]): Unit = {
-  for ((groupId, state) <- states) {
-    if (shouldPrintMemberState(groupId, Some(state.state), Some(1))) {
+  val stateProps =  states.filter{case(groupId,state)=>shouldPrintMemberState(groupId, Some(state.state), Some(1))}
+    .map{case (_,state)=>
   val coordinator = s"${state.coordinator.host}:${state.coordinator.port} (${state.coordinator.idString})"
-      val coordinatorColLen = Math.max(25, coordinator.length)
-      print(s"\n%${-coordinatorColLen}s %-25s %-20s %-15s %s".format("GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS"))
-      print(s"\n%${-coordinatorColLen}s %

[GitHub] [kafka] dajac opened a new pull request #8793: KAFKA-9514; The protocol generator generated useless condition when a field is made nullable and flexible version is used

2020-06-03 Thread GitBox


dajac opened a new pull request #8793:
URL: https://github.com/apache/kafka/pull/8793


   The protocol generator generates useless conditions when a field of type 
string is made nullable after the request has been converted to using optional 
fields.
   
   As an example, we have made the field `ProtocolName` nullable in the 
`JoinGroupResponse`. The `JoinGroupResponse` supports optional fields since 
version 6 and the field is nullable since version 7. Under these conditions, 
the generator generates the following code:
   
   ```
   if (protocolName == null) {
       if (_version >= 7) {
           if (_version >= 6) {
               _writable.writeUnsignedVarint(0);
           } else {
               _writable.writeShort((short) -1);
           }
       } else {
           throw new NullPointerException();
       }
   }
   ```
   
   spotbugs raises an `UC_USELESS_CONDITION` because `_version >= 6` is always 
true.
   
   This PR fixes the bug by propagating the outer versions to the underlying 
`VersionConditional` so it can generate the code accordingly.
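   
   For reference, an inference from the description above (not output copied 
from the PR): once the outer versions are propagated, versions below 7 can no 
longer reach the flexible-version branch, so the generated code should 
collapse to something like:
   
   ```
   if (protocolName == null) {
       if (_version >= 7) {
           _writable.writeUnsignedVarint(0);
       } else {
           throw new NullPointerException();
       }
   }
   ```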
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #8782: KAFKA-10080; Fix race condition on txn completion which can cause duplicate appends

2020-06-03 Thread GitBox


chia7712 commented on pull request #8782:
URL: https://github.com/apache/kafka/pull/8782#issuecomment-638228671


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8751: MINOR: Update to Gradle 6.5 and tweak build jvm config

2020-06-03 Thread GitBox


ijuma commented on pull request #8751:
URL: https://github.com/apache/kafka/pull/8751#issuecomment-638255192


   @omkreddy Updated the PR description and made a few more changes.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #8549: KAFKA-9911: Add new PRODUCER_FENCED error code

2020-06-03 Thread GitBox


abbccdda commented on pull request #8549:
URL: https://github.com/apache/kafka/pull/8549#issuecomment-638285212


   @guozhangwang Actually, my previous response was not correct. By making 
`InvalidProducerEpoch` extend `RetriableException`, we can bypass the 
`sender#canRetry` check and make it non-fatal.
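   
   To illustrate the idea (simplified stand-ins, not Kafka's actual sender 
code): the retry decision keys off the exception's supertype, so changing what 
the epoch error extends changes whether it is treated as fatal:
   
   ```
   import org.apache.kafka.common.errors.RetriableException;
   
   // Hypothetical sketch: extending RetriableException lets the error pass the check.
   class InvalidProducerEpochException extends RetriableException {
       InvalidProducerEpochException(String message) {
           super(message);
       }
   }
   
   class RetryCheckSketch {
       // Simplified stand-in for the sender's generic retry check.
       static boolean canRetry(RuntimeException error) {
           return error instanceof RetriableException;
       }
   }
   ```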



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio closed pull request #8639: KAFKA-9962: Make the auth operations fields ignorable

2020-06-03 Thread GitBox


jsancio closed pull request #8639:
URL: https://github.com/apache/kafka/pull/8639


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on pull request #8639: KAFKA-9962: Make the auth operations fields ignorable

2020-06-03 Thread GitBox


jsancio commented on pull request #8639:
URL: https://github.com/apache/kafka/pull/8639#issuecomment-638286953


   The bug report in 
[KAFKA-9962](https://issues.apache.org/jira/browse/KAFKA-9962) is for 
`describeCluster`.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-9962) Admin client throws UnsupportedVersion exception when talking to old broker

2020-06-03 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio reassigned KAFKA-9962:
-

Assignee: (was: Jose Armando Garcia Sancio)

> Admin client throws UnsupportedVersion exception when talking to old broker
> ---
>
> Key: KAFKA-9962
> URL: https://issues.apache.org/jira/browse/KAFKA-9962
> Project: Kafka
>  Issue Type: Task
>  Components: clients
>Affects Versions: 2.3.1, 2.5.0, 2.4.1
>Reporter: Jose Armando Garcia Sancio
>Priority: Critical
>
> Users are getting this error when using a client version 2.5 against a 1.1.0 
> cluster/broker.
> {code:java}
> [2020-04-28 01:09:10,663] ERROR Failed to start KSQL 
> (io.confluent.ksql.rest.server.KsqlServerMain:63)
> io.confluent.ksql.util.KsqlServerException: Could not get Kafka authorized 
> operations!
> at 
> io.confluent.ksql.services.KafkaClusterUtil.isAuthorizedOperationsSupported(KafkaClusterUtil.java:51)
> at 
> io.confluent.ksql.security.KsqlAuthorizationValidatorFactory.create(KsqlAuthorizationValidatorFactory.java:52)
> at 
> io.confluent.ksql.rest.server.KsqlRestApplication.buildApplication(KsqlRestApplication.java:639)
> at 
> io.confluent.ksql.rest.server.KsqlRestApplication.buildApplication(KsqlRestApplication.java:567)
> at 
> io.confluent.ksql.rest.server.KsqlServerMain.createExecutable(KsqlServerMain.java:100)
> at 
> io.confluent.ksql.rest.server.KsqlServerMain.main(KsqlServerMain.java:59)
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to 
> write a non-default includeClusterAuthorizedOperations at version 5
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
> at 
> io.confluent.ksql.services.KafkaClusterUtil.isAuthorizedOperationsSupported(KafkaClusterUtil.java:49)
> ... 5 more
> Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: 
> Attempted to write a non-default includeClusterAuthorizedOperations at 
> version 5
> {code}
> Looking at KIP-430, it mentions that the client is supposed to handle this 
> case:
>  # Existing clients using older versions will not request authorized 
> operations in Describe requests since the default is to disable this feature. 
> This keeps older clients compatible with newer brokers.
>  # Newer clients connecting to older brokers will use the older protocol 
> version and hence will not request authorized operations.
>  # When the AdminClient is talking to a broker which does not support 
> KIP-430, it will fill in either null or UnsupportedVersionException for the 
> returned ACL operations fields in objects. For example, 
> `ConsumerGroupDescription#authorizedOperations` will be null if the broker 
> did not supply this information. DescribeClusterResult#authorizedOperations 
> will throw an `UnsupportedVersionException` if the broker did not supply this 
> information.
>  # When new operations are added, newer brokers may return operations that 
> are not known to older clients. AdminClient will ignore any bit that is set 
> in authorized_operations that is not known to the client. The 
> Set created by the client from the bits returned by the broker 
> will only include operations that the client knows about.
> I assume that this deployment environment falls under case 2, we have this in 
> the serialization code:
> {code:java}
> if (_version >= 8) {
>     _writable.writeByte(includeClusterAuthorizedOperations ? (byte) 1 : (byte) 0);
> } else {
>     if (includeClusterAuthorizedOperations) {
>         throw new UnsupportedVersionException("Attempted to write a non-default includeClusterAuthorizedOperations at version " + _version);
>     }
> }
> {code}
> It also looks like we blindly set the version independently of the broker's 
> supported version:
> {code:java}
> MetadataRequest.Builder createRequest(int timeoutMs) {
>     // Since this only requests node information, it's safe to pass true for allowAutoTopicCreation (and it
>     // simplifies communication with older brokers)
>     return new MetadataRequest.Builder(new MetadataRequestData()
>         .setTopics(Collections.emptyList())
>         .setAllowAutoTopicCreation(true)
>  

[GitHub] [kafka] hachikuji commented on pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable

2020-06-03 Thread GitBox


hachikuji commented on pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#issuecomment-638294334


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable

2020-06-03 Thread GitBox


hachikuji commented on pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#issuecomment-638294449


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8785: KAFKA-10084: Fix EosTestDriver end offset

2020-06-03 Thread GitBox


vvcephei commented on pull request #8785:
URL: https://github.com/apache/kafka/pull/8785#issuecomment-638298913


   All test failures were unrelated (looks like some new flaky tests)
   ```
   kafka.server.GssapiAuthenticationTest.testServerAuthenticationFailure
   
org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.testReassignmentWithRandomSubscriptionsAndChanges
   
org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.testReassignmentWithRandomSubscriptionsAndChanges
   
org.apache.kafka.clients.consumer.StickyAssignorTest.testReassignmentWithRandomSubscriptionsAndChanges
   
org.apache.kafka.clients.consumer.StickyAssignorTest.testReassignmentWithRandomSubscriptionsAndChanges
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8782: KAFKA-10080; Fix race condition on txn completion which can cause duplicate appends

2020-06-03 Thread GitBox


guozhangwang commented on a change in pull request #8782:
URL: https://github.com/apache/kafka/pull/8782#discussion_r434688366



##
File path: 
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
##
@@ -86,6 +90,70 @@ class TransactionMarkerChannelManagerTest {
   .anyTimes()
   }
 
+  @Test
+  def shouldOnlyWriteTxnCompletionOnce(): Unit = {

Review comment:
   Ah yes, I missed the `txnMetadata2.lock.lock()` before starting the 
scheduler, thanks.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei merged pull request #8785: KAFKA-10084: Fix EosTestDriver end offset

2020-06-03 Thread GitBox


vvcephei merged pull request #8785:
URL: https://github.com/apache/kafka/pull/8785


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-06-03 Thread GitBox


dajac commented on pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#issuecomment-638300527


   @mimaison Are you going to rebase this one? I could take a look at it once 
done.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable

2020-06-03 Thread GitBox


hachikuji commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r434687347



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -1310,16 +1313,23 @@ private CompletedFetch initializeCompletedFetch(CompletedFetch nextCompletedFetc
         return completedFetch;
     }
 
-    private void handleOffsetOutOfRange(long fetchOffset,
+    private void handleOffsetOutOfRange(FetchPosition fetchPosition,
                                         TopicPartition topicPartition,
                                         String reason) {
         if (subscriptions.hasDefaultOffsetResetPolicy()) {
-            log.info("Fetch offset {} is out of range for partition {}, resetting offset",
-                    topicPartition, fetchOffset);
+            log.info("Fetch offset epoch {} is out of range for partition {}, resetting offset",
+                    fetchPosition, topicPartition);
             subscriptions.requestOffsetReset(topicPartition);
         } else {
-            throw new OffsetOutOfRangeException(Collections.singletonMap(
-                    topicPartition, fetchOffset), reason);
+            Map<TopicPartition, Long> offsetOutOfRangePartitions =
+                    Collections.singletonMap(topicPartition, fetchPosition.offset);
+            String errorMessage = String.format("Offsets out of range " +
+                    "with no configured reset policy for partitions: %s" +
+                    ", for fetch offset: %d, " +
+                    "root cause: %s",
+                    offsetOutOfRangePartitions, fetchPosition.offset, reason);
+            log.error(errorMessage);

Review comment:
   Could we make it `info` in both cases? If the application has not set a 
reset policy, then they are expecting to handle the exception.

##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -817,10 +819,10 @@ public void onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsRe
                 // In addition, check whether the returned offset and epoch are valid. If not, then we should reset
                 // its offset if reset policy is configured, or throw out of range exception.
                 offsetsResult.endOffsets().forEach((respTopicPartition, respEndOffset) -> {
-                    SubscriptionState.FetchPosition requestPosition = fetchPositions.get(respTopicPartition);
+                    FetchPosition requestPosition = fetchPositions.get(respTopicPartition);
 
                     if (respEndOffset.hasUndefinedEpochOrOffset()) {
-                        handleOffsetOutOfRange(requestPosition.offset, respTopicPartition,
+                        handleOffsetOutOfRange(requestPosition, respTopicPartition,
                             "Failed leader offset epoch validation for " + respEndOffset

Review comment:
   Seems like it's more useful to reference `requestPosition` here since 
the end offset is undefined.
   

##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -832,6 +834,7 @@ public void onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsRe
                 });
 
                 if (!truncationWithoutResetPolicy.isEmpty()) {
+                    log.error("Detected log truncation with diverging offsets " + truncationWithoutResetPolicy);

Review comment:
   As in the other case, can we make this info? It would also be helpful if 
we could include the request offset and epoch. Potentially we could move this 
log line into the loop above where we already access `requestPosition`.
   
   Also nit: use `{}`

##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -817,10 +819,10 @@ public void onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsRe
                 // In addition, check whether the returned offset and epoch are valid. If not, then we should reset
                 // its offset if reset policy is configured, or throw out of range exception.
                 offsetsResult.endOffsets().forEach((respTopicPartition, respEndOffset) -> {
-                    SubscriptionState.FetchPosition requestPosition = fetchPositions.get(respTopicPartition);
+                    FetchPosition requestPosition = fetchPositions.get(respTopicPartition);
 
                     if (respEndOffset.hasUndefinedEpochOrOffset()) {
-                        handleOffsetOutOfRange(requestPosition.offset, respTopicPartition,
+                        handleOffsetOutOfRange(requestPosition, respTopicPartition,

Review comment:
   Didn't notice this before, but this is handled inside a loop. If one 
partition hits an error, then the raised exception will prevent us from 
completing the validation for other p

[GitHub] [kafka] hachikuji commented on pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

2020-06-03 Thread GitBox


hachikuji commented on pull request #8724:
URL: https://github.com/apache/kafka/pull/8724#issuecomment-638305368


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

2020-06-03 Thread GitBox


hachikuji commented on pull request #8724:
URL: https://github.com/apache/kafka/pull/8724#issuecomment-638305947


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

2020-06-03 Thread GitBox


hachikuji commented on pull request #8724:
URL: https://github.com/apache/kafka/pull/8724#issuecomment-638306026


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-8338) Improve consumer offset expiration logic to take subscription into account

2020-06-03 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-8338.

Resolution: Duplicate

This has been done as part of KAFKA-8730 (KIP-496).

> Improve consumer offset expiration logic to take subscription into account
> --
>
> Key: KAFKA-8338
> URL: https://issues.apache.org/jira/browse/KAFKA-8338
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Gwen Shapira
>Assignee: huxihx
>Priority: Major
>
> Currently, we expire consumer offsets for a group after the group is 
> considered gone.
> There is a case where the consumer group still exists, but is now subscribed 
> to different topics. In that case, the offsets of the old topics will never 
> expire and if lag is monitored, the monitors will show ever-growing lag on 
> those topics. 
> We need to improve the logic to expire the consumer offsets if the consumer 
> group didn't subscribe to specific topics/partitions for enough time.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac commented on pull request #6737: KAFKA-8338: consumer offset expiration should consider subscription.

2020-06-03 Thread GitBox


dajac commented on pull request #6737:
URL: https://github.com/apache/kafka/pull/6737#issuecomment-638311288


   This has been done as part of KAFKA-8730 (KIP-496). We can close it.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8785: KAFKA-10084: Fix EosTestDriver end offset

2020-06-03 Thread GitBox


vvcephei commented on pull request #8785:
URL: https://github.com/apache/kafka/pull/8785#issuecomment-638313567


   cherry-picked to 2.6



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-9988) Connect incorrectly logs that task has failed when one takes too long to shutdown

2020-06-03 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-9988:
-
Summary: Connect incorrectly logs that task has failed when one takes too 
long to shutdown  (was: Connect incorrectly reports task has failed when task 
takes too long to shutdown)

> Connect incorrectly logs that task has failed when one takes too long to 
> shutdown
> -
>
> Key: KAFKA-9988
> URL: https://issues.apache.org/jira/browse/KAFKA-9988
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.2.3, 2.5.0, 2.3.2, 2.4.1, 2.4.2, 
> 2.5.1
>Reporter: Sanjana Kaundinya
>Priority: Major
>
> If the OffsetStorageReader is closed while the task is trying to shutdown, 
> and the task is trying to access the offsets from the OffsetStorageReader, 
> then we see the following in the logs.
> {code:java}
> [2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=connector-18} Task threw 
> an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask)
> org.apache.kafka.connect.errors.ConnectException: Failed to fetch offsets.
> at 
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:114)
> at 
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
> at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:205)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.connect.errors.ConnectException: Offset reader 
> closed while attempting to read offsets. This is likely because the task was 
> been scheduled to stop but has taken longer than the graceful shutdown period 
> to do so.
> at 
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:103)
> ... 14 more
> [2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=connector-18} Task is 
> being killed and will not recover until manually restarted 
> (org.apache.kafka.connect.runtime.WorkerTask)
> {code}
> This is a bit misleading, because the task is already on its way of being 
> shutdown, and doesn't actually need manual intervention to be restarted. We 
> can see that as later on in the logs we see that it throws another 
> unrecoverable exception.
> {code:java}
> [2020-05-05 05:40:39,361] ERROR WorkerSourceTask{id=connector-18} Task threw 
> an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask)
> {code}
> If we know a task is on its way of shutting down, we should not throw a 
> ConnectException and instead log a warning so that we don't log false 
> negatives.
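
To illustrate the proposed behavior, here is a minimal hypothetical sketch (not the actual WorkerSourceTask code; the `stopping` flag, `log`, and `taskId` names are assumptions for this example):

{code:java}
// Hypothetical sketch: if the offset reader failed because the task is already
// stopping, report a warning instead of an unrecoverable task failure.
private Map<String, Object> readOffsetQuietly(OffsetStorageReader reader, Map<String, Object> partitionKey) {
    try {
        return reader.offset(partitionKey);
    } catch (ConnectException e) {
        if (stopping) {
            log.warn("Offset reader closed while task {} was stopping; skipping offset lookup", taskId);
            return null;
        }
        throw e; // genuine failure: keep the existing error path
    }
}
{code}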



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable

2020-06-03 Thread GitBox


hachikuji commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r434725093



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -822,19 +822,25 @@ public void onSuccess(OffsetForEpochResult offsetsResult) {
                 FetchPosition requestPosition = fetchPositions.get(respTopicPartition);

                 if (respEndOffset.hasUndefinedEpochOrOffset()) {
-                    handleOffsetOutOfRange(requestPosition, respTopicPartition,
-                        "Failed leader offset epoch validation for " + respEndOffset
-                            + " since no end offset larger than current fetch epoch was reported");
+                    try {
+                        handleOffsetOutOfRange(requestPosition, respTopicPartition,
+                            "Failed leader offset epoch validation for " + requestPosition
+                                + " since no end offset larger than current fetch epoch was reported");
+                    } catch (OffsetOutOfRangeException e) {
+                        // Swallow the OffsetOutOfRangeException to finish all partitions validation.

Review comment:
   I don't feel great about having this in the code, even if it's supposed to be temporary. I think we should just fix the exception propagation bug in this patch. It seems like it would be straightforward to do something similar to what is done in the `onFailure` path.
   ```java
   if (!(e instanceof RetriableException) && !cachedOffsetForLeaderException.compareAndSet(null, e)) {
       log.error("Discarding error in OffsetsForLeaderEpoch because another error is pending", e);
   }
   ```
   Above may not be ideal, but at least it provides a way to propagate individual errors.

##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -822,19 +822,25 @@ public void onSuccess(OffsetForEpochResult offsetsResult) {
                 FetchPosition requestPosition = fetchPositions.get(respTopicPartition);

                 if (respEndOffset.hasUndefinedEpochOrOffset()) {
-                    handleOffsetOutOfRange(requestPosition, respTopicPartition,
-                        "Failed leader offset epoch validation for " + respEndOffset
-                            + " since no end offset larger than current fetch epoch was reported");
+                    try {
+                        handleOffsetOutOfRange(requestPosition, respTopicPartition,
+                            "Failed leader offset epoch validation for " + requestPosition
+                                + " since no end offset larger than current fetch epoch was reported");
+                    } catch (OffsetOutOfRangeException e) {
+                        // Swallow the OffsetOutOfRangeException to finish all partitions validation.
+                    }
                 } else {
                     Optional divergentOffsetOpt = subscriptions.maybeCompleteValidation(
                         respTopicPartition, requestPosition, respEndOffset);
                     divergentOffsetOpt.ifPresent(
-                        divergentOffset -> truncationWithoutResetPolicy.put(respTopicPartition, divergentOffset));
+                        divergentOffset -> {
+                            log.info("Detected log truncation with diverging offset: {}", divergentOffset);

Review comment:
   Can you add the topic partition to this message?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8751: MINOR: Update to Gradle 6.5 and tweak build jvm config

2020-06-03 Thread GitBox


ijuma commented on pull request #8751:
URL: https://github.com/apache/kafka/pull/8751#issuecomment-638340086







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma edited a comment on pull request #8751: MINOR: Update to Gradle 6.5 and tweak build jvm config

2020-06-03 Thread GitBox


ijuma edited a comment on pull request #8751:
URL: https://github.com/apache/kafka/pull/8751#issuecomment-638340086


   JDK 8/Scala 2.12 had an out of memory issue.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8790: MINOR: Upgrade spotbugs and spotbugsPlugin

2020-06-03 Thread GitBox


ijuma commented on pull request #8790:
URL: https://github.com/apache/kafka/pull/8790#issuecomment-638340962


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji merged pull request #8782: KAFKA-10080; Fix race condition on txn completion which can cause duplicate appends

2020-06-03 Thread GitBox


hachikuji merged pull request #8782:
URL: https://github.com/apache/kafka/pull/8782


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10080) IllegalStateException after duplicate CompleteCommit append to transaction log

2020-06-03 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17125181#comment-17125181
 ] 

Jason Gustafson commented on KAFKA-10080:
-

Note that this seems to have been introduced in KAFKA-9777, which has not been 
part of any released version. I have merged the patch into the 2.6 branch, so 
it should not affect 2.6.0 either.

> IllegalStateException after duplicate CompleteCommit append to transaction log
> --
>
> Key: KAFKA-10080
> URL: https://issues.apache.org/jira/browse/KAFKA-10080
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> We noticed this exception in the logs:
> {code}
> java.lang.IllegalStateException: TransactionalId foo completing transaction 
> state transition while it does not have a pending state   
>  
> at 
> kafka.coordinator.transaction.TransactionMetadata.$anonfun$completeTransitionTo$1(TransactionMetadata.scala:357)
> at 
> kafka.coordinator.transaction.TransactionMetadata.completeTransitionTo(TransactionMetadata.scala:353)
> at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$3(TransactionStateManager.scala:595)
>   
>  
> at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at 
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:188)
> at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:587)
>   
> 
> at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:126)
> at 
> kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
> at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:107)
> at 
> kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:121)
> at 
> kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:378)
> at 
> kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:280)
> at 
> kafka.cluster.DelayedOperations.checkAndCompleteAll(Partition.scala:122)
> at 
> kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:1023)
> at 
> kafka.cluster.Partition.updateFollowerFetchState(Partition.scala:740)
> {code}
> After inspection, we found that there were two CompleteCommit entries in the 
> transaction state log which explains the failed transition. Indeed the logic 
> for writing the CompleteCommit message does seem prone to race conditions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10080) IllegalStateException after duplicate CompleteCommit append to transaction log

2020-06-03 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-10080.
-
Fix Version/s: 2.6.0
   Resolution: Fixed

> IllegalStateException after duplicate CompleteCommit append to transaction log
> --
>
> Key: KAFKA-10080
> URL: https://issues.apache.org/jira/browse/KAFKA-10080
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.6.0
>
>
> We noticed this exception in the logs:
> {code}
> java.lang.IllegalStateException: TransactionalId foo completing transaction 
> state transition while it does not have a pending state   
>  
> at 
> kafka.coordinator.transaction.TransactionMetadata.$anonfun$completeTransitionTo$1(TransactionMetadata.scala:357)
> at 
> kafka.coordinator.transaction.TransactionMetadata.completeTransitionTo(TransactionMetadata.scala:353)
> at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$3(TransactionStateManager.scala:595)
>   
>  
> at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at 
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:188)
> at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:587)
>   
> 
> at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:126)
> at 
> kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
> at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:107)
> at 
> kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:121)
> at 
> kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:378)
> at 
> kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:280)
> at 
> kafka.cluster.DelayedOperations.checkAndCompleteAll(Partition.scala:122)
> at 
> kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:1023)
> at 
> kafka.cluster.Partition.updateFollowerFetchState(Partition.scala:740)
> {code}
> After inspection, we found that there were two CompleteCommit entries in the 
> transaction state log which explains the failed transition. Indeed the logic 
> for writing the CompleteCommit message does seem prone to race conditions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9777) Purgatory locking bug can lead to hanging transaction

2020-06-03 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-9777.

Fix Version/s: 2.6.0
   Resolution: Fixed

> Purgatory locking bug can lead to hanging transaction
> -
>
> Key: KAFKA-9777
> URL: https://issues.apache.org/jira/browse/KAFKA-9777
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.1, 2.0.1, 2.1.1, 2.2.2, 2.3.1, 2.4.1
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Critical
> Fix For: 2.6.0
>
>
> Once a transaction reaches the `PrepareCommit` or `PrepareAbort` state, the 
> transaction coordinator must send markers to all partitions included in the 
> transaction. After all markers have been sent, then the transaction 
> transitions to the corresponding completed state. Until this transition 
> occurs, no additional progress can be made by the producer.
> The transaction coordinator uses a purgatory to track completion of the 
> markers that need to be sent. Once all markers have been written, then the 
> `DelayedTxnMarker` task becomes completable. We depend on its completion in 
> order to transition to the completed state.
> Related to KAFKA-8334, there is a bug in the locking protocol which is used 
> to check completion of the `DelayedTxnMarker` task. The purgatory attempts to 
> provide a "happens before" contract for task completion with 
> `checkAndComplete`. Basically if a task is completed before calling 
> `checkAndComplete`, then it should be given an opportunity to complete as 
> long as there is sufficient time remaining before expiration. 
> The bug in the locking protocol is that it expects that the operation lock is 
> exclusive to the operation. See here: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedOperation.scala#L114.
>  The logic assumes that if the lock cannot be acquired, then the other holder 
> of the lock must be attempting completion of the same delayed operation. If 
> that is not the case, then the "happens before" contract is broken and a task 
> may not get completed until expiration even if it has been satisfied.
> In the case of `DelayedTxnMarker`, the lock in use is the read side of a 
> read-write lock which is used for partition loading: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala#L264.
>  In fact, if the lock cannot be acquired, it means that it is being held in 
> order to complete some loading operation, in which case it will definitely 
> not attempt completion of the delayed operation. If this happens to occur on 
> the last call to `checkAndComplete` after all markers have been written, then 
> the transition to the completing state will never occur.
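
To make the broken assumption concrete, here is a minimal hypothetical sketch of the pattern described above (plain Java for illustration; not the actual Scala `DelayedOperation` code):

{code:java}
import java.util.concurrent.locks.Lock;

// Hypothetical sketch: checkAndComplete-style logic that skips completion
// whenever the lock is busy, assuming the holder is completing the same operation.
class DelayedTaskSketch {
    private final Lock lock; // may be shared, e.g. the partition-load read lock

    DelayedTaskSketch(Lock lock) {
        this.lock = lock;
    }

    boolean maybeTryComplete() {
        if (!lock.tryLock())
            return false; // assumes the holder will attempt completion itself
        try {
            return tryComplete(); // the "all markers written" check
        } finally {
            lock.unlock();
        }
    }

    private boolean tryComplete() {
        return true; // placeholder for the real completion condition
    }
}
{code}

If the lock holder is an unrelated partition-loading thread, the early return above means a satisfied operation is only noticed at expiration, which is exactly the hanging-transaction scenario described.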



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rajinisivaram merged pull request #8792: KAFKA-10089 The stale ssl engine factory is not closed after reconfigure

2020-06-03 Thread GitBox


rajinisivaram merged pull request #8792:
URL: https://github.com/apache/kafka/pull/8792


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

2020-06-03 Thread GitBox


hachikuji commented on a change in pull request #8724:
URL: https://github.com/apache/kafka/pull/8724#discussion_r434702023



##
File path: core/src/main/scala/kafka/controller/ControllerContext.scala
##
@@ -391,6 +404,90 @@ class ControllerContext {
     partitionsForTopic(topic).filter { partition => states.contains(partitionState(partition)) }.toSet
   }

+  def putPartitionLeadershipInfo(partition: TopicPartition,
+                                 leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Unit = {
+    val previous = partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+    val replicaAssignment = partitionFullReplicaAssignment(partition)
+    updatePreferredReplicaImbalanceMetric(partition, Some(replicaAssignment), previous,
+      Some(replicaAssignment), Some(leaderIsrAndControllerEpoch))
+  }
+
+  def partitionLeadershipInfo(partition: TopicPartition): Option[LeaderIsrAndControllerEpoch] = {
+    partitionLeadershipInfo.get(partition)
+  }
+
+  def partitionsLeadershipInfo(): Iterable[(TopicPartition, LeaderIsrAndControllerEpoch)] = {
+    partitionLeadershipInfo
+  }
+
+  def partitionsWithLeaders(): Set[TopicPartition] = {
+    partitionLeadershipInfo.keySet
+  }
+
+  def partitionsWithoutLeaders(): Set[TopicPartition] = {
+    partitionLeadershipInfo.filter { case (topicPartition, leaderIsrAndControllerEpoch) =>
+      !isReplicaOnline(leaderIsrAndControllerEpoch.leaderAndIsr.leader, topicPartition) &&
+        !isTopicQueuedUpForDeletion(topicPartition.topic)
+    }.keySet
+  }
+
+  def partitionLeadsOnBroker(brokerId: Int): Set[TopicPartition] = {

Review comment:
   nit: `partitionLeadersOnBroker`?

##
File path: core/src/main/scala/kafka/controller/ControllerContext.scala
##
@@ -391,6 +404,90 @@ class ControllerContext {
     partitionsForTopic(topic).filter { partition => states.contains(partitionState(partition)) }.toSet
   }

+  def putPartitionLeadershipInfo(partition: TopicPartition,
+                                 leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Unit = {
+    val previous = partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+    val replicaAssignment = partitionFullReplicaAssignment(partition)
+    updatePreferredReplicaImbalanceMetric(partition, Some(replicaAssignment), previous,
+      Some(replicaAssignment), Some(leaderIsrAndControllerEpoch))
+  }
+
+  def partitionLeadershipInfo(partition: TopicPartition): Option[LeaderIsrAndControllerEpoch] = {
+    partitionLeadershipInfo.get(partition)
+  }
+
+  def partitionsLeadershipInfo(): Iterable[(TopicPartition, LeaderIsrAndControllerEpoch)] = {
+    partitionLeadershipInfo
+  }
+
+  def partitionsWithLeaders(): Set[TopicPartition] = {
+    partitionLeadershipInfo.keySet

Review comment:
   This definition seems inconsistent with `partitionsWithoutLeaders`. I think you're just trying to preserve the existing logic. It might make sense to use a different name to avoid the apparent inconsistency? Maybe we could change `partitionsWithoutLeaders` to be `partitionsWithOfflineLeaders` or something like that. Looking at the caller, it looks like it would be reasonable to exclude topics which are being queued for deletion in both cases, but we could change that separately if you think it is risky.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10089) The stale ssl engine factory is not closed after reconfigure

2020-06-03 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-10089.

Fix Version/s: 2.6.0
 Reviewer: Rajini Sivaram
   Resolution: Fixed

> The stale ssl engine factory is not closed after reconfigure
> 
>
> Key: KAFKA-10089
> URL: https://issues.apache.org/jira/browse/KAFKA-10089
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
> Fix For: 2.6.0
>
>
> {code}
> @Override
> public void reconfigure(Map<String, ?> newConfigs) throws KafkaException {
>     SslEngineFactory newSslEngineFactory = createNewSslEngineFactory(newConfigs);
>     if (newSslEngineFactory != this.sslEngineFactory) {
>         this.sslEngineFactory = newSslEngineFactory; // we should close the older one
>         log.info("Created new {} SSL engine builder with keystore {} truststore {}", mode,
>                 newSslEngineFactory.keystore(), newSslEngineFactory.truststore());
>     }
> }
> {code}
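
A minimal sketch of the fix direction (assuming `SslEngineFactory` is closeable, which is what would let the usual `Utils.closeQuietly` helper be used here):

{code:java}
if (newSslEngineFactory != this.sslEngineFactory) {
    // Close the stale factory before swapping in the new one so its resources are released.
    Utils.closeQuietly(this.sslEngineFactory, "close stale ssl engine factory");
    this.sslEngineFactory = newSslEngineFactory;
    log.info("Created new {} SSL engine builder with keystore {} truststore {}", mode,
            newSslEngineFactory.keystore(), newSslEngineFactory.truststore());
}
{code}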



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable

2020-06-03 Thread GitBox


hachikuji commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r434761114



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -827,14 +827,15 @@ public void onSuccess(OffsetForEpochResult offsetsResult) {
                             "Failed leader offset epoch validation for " + requestPosition
                                 + " since no end offset larger than current fetch epoch was reported");
                     } catch (OffsetOutOfRangeException e) {
-                        // Swallow the OffsetOutOfRangeException to finish all partitions validation.
+                        setFatalOffsetForLeaderException(e);

Review comment:
   Can we fix the LogTruncationException case below as well?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy commented on pull request #8715: KAFKA-10033: AdminClient should throw UnknownTopicOrPartitionException instead of UnknownServerException if altering configs of non-existing

2020-06-03 Thread GitBox


omkreddy commented on pull request #8715:
URL: https://github.com/apache/kafka/pull/8715#issuecomment-638386016


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #8680: KAFKA-10027: Implement read path for feature versioning system (KIP-584)

2020-06-03 Thread GitBox


kowshik commented on a change in pull request #8680:
URL: https://github.com/apache/kafka/pull/8680#discussion_r434428173



##
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##
@@ -113,14 +127,44 @@ public static ApiVersionsResponse fromStruct(Struct struct, short version) {
         }
     }

-    public static ApiVersionsResponse apiVersionsResponse(int throttleTimeMs, byte maxMagic) {
+    public static ApiVersionsResponse apiVersionsResponse(
+        int throttleTimeMs,
+        byte maxMagic,
+        Features<SupportedVersionRange> latestSupportedFeatures) {
+        return apiVersionsResponse(
+            throttleTimeMs, maxMagic, latestSupportedFeatures, Optional.empty(), Optional.empty());
+    }
+
+    public static ApiVersionsResponse apiVersionsResponse(
+        int throttleTimeMs,
+        byte maxMagic,
+        Features<SupportedVersionRange> latestSupportedFeatures,
+        Features<FinalizedVersionRange> finalizedFeatures,
+        long finalizedFeaturesEpoch) {
+        return apiVersionsResponse(
+            throttleTimeMs, maxMagic, latestSupportedFeatures, Optional.of(finalizedFeatures), Optional.of(finalizedFeaturesEpoch));
+    }
+
+    private static ApiVersionsResponse apiVersionsResponse(
+        int throttleTimeMs,
+        byte maxMagic,
+        Features<SupportedVersionRange> latestSupportedFeatures,
+        Optional<Features<FinalizedVersionRange>> finalizedFeatures,

Review comment:
   It's because non-existing supported features _can_ be represented by an empty map (i.e. broker does not advertise any features). But on the other hand, non-existing finalized features can not be represented by an empty map alone, as we need a suitable epoch value that indicates the absence of finalized features. To address this case, I saw 2 ways:
   1) Provide a negative epoch value indicating absence of finalized features, OR
   2) Represent using an empty `Optional` for both finalized features and epoch.
   
   I chose the latter approach. Please, let me know if you have concerns.
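   
   Schematically, the two options weighed above look like this (sketch only; the sentinel constant name is hypothetical, the other types are those in the diff above):
   ```java
   // Option 1 (rejected): a sentinel epoch standing in for "no finalized features".
   static final long NO_FINALIZED_FEATURES_EPOCH = -1L; // hypothetical name
   
   // Option 2 (chosen): absence is explicit in the types.
   Optional<Features<FinalizedVersionRange>> finalizedFeatures = Optional.empty();
   Optional<Long> finalizedFeaturesEpoch = Optional.empty();
   ```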





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #8793: KAFKA-9514; The protocol generator generated useless condition when a field is made nullable and flexible version is used

2020-06-03 Thread GitBox


cmccabe commented on pull request #8793:
URL: https://github.com/apache/kafka/pull/8793#issuecomment-638404260


   LGTM.  Thanks, @dajac 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #8784: KAFKA-9788: Use distinct names for transaction and group load time se…

2020-06-03 Thread GitBox


hachikuji commented on pull request #8784:
URL: https://github.com/apache/kafka/pull/8784#issuecomment-638419527


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8790: MINOR: Upgrade spotbugs and spotbugsPlugin

2020-06-03 Thread GitBox


ijuma commented on pull request #8790:
URL: https://github.com/apache/kafka/pull/8790#issuecomment-638437924


   One job failed for unrelated reasons.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8751: MINOR: Update to Gradle 6.5 and tweak build jvm config

2020-06-03 Thread GitBox


ijuma commented on pull request #8751:
URL: https://github.com/apache/kafka/pull/8751#issuecomment-638438813


   Build passed.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #8790: MINOR: Upgrade spotbugs and spotbugsPlugin

2020-06-03 Thread GitBox


ijuma merged pull request #8790:
URL: https://github.com/apache/kafka/pull/8790


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #8751: MINOR: Update to Gradle 6.5 and tweak build jvm config

2020-06-03 Thread GitBox


ijuma merged pull request #8751:
URL: https://github.com/apache/kafka/pull/8751


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8751: MINOR: Update to Gradle 6.5 and tweak build jvm config

2020-06-03 Thread GitBox


ijuma commented on pull request #8751:
URL: https://github.com/apache/kafka/pull/8751#issuecomment-638439407


   Cherry picked this to 2.6 as well since it's low risk and fixes a 
compilation speed regression.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-9313) Set default for client.dns.lookup to use_all_dns_ips

2020-06-03 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-9313:
---
Fix Version/s: 2.6.0

> Set default for client.dns.lookup to use_all_dns_ips
> 
>
> Key: KAFKA-9313
> URL: https://issues.apache.org/jira/browse/KAFKA-9313
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Yeva Byzek
>Assignee: Badai Aqrandista
>Priority: Minor
> Fix For: 2.6.0
>
>
> The default setting of the configuration parameter {{client.dns.lookup}} is 
> *not* {{use_all_dns_ips}} .  Consequently, by default, if there are multiple 
> IP addresses and the first one fails, the connection will fail.
>  
> It is desirable to change the default to be 
> {{client.dns.lookup=use_all_dns_ips}} for two reasons:
>  # reduce connection failure rates by falling back to the remaining resolved IP addresses
>  # users are often surprised that this is not already the default
>  
>  
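
Until the default changes, clients can opt in explicitly; a minimal sketch:

{code:java}
Properties props = new Properties();
// Try every resolved IP address for a bootstrap host before failing the connection.
props.put(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG, "use_all_dns_ips");
{code}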



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9320) Enable TLSv1.3 by default and disable some of the older protocols

2020-06-03 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-9320:
---
Fix Version/s: 2.6.0

> Enable TLSv1.3 by default and disable some of the older protocols
> -
>
> Key: KAFKA-9320
> URL: https://issues.apache.org/jira/browse/KAFKA-9320
> Project: Kafka
>  Issue Type: New Feature
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Nikolay Izhikov
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.6.0
>
> Attachments: report.txt
>
>
> KAFKA-7251 added support for TLSv1.3. We should include this in the list of 
> protocols that are enabled by default. We should also disable some of the 
> older protocols that are not secure. This change requires a KIP.
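
For reference, the enabled protocols are already configurable per client or listener; a sketch of opting into TLSv1.3 ahead of the default change (the protocol list shown is illustrative):

{code:java}
Properties props = new Properties();
// Enable TLSv1.3 alongside TLSv1.2; older, insecure protocols stay off the list.
props.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "TLSv1.3,TLSv1.2");
{code}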



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10091) Improve task idling

2020-06-03 Thread John Roesler (Jira)
John Roesler created KAFKA-10091:


 Summary: Improve task idling
 Key: KAFKA-10091
 URL: https://issues.apache.org/jira/browse/KAFKA-10091
 Project: Kafka
  Issue Type: Task
  Components: streams
Reporter: John Roesler


When Streams is processing a task with multiple inputs, each time it is ready 
to process a record, it has to choose which input to process next. It always 
takes from the input for which the next record has the least timestamp. The 
result of this is that Streams processes data in timestamp order. However, if 
the buffer for one of the inputs is empty, Streams doesn't know what timestamp 
the next record for that input will be.

Streams introduced a configuration "max.task.idle.ms" in KIP-353 to address 
this issue.

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization]

The config allows Streams to wait some amount of time for data to arrive on the 
empty input, so that it can make a timestamp-ordered decision about which input 
to pull from next.

However, this config can be hard to use reliably and efficiently, since what 
we're really waiting for is the next poll that _would_ return data from the 
empty input's partition, and this guarantee is a function of the poll interval, 
the max poll interval, and the internal logic that governs when Streams will 
poll again.

Ideally, you'd be able to guarantee, at a minimum, that _any_ amount of idling ensures we poll data from the empty partition if there's data to fetch.
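
For reference, the existing config is set like any other Streams property; a minimal sketch (the value shown is illustrative):

{code:java}
Properties props = new Properties();
// Wait up to 1s for data on an empty input before picking which partition to process next.
props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 1000L);
{code}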



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mimaison commented on pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-06-03 Thread GitBox


mimaison commented on pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#issuecomment-638444641


   @dajac Thanks for your interest. I want to merge https://github.com/apache/kafka/pull/8311 and https://github.com/apache/kafka/pull/8312 first. I'll take a look at them tomorrow.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10095) In LogCleanerManagerTest replace get().nonEmpty call with contains

2020-06-03 Thread Jakob Homan (Jira)
Jakob Homan created KAFKA-10095:
---

 Summary: In LogCleanerManagerTest replace get().nonEmpty call with 
contains
 Key: KAFKA-10095
 URL: https://issues.apache.org/jira/browse/KAFKA-10095
 Project: Kafka
  Issue Type: Improvement
  Components: log cleaner, unit tests
Reporter: Jakob Homan
Assignee: Sarah Gonsalves


n.b. This is a newbie ticket designed to be an introduction to contributing for 
the assignee.

In kafka.log.LogCleanerManagerTest we have two calls to 
.get(something).nonEmpty, which is equivalent to .contains(something).  We 
should simplify these calls.

 {code}cleanerManager.doneCleaning(topicPartition, log.dir, 1)
assertTrue(cleanerManager.cleaningState(topicPartition).isEmpty)
assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).nonEmpty)

cleanerManager.setCleaningState(topicPartition, LogCleaningAborted)
cleanerManager.doneCleaning(topicPartition, log.dir, 1)
assertEquals(LogCleaningPaused(1), cleanerManager.cleaningState(topicPartition).get)
assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).nonEmpty){code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10093) Replace iteration with call to addAll in Utils

2020-06-03 Thread Jakob Homan (Jira)
Jakob Homan created KAFKA-10093:
---

 Summary: Replace iteration with call to addAll in Utils
 Key: KAFKA-10093
 URL: https://issues.apache.org/jira/browse/KAFKA-10093
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: Jakob Homan
Assignee: Can Cecen


n.b. This is a newbie ticket designed to be an introduction to contributing for 
the assignee.

In clients/src/main/java/org/apache/kafka/common/utils/Utils.java we're 
currently using iteration to add all the elements from one collection into 
another. We can replace this with a call to Arrays.asList() and 
Collections.addAll().

{code}/**
 * Creates a set
 * @param elems the elements
 * @param <T> the type of element
 * @return Set
 */
@SafeVarargs
public static <T> Set<T> mkSet(T... elems) {
    Set<T> result = new HashSet<>((int) (elems.length / 0.75) + 1);
    for (T elem : elems)
        result.add(elem);
    return result;
}

/**
 * Creates a sorted set
 * @param elems the elements
 * @param <T> the type of element, must be comparable
 * @return SortedSet
 */
@SafeVarargs
public static <T extends Comparable<T>> SortedSet<T> mkSortedSet(T... elems) {
    SortedSet<T> result = new TreeSet<>();
    for (T elem : elems)
        result.add(elem);
    return result;
}{code}
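
For illustration, a sketch of what the suggested change to {{mkSet}} could look like ({{Collections.addAll}} copies the varargs array directly, without an intermediate {{Arrays.asList}} wrapper):

{code:java}
@SafeVarargs
public static <T> Set<T> mkSet(T... elems) {
    Set<T> result = new HashSet<>((int) (elems.length / 0.75) + 1);
    Collections.addAll(result, elems); // replaces the manual for-loop
    return result;
}
{code}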

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10094) In MirrorSourceConnector replace two-step assignment with single call

2020-06-03 Thread Jakob Homan (Jira)
Jakob Homan created KAFKA-10094:
---

 Summary: In MirrorSourceConnector replace two-step assignment with 
single call
 Key: KAFKA-10094
 URL: https://issues.apache.org/jira/browse/KAFKA-10094
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Reporter: Jakob Homan
Assignee: Mandar Tillu


n.b. This is a newbie ticket designed to be an introduction to contributing for 
the assignee.

 

In MirrorSourceConnector::refreshTopicPartitions we have places where we create 
a new HashSet and then addAll to the set.  We can replace both with a direct 
call to the copy constructor.

 
{code:java}
void refreshTopicPartitions()
        throws InterruptedException, ExecutionException {
    knownSourceTopicPartitions = findSourceTopicPartitions();
    knownTargetTopicPartitions = findTargetTopicPartitions();
    List<TopicPartition> upstreamTargetTopicPartitions = knownTargetTopicPartitions.stream()
        .map(x -> new TopicPartition(replicationPolicy.upstreamTopic(x.topic()), x.partition()))
        .collect(Collectors.toList());

    Set<TopicPartition> newTopicPartitions = new HashSet<>();
    newTopicPartitions.addAll(knownSourceTopicPartitions);
    newTopicPartitions.removeAll(upstreamTargetTopicPartitions);
    Set<TopicPartition> deadTopicPartitions = new HashSet<>();
    deadTopicPartitions.addAll(upstreamTargetTopicPartitions);{code}
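
A sketch of the single-call form the ticket proposes, using the {{HashSet}} copy constructor:

{code:java}
Set<TopicPartition> newTopicPartitions = new HashSet<>(knownSourceTopicPartitions);
newTopicPartitions.removeAll(upstreamTargetTopicPartitions);
Set<TopicPartition> deadTopicPartitions = new HashSet<>(upstreamTargetTopicPartitions);
{code}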
 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10092) Remove unnecessary enum modifier in NioEchoServer

2020-06-03 Thread Jakob Homan (Jira)
Jakob Homan created KAFKA-10092:
---

 Summary: Remove unnecessary enum modifier in NioEchoServer
 Key: KAFKA-10092
 URL: https://issues.apache.org/jira/browse/KAFKA-10092
 Project: Kafka
  Issue Type: Improvement
Reporter: Jakob Homan
Assignee: Andrey Falko


n.b. This is a newbie ticket designed to be an introduction to contributing for 
the assignee.  

In NioEchoServer the enum has its constructor declared as private, which is 
[redundant|https://docs.oracle.com/javase/tutorial/java/javaOO/enum.html].  We 
can remove this.  

{code}public class NioEchoServer extends Thread {
    public enum MetricType {
        TOTAL, RATE, AVG, MAX;

        private final String metricNameSuffix;

        private MetricType() {
            metricNameSuffix = "-" + name().toLowerCase(Locale.ROOT);
        }
    }
}{code}
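
The cleanup itself is a one-word change; a sketch of the resulting enum (enum constructors are implicitly private):

{code:java}
public enum MetricType {
    TOTAL, RATE, AVG, MAX;

    private final String metricNameSuffix;

    MetricType() { // `private` dropped; it is the default for enum constructors
        metricNameSuffix = "-" + name().toLowerCase(Locale.ROOT);
    }
}
{code}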



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on pull request #8786: KAFKA-10083: fix failed testReassignmentWithRandomSubscriptionsAndChanges tests

2020-06-03 Thread GitBox


guozhangwang commented on pull request #8786:
URL: https://github.com/apache/kafka/pull/8786#issuecomment-638463385


   Cherry-picked back to 2.4



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10093) Replace iteration with call to addAll in Utils

2020-06-03 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17125330#comment-17125330
 ] 

Ismael Juma commented on KAFKA-10093:
-

Thanks for the JIRA. Using `addAll` here is not particularly beneficial since 
the HashSet is sized correctly in the constructor. And it requires the 
allocation of `Arrays.asList`.

> Replace iteration with call to addAll in Utils
> --
>
> Key: KAFKA-10093
> URL: https://issues.apache.org/jira/browse/KAFKA-10093
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Jakob Homan
>Assignee: Can Cecen
>Priority: Trivial
>  Labels: newbie
>
> n.b. This is a newbie ticket designed to be an introduction to contributing 
> for the assignee.
> In clients/src/main/java/org/apache/kafka/common/utils/Utils.java we're 
> currently using iteration to add all the elements from one collection into 
> another. We can replace this with a call to Arrays.asList() and 
> Collections.addAll().
> {code}/**
>  * Creates a set
>  * @param elems the elements
>  * @param <T> the type of element
>  * @return Set
>  */
> @SafeVarargs
> public static <T> Set<T> mkSet(T... elems) {
>     Set<T> result = new HashSet<>((int) (elems.length / 0.75) + 1);
>     for (T elem : elems)
>         result.add(elem);
>     return result;
> }
>
> /**
>  * Creates a sorted set
>  * @param elems the elements
>  * @param <T> the type of element, must be comparable
>  * @return SortedSet
>  */
> @SafeVarargs
> public static <T extends Comparable<T>> SortedSet<T> mkSortedSet(T... elems) {
>     SortedSet<T> result = new TreeSet<>();
>     for (T elem : elems)
>         result.add(elem);
>     return result;
> }{code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] afalko opened a new pull request #8794: KAFKA-10092: Remove unused code branches in NioEchoServer

2020-06-03 Thread GitBox


afalko opened a new pull request #8794:
URL: https://github.com/apache/kafka/pull/8794


   In NioEchoServer the enum has its constructor declared as private, which is redundant. There are also an unused exception `throws` declaration and an unused method.
   
   Reviewers: Jakob Homan 
   
   This is a newbie ticket to get used to the contribution flow.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] afalko commented on pull request #8794: KAFKA-10092: Remove unused code branches in NioEchoServer

2020-06-03 Thread GitBox


afalko commented on pull request #8794:
URL: https://github.com/apache/kafka/pull/8794#issuecomment-638478890


   @jghoman 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10096) Remove unnecessary String.format call in VersionConditionalTest.java

2020-06-03 Thread Can Cecen (Jira)
Can Cecen created KAFKA-10096:
-

 Summary: Remove unnecessary String.format call in 
VersionConditionalTest.java
 Key: KAFKA-10096
 URL: https://issues.apache.org/jira/browse/KAFKA-10096
 Project: Kafka
  Issue Type: Improvement
  Components: unit tests
Reporter: Can Cecen


n.b. This is a newbie ticket designed to be an introduction to contributing for 
the assignee.

Since there is no format specified to String.format, we can remove it and just 
append the line.
{code}
static void assertEquals(CodeBuffer buffer, String... lines) throws Exception {
    StringWriter stringWriter = new StringWriter();
    buffer.write(stringWriter);
    StringBuilder expectedStringBuilder = new StringBuilder();
    for (String line : lines) {
        expectedStringBuilder.append(String.format(line));
    }
}
{code}
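
A sketch of the simplification (assuming, as the ticket does, that the lines contain no format specifiers such as {{%n}}, which {{String.format}} would otherwise expand):

{code:java}
for (String line : lines) {
    expectedStringBuilder.append(line); // String.format dropped: no format arguments are passed
}
{code}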



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10096) Remove unnecessary String.format call in VersionConditionalTest.java

2020-06-03 Thread Jakob Homan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jakob Homan reassigned KAFKA-10096:
---

Assignee: Can Cecen

> Remove unnecessary String.format call in VersionConditionalTest.java
> 
>
> Key: KAFKA-10096
> URL: https://issues.apache.org/jira/browse/KAFKA-10096
> Project: Kafka
>  Issue Type: Improvement
>  Components: unit tests
>Reporter: Can Cecen
>Assignee: Can Cecen
>Priority: Trivial
>  Labels: newbie
>
> n.b. This is a newbie ticket designed to be an introduction to contributing 
> for the assignee.
> Since there is no format specified to String.format, we can remove it and 
> just append the line.
> static void assertEquals(CodeBuffer buffer, String... lines) throws Exception 
> {
> StringWriter stringWriter = new StringWriter();
> buffer.write(stringWriter);
> StringBuilder expectedStringBuilder = new StringBuilder();
> for (String line : lines) {
> expectedStringBuilder.append(String.format(line));
> }



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] sarahgonsalves223 opened a new pull request #8795: KAFKA-10095: Simplify calls in LogCleanerManagerTest

2020-06-03 Thread GitBox


sarahgonsalves223 opened a new pull request #8795:
URL: https://github.com/apache/kafka/pull/8795


   In kafka.log.LogCleanerManagerTest we have two calls to 
.get(something).nonEmpty, which is equivalent to .contains(something). Making 
changes to simplify these calls.
   
   Reviewers: Jakob Homan jgho...@gmail.com
   
   This is a newbie ticket to get used to the contribution flow.
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] sarahgonsalves223 commented on pull request #8795: KAFKA-10095: Simplify calls in LogCleanerManagerTest

2020-06-03 Thread GitBox


sarahgonsalves223 commented on pull request #8795:
URL: https://github.com/apache/kafka/pull/8795#issuecomment-638481270


   @jghoman



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10096) Remove unnecessary String.format call in VersionConditionalTest.java

2020-06-03 Thread Can Cecen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Can Cecen updated KAFKA-10096:
--
Description: 
n.b. This is a newbie ticket designed to be an introduction to contributing for 
the assignee.

Since there is no format specified to String.format, we can remove it and just 
append the line.
```
 static void assertEquals(CodeBuffer buffer, String... lines) throws Exception {
 StringWriter stringWriter = new StringWriter();
 buffer.write(stringWriter);
 StringBuilder expectedStringBuilder = new StringBuilder();
 for (String line : lines)

{ expectedStringBuilder.append(String.format(line)); }

```

  was:
n.b. This is a newbie ticket designed to be an introduction to contributing for 
the assignee.

Since there is no format specified to String.format, we can remove it and just 
append the line.
static void assertEquals(CodeBuffer buffer, String... lines) throws Exception {
StringWriter stringWriter = new StringWriter();
buffer.write(stringWriter);
StringBuilder expectedStringBuilder = new StringBuilder();
for (String line : lines) {
expectedStringBuilder.append(String.format(line));
}


> Remove unnecessary String.format call in VersionConditionalTest.java
> 
>
> Key: KAFKA-10096
> URL: https://issues.apache.org/jira/browse/KAFKA-10096
> Project: Kafka
>  Issue Type: Improvement
>  Components: unit tests
>Reporter: Can Cecen
>Assignee: Can Cecen
>Priority: Trivial
>  Labels: newbie
>
> n.b. This is a newbie ticket designed to be an introduction to contributing 
> for the assignee.
> Since there is no format specified to String.format, we can remove it and 
> just append the line.
> ```
>  static void assertEquals(CodeBuffer buffer, String... lines) throws 
> Exception {
>  StringWriter stringWriter = new StringWriter();
>  buffer.write(stringWriter);
>  StringBuilder expectedStringBuilder = new StringBuilder();
>  for (String line : lines)
> { expectedStringBuilder.append(String.format(line)); }
> ```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10096) Remove unnecessary String.format call in VersionConditionalTest.java

2020-06-03 Thread Can Cecen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Can Cecen updated KAFKA-10096:
--
Description: 
n.b. This is a newbie ticket designed to be an introduction to contributing for 
the assignee.

Since there is no format specified to String.format, we can remove it and just 
append the line.

 ```
static void assertEquals(CodeBuffer buffer, String... lines) throws 
Exception {
StringWriter stringWriter = new StringWriter();
buffer.write(stringWriter);
StringBuilder expectedStringBuilder = new StringBuilder();
for (String line : lines) {
expectedStringBuilder.append(String.format(line));
}
   }
```

  was:
n.b. This is a newbie ticket designed to be an introduction to contributing for 
the assignee.

Since there is no format specified to String.format, we can remove it and just 
append the line.
 ```
static void assertEquals(CodeBuffer buffer, String... lines) throws 
Exception {
StringWriter stringWriter = new StringWriter();
buffer.write(stringWriter);
StringBuilder expectedStringBuilder = new StringBuilder();
for (String line : lines) {
expectedStringBuilder.append(String.format(line));
}
```


> Remove unnecessary String.format call in VersionConditionalTest.java
> 
>
> Key: KAFKA-10096
> URL: https://issues.apache.org/jira/browse/KAFKA-10096
> Project: Kafka
>  Issue Type: Improvement
>  Components: unit tests
>Reporter: Can Cecen
>Assignee: Can Cecen
>Priority: Trivial
>  Labels: newbie
>
> n.b. This is a newbie ticket designed to be an introduction to contributing 
> for the assignee.
> Since there is no format specified to String.format, we can remove it and 
> just append the line.
>  ```
> static void assertEquals(CodeBuffer buffer, String... lines) throws 
> Exception {
> StringWriter stringWriter = new StringWriter();
> buffer.write(stringWriter);
> StringBuilder expectedStringBuilder = new StringBuilder();
> for (String line : lines) {
> expectedStringBuilder.append(String.format(line));
> }
>}
> ```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10096) Remove unnecessary String.format call in VersionConditionalTest.java

2020-06-03 Thread Can Cecen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Can Cecen updated KAFKA-10096:
--
Description: 
n.b. This is a newbie ticket designed to be an introduction to contributing for 
the assignee.

Since there is no format specified to String.format, we can remove it and just 
append the line.
 ```
static void assertEquals(CodeBuffer buffer, String... lines) throws 
Exception {
StringWriter stringWriter = new StringWriter();
buffer.write(stringWriter);
StringBuilder expectedStringBuilder = new StringBuilder();
for (String line : lines) {
expectedStringBuilder.append(String.format(line));
}
```

  was:
n.b. This is a newbie ticket designed to be an introduction to contributing for 
the assignee.

Since there is no format specified to String.format, we can remove it and just 
append the line.
```
 static void assertEquals(CodeBuffer buffer, String... lines) throws Exception {
 StringWriter stringWriter = new StringWriter();
 buffer.write(stringWriter);
 StringBuilder expectedStringBuilder = new StringBuilder();
 for (String line : lines)

{ expectedStringBuilder.append(String.format(line)); }

```


> Remove unnecessary String.format call in VersionConditionalTest.java
> 
>
> Key: KAFKA-10096
> URL: https://issues.apache.org/jira/browse/KAFKA-10096
> Project: Kafka
>  Issue Type: Improvement
>  Components: unit tests
>Reporter: Can Cecen
>Assignee: Can Cecen
>Priority: Trivial
>  Labels: newbie
>
> n.b. This is a newbie ticket designed to be an introduction to contributing 
> for the assignee.
> Since there is no format specified to String.format, we can remove it and 
> just append the line.
>  ```
> static void assertEquals(CodeBuffer buffer, String... lines) throws 
> Exception {
> StringWriter stringWriter = new StringWriter();
> buffer.write(stringWriter);
> StringBuilder expectedStringBuilder = new StringBuilder();
> for (String line : lines) {
> expectedStringBuilder.append(String.format(line));
> }
> ```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10096) Remove unnecessary String.format call in VersionConditionalTest.java

2020-06-03 Thread Can Cecen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Can Cecen updated KAFKA-10096:
--
Description: 
n.b. This is a newbie ticket designed to be an introduction to contributing for 
the assignee.

Since there is no format specified to String.format, we can remove it and just 
append the line.

 
{{static void assertEquals(CodeBuffer buffer, String... lines) throws 
Exception {
StringWriter stringWriter = new StringWriter();
buffer.write(stringWriter);
StringBuilder expectedStringBuilder = new StringBuilder();
for (String line : lines) {
expectedStringBuilder.append(String.format(line));
}
   }}}


  was:
n.b. This is a newbie ticket designed to be an introduction to contributing for 
the assignee.

Since there is no format specified to String.format, we can remove it and just 
append the line.

 ```
static void assertEquals(CodeBuffer buffer, String... lines) throws 
Exception {
StringWriter stringWriter = new StringWriter();
buffer.write(stringWriter);
StringBuilder expectedStringBuilder = new StringBuilder();
for (String line : lines) {
expectedStringBuilder.append(String.format(line));
}
   }
```


> Remove unnecessary String.format call in VersionConditionalTest.java
> 
>
> Key: KAFKA-10096
> URL: https://issues.apache.org/jira/browse/KAFKA-10096
> Project: Kafka
>  Issue Type: Improvement
>  Components: unit tests
>Reporter: Can Cecen
>Assignee: Can Cecen
>Priority: Trivial
>  Labels: newbie
>
> n.b. This is a newbie ticket designed to be an introduction to contributing 
> for the assignee.
> Since there is no format specified to String.format, we can remove it and 
> just append the line.
>  
> {{static void assertEquals(CodeBuffer buffer, String... lines) throws 
> Exception {
> StringWriter stringWriter = new StringWriter();
> buffer.write(stringWriter);
> StringBuilder expectedStringBuilder = new StringBuilder();
> for (String line : lines) {
> expectedStringBuilder.append(String.format(line));
> }
>}}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10096) Remove unnecessary String.format call in VersionConditionalTest.java

2020-06-03 Thread Can Cecen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Can Cecen updated KAFKA-10096:
--
Description: 
n.b. This is a newbie ticket designed to be an introduction to contributing for 
the assignee.

Since there is no format specified to String.format, we can remove it and just 
append the line.

 {code}
static void assertEquals(CodeBuffer buffer, String... lines) throws Exception {
StringWriter stringWriter = new StringWriter();
buffer.write(stringWriter);
StringBuilder expectedStringBuilder = new StringBuilder();
for (String line : lines) {
expectedStringBuilder.append(String.format(line));
}
}
{code}

  was:
n.b. This is a newbie ticket designed to be an introduction to contributing for 
the assignee.

Since there is no format specified to String.format, we can remove it and just 
append the line.

 
{{static void assertEquals(CodeBuffer buffer, String... lines) throws Exception {
StringWriter stringWriter = new StringWriter();
buffer.write(stringWriter);
StringBuilder expectedStringBuilder = new StringBuilder();
for (String line : lines) {
expectedStringBuilder.append(String.format(line));
}
   }}}



> Remove unnecessary String.format call in VersionConditionalTest.java
> 
>
> Key: KAFKA-10096
> URL: https://issues.apache.org/jira/browse/KAFKA-10096
> Project: Kafka
>  Issue Type: Improvement
>  Components: unit tests
>Reporter: Can Cecen
>Assignee: Can Cecen
>Priority: Trivial
>  Labels: newbie
>
> n.b. This is a newbie ticket designed to be an introduction to contributing 
> for the assignee.
> Since there is no format specified to String.format, we can remove it and 
> just append the line.
>  {code}
> static void assertEquals(CodeBuffer buffer, String... lines) throws Exception {
> StringWriter stringWriter = new StringWriter();
> buffer.write(stringWriter);
> StringBuilder expectedStringBuilder = new StringBuilder();
> for (String line : lines) {
> expectedStringBuilder.append(String.format(line));
> }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cancecen opened a new pull request #8796: KAFKA-10096: Remove unnecessary String.format call.

2020-06-03 Thread GitBox


cancecen opened a new pull request #8796:
URL: https://github.com/apache/kafka/pull/8796


   https://issues.apache.org/jira/browse/KAFKA-10096
   
   Since no format arguments are provided in VersionConditionalTest::assertEquals, we 
can just pass the string directly into StringBuilder::append.
   
   This is a newbie ticket to get used to the contribution flow.
   
   Reviewers: @jghoman 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mtillu opened a new pull request #8797: Fixing KAFKA-10094

2020-06-03 Thread GitBox


mtillu opened a new pull request #8797:
URL: https://github.com/apache/kafka/pull/8797


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mtillu commented on pull request #8797: Fixing KAFKA-10094

2020-06-03 Thread GitBox


mtillu commented on pull request #8797:
URL: https://github.com/apache/kafka/pull/8797#issuecomment-638485010


   PTAL @jghoman 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10096) Remove unnecessary String.format call in VersionConditionalTest.java

2020-06-03 Thread Can Cecen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Can Cecen resolved KAFKA-10096.
---
Resolution: Won't Fix

Realized that String.format is used to create line separators. 
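
For illustration, a minimal sketch of the behaviour (the literal below is a 
hypothetical example; the generated expected lines embed %n in the same way):

{code}
// String.format expands format specifiers even when no arguments are supplied,
// so "%n" becomes the platform line separator ("\n" or "\r\n").
String line = "foo();%n";
String formatted = String.format(line); // "foo();" + System.lineSeparator()
String raw = line;                      // "%n" stays literal if the call is removed
System.out.println(formatted.equals(raw)); // false
{code}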

> Remove unnecessary String.format call in VersionConditionalTest.java
> 
>
> Key: KAFKA-10096
> URL: https://issues.apache.org/jira/browse/KAFKA-10096
> Project: Kafka
>  Issue Type: Improvement
>  Components: unit tests
>Reporter: Can Cecen
>Assignee: Can Cecen
>Priority: Trivial
>  Labels: newbie
>
> n.b. This is a newbie ticket designed to be an introduction to contributing 
> for the assignee.
> Since there is no format specified to String.format, we can remove it and 
> just append the line.
>  {code}
> static void assertEquals(CodeBuffer buffer, String... lines) throws Exception {
> StringWriter stringWriter = new StringWriter();
> buffer.write(stringWriter);
> StringBuilder expectedStringBuilder = new StringBuilder();
> for (String line : lines) {
> expectedStringBuilder.append(String.format(line));
> }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cancecen closed pull request #8796: KAFKA-10096: Remove unnecessary String.format call.

2020-06-03 Thread GitBox


cancecen closed pull request #8796:
URL: https://github.com/apache/kafka/pull/8796


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #8784: KAFKA-9788: Use distinct names for transaction and group load time se…

2020-06-03 Thread GitBox


hachikuji commented on pull request #8784:
URL: https://github.com/apache/kafka/pull/8784#issuecomment-638492720


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-9161) Close gaps in Streams configs documentation

2020-06-03 Thread Jakob Homan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jakob Homan resolved KAFKA-9161.

Resolution: Fixed

> Close gaps in Streams configs documentation
> ---
>
> Key: KAFKA-9161
> URL: https://issues.apache.org/jira/browse/KAFKA-9161
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>  Labels: beginner, newbie, newbie++
>
> There are a number of Streams configs that aren't documented in the 
> "Configuring a Streams Application" section of the docs. As of 2.3 the 
> missing configs are:
>  # default.windowed.key.serde.inner ^
>  # default.windowed.value.serde.inner ^
>  # max.task.idle.ms
>  # rocksdb.config.setter. ^^
>  # topology.optimization
>  # -upgrade.from- fixed
>  ^ these configs are also missing the corresponding DOC string
>  ^^ this one actually does appear on that page, but instead of being included 
> in the list of Streams configs it is for some reason under  "Consumer and 
> Producer Configuration Parameters" ?
> There are also a few configs whose documented name is slightly incorrect, as 
> it is missing the "default" prefix that appears in the actual code. The 
> "missing-default" configs are:
>  # key.serde
>  # value.serde
>  # timestamp.extractor



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

2020-06-03 Thread GitBox


guozhangwang commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r434892254



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -493,28 +549,45 @@ public void closeAndRecycleState() {
     private Map<TopicPartition, Long> prepareClose(final boolean clean) {
         final Map<TopicPartition, Long> checkpoint;
 
-        if (state() == State.CREATED) {
-            // the task is created and not initialized, just re-write the checkpoint file
-            checkpoint = Collections.emptyMap();
-        } else if (state() == State.RUNNING) {
-            closeTopology(clean);
+        switch (state()) {
+            case CREATED:
+                // the task is created and not initialized, just re-write the checkpoint file
+                checkpoint = Collections.emptyMap();
 
-            if (clean) {
-                stateMgr.flush();
-                recordCollector.flush();
-                checkpoint = checkpointableOffsets();
-            } else {
+                break;
+
+            case RUNNING:
+                closeTopology(clean);
+
+                if (clean) {
+                    stateMgr.flush();
+                    recordCollector.flush();
+                    checkpoint = checkpointableOffsets();
+                } else {
+                    checkpoint = null; // `null` indicates to not write a checkpoint
+                    executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log);
+                }
+
+                break;
+
+            case RESTORING:
+                executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log);
+                checkpoint = Collections.emptyMap();
+
+                break;
+
+            case SUSPENDED:
+                // if `SUSPENDED` do not need to checkpoint, since when suspending we've already committed the state
                 checkpoint = null; // `null` indicates to not write a checkpoint
-                executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log);
-            }
-        } else if (state() == State.RESTORING) {
-            executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log);
-            checkpoint = Collections.emptyMap();
-        } else if (state() == State.SUSPENDED) {
-            // if `SUSPENDED` do not need to checkpoint, since when suspending we've already committed the state
-            checkpoint = null; // `null` indicates to not write a checkpoint
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " while prepare closing active task " + id);
+
+                break;
+            case CLOSED:
+                checkpoint = Collections.emptyMap();

Review comment:
   I think null and emptyMap have different semantics: the former indicates do 
not try to overwrite the checkpoint file, while the latter indicates "just 
write the checkpoint file as of the current state-store-maintained offsets". 
I.e. in stateMgr.checkpoint(writtenOffsets), if the map is empty, we would 
still write the checkpoint file, just based on each store's current 
storeMetadata.offset.
   
   So back to prepareClose: if we are in CREATED, meaning we've read the 
checkpoint file into the store, we still need to write those loaded offsets 
back to the file; in SUSPENDED we know we've written the offsets to the 
checkpoint file already when transitioning to that state, so we can return 
null to indicate there is no need to write again.
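   
   A condensed sketch of that contract, reusing the names from the diff above 
(the wrapper method itself is hypothetical):
   
   ```
   // null      -> leave the existing checkpoint file untouched
   // empty map -> rewrite the checkpoint file from the offsets the state
   //              manager already tracks per store (storeMetadata.offset)
   private void maybeWriteCheckpoint(final Map<TopicPartition, Long> checkpoint) {
       if (checkpoint != null) {
           stateMgr.checkpoint(checkpoint);
       }
   }
   ```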





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8775: KAFKA-10079: improve thread-level stickiness

2020-06-03 Thread GitBox


vvcephei commented on a change in pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#discussion_r434892857



##
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
##
@@ -300,20 +302,71 @@ public void shouldNotHaveUnfulfilledQuotaWhenActiveTaskSizeGreaterEqualThanCapac
     @Test
     public void shouldAddTasksWithLatestOffsetToPrevActiveTasks() {
         final Map<TaskId, Long> taskOffsetSums = Collections.singletonMap(TASK_0_1, Task.LATEST_OFFSET);
-        client.addPreviousTasksAndOffsetSums(taskOffsetSums);
+        client.addPreviousTasksAndOffsetSums("c1", taskOffsetSums);
         client.initializePrevTasks(Collections.emptyMap());
         assertThat(client.prevActiveTasks(), equalTo(Collections.singleton(TASK_0_1)));
         assertThat(client.previousAssignedTasks(), equalTo(Collections.singleton(TASK_0_1)));
         assertTrue(client.prevStandbyTasks().isEmpty());
     }
 
+    @Test
+    public void shouldReturnPreviousStatefulTasksForConsumer() {
+        client.addPreviousTasksAndOffsetSums("c1", Collections.singletonMap(TASK_0_1, Task.LATEST_OFFSET));
+        client.addPreviousTasksAndOffsetSums("c2", Collections.singletonMap(TASK_0_2, 0L));
+        client.addPreviousTasksAndOffsetSums("c3", Collections.emptyMap());
+
+        client.initializePrevTasks(Collections.emptyMap());
+        client.computeTaskLags(
+            UUID_1,
+            mkMap(
+                mkEntry(TASK_0_1, 1_000L),
+                mkEntry(TASK_0_2, 1_000L)
+            )
+        );
+
+        assertThat(client.previousTasksForConsumer("c1"), equalTo(mkSortedSet(TASK_0_1)));
+        assertThat(client.previousTasksForConsumer("c2"), equalTo(mkSortedSet(TASK_0_2)));
+        assertTrue(client.previousTasksForConsumer("c3").isEmpty());
+    }
+
+    @Test
+    public void shouldReturnPreviousStatefulTasksForConsumerWhenLagIsNotComputed() {
+        client.addPreviousTasksAndOffsetSums("c1", Collections.singletonMap(TASK_0_1, 1000L));
+        client.initializePrevTasks(Collections.emptyMap());
+
+        assertThat(client.previousTasksForConsumer("c1"), equalTo(mkSortedSet(TASK_0_1)));
+    }
+
+    @Test
+    public void shouldReturnPreviousStatefulTasksForConsumerInIncreasingLagOrder() {

Review comment:
   I missed the extra sort on my last review. It really seems like too much 
fanciness for the ClientState to sort the tasks in lag order. Would it be too 
messy to move the sort aspect out to the balancing code that needs it?
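   
   For example, the balancer could impose the order itself at the call site (a 
sketch, assuming ClientState exposes the computed per-task lag through an 
accessor such as lagFor):
   
   ```
   // order a consumer's previous tasks by ascending lag only at the point
   // where the balancing code needs it, keeping ClientState sort-free
   final SortedSet<TaskId> prevTasks = client.previousTasksForConsumer(consumer);
   final List<TaskId> tasksByLag = prevTasks.stream()
       .sorted(Comparator.comparingLong(client::lagFor))
       .collect(Collectors.toList());
   ```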





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8775: KAFKA-10079: improve thread-level stickiness

2020-06-03 Thread GitBox


vvcephei commented on pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#issuecomment-638494568







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10097) Avoid getting null map for task checkpoint

2020-06-03 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10097:
---

 Summary: Avoid getting null map for task checkpoint
 Key: KAFKA-10097
 URL: https://issues.apache.org/jira/browse/KAFKA-10097
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen


In StreamTask, we have logic to generate a checkpoint offset map that is 
materialized through StateManager#checkpoint. This map can be either an empty 
map or null: the former indicates that we should only pull down the existing 
state store checkpoint data, while the latter indicates that no checkpoint is 
needed at all, for example when we are suspending a task.

Having two similar special cases for checkpointing could lead to unexpected 
bugs; we should also think about separating the empty-checkpoint case from the 
passed-in-checkpoint case.
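
One possible direction, sketched here with a hypothetical suspending flag 
(stateMgr and checkpointableOffsets are the existing StreamTask members), is to 
make the "skip" case explicit instead of encoding it as null:

{code}
// Optional.empty()         -> do not touch the checkpoint file
// Optional.of(emptyMap())  -> rewrite it from the stores' tracked offsets
// Optional.of(offsets)     -> rewrite it from the passed-in offsets
Optional<Map<TopicPartition, Long>> checkpoint =
    suspending ? Optional.empty() : Optional.of(checkpointableOffsets());
checkpoint.ifPresent(stateMgr::checkpoint);
{code}

This keeps the two meanings distinct in the type rather than in a null check 
scattered across call sites.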



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10097) Avoid getting null map for task checkpoint

2020-06-03 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10097:

Component/s: streams

> Avoid getting null map for task checkpoint
> --
>
> Key: KAFKA-10097
> URL: https://issues.apache.org/jira/browse/KAFKA-10097
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Priority: Major
>
> In StreamTask, we have logic to generate a checkpoint offset map that is 
> materialized through StateManager#checkpoint. This map can be either an empty 
> map or null: the former indicates that we should only pull down the existing 
> state store checkpoint data, while the latter indicates that no checkpoint is 
> needed at all, for example when we are suspending a task.
> Having two similar special cases for checkpointing could lead to unexpected 
> bugs; we should also think about separating the empty-checkpoint case from 
> the passed-in-checkpoint case.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10098) Remove unnecessary escaping in regular expression in SaslAuthenticatorTest.java

2020-06-03 Thread Can Cecen (Jira)
Can Cecen created KAFKA-10098:
-

 Summary: Remove unnecessary escaping in regular expression in 
SaslAuthenticatorTest.java
 Key: KAFKA-10098
 URL: https://issues.apache.org/jira/browse/KAFKA-10098
 Project: Kafka
  Issue Type: Improvement
Reporter: Can Cecen


n.b. This is a newbie ticket designed to be an introduction to contributing for 
the assignee.

In line:
{code}
e.getMessage().matches(
".*\\<\\[" + expectedResponseTextRegex + 
"]>.*\\<\\[" + receivedResponseTextRegex + ".*?]>"));
{code}
'<' does not need to be escaped.
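
A quick self-contained check (with hypothetical placeholder text standing in 
for the two regex fragments) shows the escaped and unescaped forms behave 
identically:

{code}
String msg = "mismatch <[expected]> versus <[received]>";
boolean escaped = msg.matches(".*\\<\\[expected]>.*\\<\\[received.*?]>");
boolean unescaped = msg.matches(".*<\\[expected]>.*<\\[received.*?]>");
System.out.println(escaped && unescaped); // true: '<' is an ordinary regex character
{code}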





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10098) Remove unnecessary escaping in regular expression in SaslAuthenticatorTest.java

2020-06-03 Thread Can Cecen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Can Cecen updated KAFKA-10098:
--
Description: 
n.b. This is a newbie ticket designed to be an introduction to contributing for 
the assignee.

In line:
{code}
e.getMessage().matches(
".*\\<\\[" + expectedResponseTextRegex + 
"]>.*\\<\\[" + receivedResponseTextRegex + ".*?]>"));
{code}
'<' or '>' does not need to be escaped.



  was:
n.b. This is a newbie ticket designed to be an introduction to contributing for 
the assignee.

In line:
{code}
e.getMessage().matches(
".*\\<\\[" + expectedResponseTextRegex + 
"]>.*\\<\\[" + receivedResponseTextRegex + ".*?]>"));
{code}
'<' does not need to be escaped.




> Remove unnecessary escaping in regular expression in 
> SaslAuthenticatorTest.java
> ---
>
> Key: KAFKA-10098
> URL: https://issues.apache.org/jira/browse/KAFKA-10098
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Can Cecen
>Priority: Trivial
>  Labels: newbie
>
> n.b. This is a newbie ticket designed to be an introduction to contributing 
> for the assignee.
> In line:
> {code}
> e.getMessage().matches(
> ".*\\<\\[" + expectedResponseTextRegex + 
> "]>.*\\<\\[" + receivedResponseTextRegex + ".*?]>"));
> {code}
> '<' or '>' does not need to be escaped.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cancecen opened a new pull request #8798: KAFKA-10098: Remove unnecessary escaping in regular expression.

2020-06-03 Thread GitBox


cancecen opened a new pull request #8798:
URL: https://github.com/apache/kafka/pull/8798


   '<' or '>' do not need to be escaped.
   
   This is a newbie PR to learn the contribution flow.
   
   Reviewer: @jghoman 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10098) Remove unnecessary escaping in regular expression in SaslAuthenticatorTest.java

2020-06-03 Thread Can Cecen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Can Cecen reassigned KAFKA-10098:
-

Assignee: Can Cecen

> Remove unnecessary escaping in regular expression in 
> SaslAuthenticatorTest.java
> ---
>
> Key: KAFKA-10098
> URL: https://issues.apache.org/jira/browse/KAFKA-10098
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Can Cecen
>Assignee: Can Cecen
>Priority: Trivial
>  Labels: newbie
>
> n.b. This is a newbie ticket designed to be an introduction to contributing 
> for the assignee.
> In line:
> {code}
> e.getMessage().matches(
> ".*\\<\\[" + expectedResponseTextRegex + 
> "]>.*\\<\\[" + receivedResponseTextRegex + ".*?]>"));
> {code}
> '<' or '>' does not need to be escaped.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #8771: MINOR: Add explanation for disabling forwarding from value transformers

2020-06-03 Thread GitBox


mjsax commented on pull request #8771:
URL: https://github.com/apache/kafka/pull/8771#issuecomment-638511478


   Retest this please.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8759: KAFKA-10066: TestOutputTopic should pass record headers into deserializers

2020-06-03 Thread GitBox


mjsax commented on pull request #8759:
URL: https://github.com/apache/kafka/pull/8759#issuecomment-638512007


   Java 11:
   ```
   kafka.api.TransactionsBounceTest.testWithGroupMetadata
   org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.testReassignmentWithRandomSubscriptionsAndChanges
   org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.testReassignmentWithRandomSubscriptionsAndChanges
   org.apache.kafka.clients.consumer.StickyAssignorTest.testReassignmentWithRandomSubscriptionsAndChanges
   org.apache.kafka.clients.consumer.StickyAssignorTest.testReassignmentWithRandomSubscriptionsAndChanges
   ```
   Java 14:
   ```
   org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.testReassignmentWithRandomSubscriptionsAndChanges
   org.apache.kafka.clients.consumer.StickyAssignorTest.testReassignmentWithRandomSubscriptionsAndChanges
   ```
   Java 8:
   ```
   org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.testReassignmentWithRandomSubscriptionsAndChanges
   org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.testReassignmentWithRandomSubscriptionsAndChanges
   org.apache.kafka.clients.consumer.StickyAssignorTest.testReassignmentWithRandomSubscriptionsAndChanges
   org.apache.kafka.clients.consumer.StickyAssignorTest.testReassignmentWithRandomSubscriptionsAndChanges
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable

2020-06-03 Thread GitBox


hachikuji commented on pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#issuecomment-638511962


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



