[GitHub] [kafka] chia7712 opened a new pull request #9981: MINOR: Upgrade to Scala 2.12.13

2021-01-27 Thread GitBox


chia7712 opened a new pull request #9981:
URL: https://github.com/apache/kafka/pull/9981


   This is a follow-up to #9643
   
   ### Scala 2.12.13 release page
   
   https://github.com/scala/scala/releases/tag/v2.12.13
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] twobeeb commented on a change in pull request #9589: KAFKA-10710 - Mirror Maker 2 - Create herders only if source->target.enabled=true

2021-01-27 Thread GitBox


twobeeb commented on a change in pull request #9589:
URL: https://github.com/apache/kafka/pull/9589#discussion_r565109219



##
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
##
@@ -89,11 +89,25 @@ public MirrorMakerConfig(Map props) {
 public List clusterPairs() {
 List pairs = new ArrayList<>();
 Set clusters = clusters();
+Map originalStrings = originalsStrings();
+boolean globalHeartbeatsEnabled = MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED_DEFAULT;
+if (originalStrings.containsKey(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+globalHeartbeatsEnabled = Boolean.valueOf(originalStrings.get(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+}
+
 for (String source : clusters) {
 for (String target : clusters) {
-SourceAndTarget sourceAndTarget = new SourceAndTarget(source, target);
 if (!source.equals(target)) {
-pairs.add(sourceAndTarget);
+String clusterPairConfigPrefix = source + "->" + target + ".";
+boolean clusterPairEnabled = Boolean.valueOf(originalStrings.getOrDefault(clusterPairConfigPrefix + "enabled", "false"));
+boolean clusterPairHeartbeatsEnabled = globalHeartbeatsEnabled;
+if (originalStrings.containsKey(clusterPairConfigPrefix + MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+clusterPairHeartbeatsEnabled = Boolean.valueOf(originalStrings.get(clusterPairConfigPrefix + MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+}
+
+if (clusterPairEnabled || clusterPairHeartbeatsEnabled) {

Review comment:
   That's actually the current behavior: all herders are created, and the beats are emitted from the "opposite" herder.
   
   If you have a replication flow from A to B **and you want heartbeats**, you need 2 herders:
   - A->B for the MirrorSourceConnector 
   - B->A for the MirrorHeartbeatConnector 
   
   The MirrorHeartbeatConnector on B->A emits beats into the topic `heartbeats` on cluster A.
   The MirrorSourceConnector on A->B then replicates whichever topics are configured, as well as `heartbeats`.
   
   All of this is fine with 2-3 clusters. It starts to make less sense with 10+ clusters.
   
   Because this is the current implementation, we decided to keep it as-is so as not to change the behavior, and to add a new top-level property "emit.heartbeats.enabled" which defaults to "true".
   
   Existing users will not see any change, and if they depend on these "opposite" herders for their monitoring, it will still work.  
   New users with more complex use cases can change this property and fine-tune their heartbeat generation.
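
   To make the behavior described above concrete, here is a hypothetical `mm2.properties` sketch (cluster aliases A/B/C and all values are illustrative; the per-pair override follows the `source->target.` prefix convention shown in the diff, and `emit.heartbeats.enabled` defaults to "true" as discussed, preserving the current behavior):
   
   ```properties
   # Hypothetical MM2 configuration sketch: three clusters, one replication flow.
   clusters = A, B, C
   
   A->B.enabled = true
   
   # New top-level switch discussed above; disabling it avoids creating
   # heartbeat-only "opposite" herders across all 3 clusters.
   emit.heartbeats.enabled = false
   
   # Per-pair override: keep heartbeat emission for the B->A herder only,
   # so the A->B flow still replicates a heartbeats topic.
   B->A.emit.heartbeats.enabled = true
   ```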





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #9430: KAFKA-5235: GetOffsetShell: support for multiple topics and consumer configuration override

2021-01-27 Thread GitBox


dajac commented on a change in pull request #9430:
URL: https://github.com/apache/kafka/pull/9430#discussion_r565088240



##
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##
@@ -33,49 +34,65 @@ object GetOffsetShell {
 
   def main(args: Array[String]): Unit = {
 val parser = new OptionParser(false)
-val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
+val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
   .withRequiredArg
-   .describedAs("hostname:port,...,hostname:port")
+   .describedAs("HOST1:PORT1,...,HOST3:PORT3")
   .ofType(classOf[String])
-val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.")
+val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
+   .requiredUnless("broker-list")
+   .withRequiredArg
+   .describedAs("HOST1:PORT1,...,HOST3:PORT3")
+   .ofType(classOf[String])
+val topicPartitionsOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition specifications to get the offsets for, with the format of topic:partition. The 'topic' part can be a regex or may be omitted to only specify the partitions, and query all authorized topics." +
+" The 'partition' part can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions of the specified topic.")
+   .withRequiredArg
+   .describedAs("topic:partition,...,topic:partition")
+   .ofType(classOf[String])
+val topicOpt = parser.accepts("topic", s"The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Cannot be used if --topic-partitions is present.")
   .withRequiredArg
   .describedAs("topic")
   .ofType(classOf[String])
-val partitionOpt = parser.accepts("partitions", "comma separated list of partition ids. If not specified, it will find offsets for all partitions")
+val partitionsOpt = parser.accepts("partitions", s"Comma separated list of partition ids to get the offsets for. If not present, all partitions of the authorized topics are queried. Cannot be used if --topic-partitions is present.")
   .withRequiredArg
   .describedAs("partition ids")
   .ofType(classOf[String])
-   .defaultsTo("")
-val timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently commited record timestamp is given.]")
+val timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently committed record timestamp is given.]")
   .withRequiredArg
   .describedAs("timestamp/-1(latest)/-2(earliest)")
   .ofType(classOf[java.lang.Long])
   .defaultsTo(-1L)
-parser.accepts("offsets", "DEPRECATED AND IGNORED: number of offsets returned")
-   .withRequiredArg
-   .describedAs("count")
-   .ofType(classOf[java.lang.Integer])
-   .defaultsTo(1)
-parser.accepts("max-wait-ms", "DEPRECATED AND IGNORED: The max amount of time each fetch request waits.")
+val commandConfigOpt = parser.accepts("command-config", s"Property file containing configs to be passed to Consumer Client.")
   .withRequiredArg
-   .describedAs("ms")
-   .ofType(classOf[java.lang.Integer])
-   .defaultsTo(1000)
+   .describedAs("config file")
+   .ofType(classOf[String])
+val excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", s"By default, internal topics are included. If specified, internal topics are excluded.")
 
-   if (args.length == 0)
-  CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting topic offsets.")
+if (args.length == 0)
+  CommandLineUtils.printUsageAndDie(p
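
As an illustration of the 'partition' spec semantics described in the --topic-partitions help text above, here is a minimal sketch (the `expand` helper and its `maxPartition` cap for open-ended ranges are hypothetical, not part of GetOffsetShell):

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionRangeSketch {
    // Expands one 'partition' spec per the help text above:
    //   "N"    -> the single partition N
    //   "N-M"  -> lower inclusive, upper exclusive
    //   "N-"   -> inclusive lower bound (capped here by maxPartition)
    //   "-N"   -> exclusive upper bound
    static List<Integer> expand(String spec, int maxPartition) {
        List<Integer> ids = new ArrayList<>();
        if (!spec.contains("-")) {              // plain partition number
            ids.add(Integer.parseInt(spec));
            return ids;
        }
        String[] parts = spec.split("-", -1);   // keep empty bounds
        int lower = parts[0].isEmpty() ? 0 : Integer.parseInt(parts[0]);
        int upper = parts[1].isEmpty() ? maxPartition : Integer.parseInt(parts[1]);
        for (int p = lower; p < upper; p++)
            ids.add(p);
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(expand("3", 10));    // [3]
        System.out.println(expand("2-5", 10));  // [2, 3, 4]
        System.out.println(expand("7-", 10));   // [7, 8, 9]
        System.out.println(expand("-3", 10));   // [0, 1, 2]
    }
}
```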

[GitHub] [kafka] dajac commented on pull request #9430: KAFKA-5235: GetOffsetShell: support for multiple topics and consumer configuration override

2021-01-27 Thread GitBox


dajac commented on pull request #9430:
URL: https://github.com/apache/kafka/pull/9430#issuecomment-768130144


   Could you rebase the PR as well?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12244) Deleting a topic within metadata.max.idle after last message floods log with warnings

2021-01-27 Thread Bart van Deenen (Jira)
Bart van Deenen created KAFKA-12244:
---

 Summary: Deleting a topic within metadata.max.idle after last 
message floods log with warnings
 Key: KAFKA-12244
 URL: https://issues.apache.org/jira/browse/KAFKA-12244
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 2.5.0
 Environment: Linux, Confluent 5.5.0
Reporter: Bart van Deenen


In a test, we produce to a topic, then stop producing and delete the topic ca. 
30 seconds later.

Several minutes afterwards, this leads to a flood of WARN messages (ca. 10 per 
second) that itself lasts for several minutes:

WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-2] 
Error while fetching metadata with correlation id 141 : 
\{test-topic=UNKNOWN_TOPIC_OR_PARTITION}

Investigation has shown that the issue can be avoided by setting the 
metadata.max.idle.ms property to a value shorter than the interval between the 
last produced message and the topic deletion.

The issue itself is not critical, but it can heavily pollute the log, thereby 
obscuring (or possibly losing) important messages.
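
A minimal sketch of the described workaround, using plain java.util.Properties with the standard producer config key (the broker address and the 10 s value are placeholders):

```java
import java.util.Properties;

public class ProducerIdleConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address

        // metadata.max.idle.ms defaults to 5 minutes: the producer keeps
        // refreshing metadata for an idle topic that long after the last send.
        // Setting it below the produce-stop -> topic-delete gap (ca. 30 s in
        // the test above) stops the UNKNOWN_TOPIC_OR_PARTITION warning flood
        // once the topic is deleted.
        props.put("metadata.max.idle.ms", "10000"); // placeholder value

        System.out.println(props.getProperty("metadata.max.idle.ms"));
    }
}
```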

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] tang7526 commented on pull request #9939: MINOR: fix @link tag in javadoc

2021-01-27 Thread GitBox


tang7526 commented on pull request #9939:
URL: https://github.com/apache/kafka/pull/9939#issuecomment-768169946


   > @tang7526 Could you revert the scala docs changes? The changes of Javadocs LGTM
   
   Done



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete

2021-01-27 Thread GitBox


rajinisivaram commented on a change in pull request #9684:
URL: https://github.com/apache/kafka/pull/9684#discussion_r565190577



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java
##
@@ -68,6 +69,19 @@ protected CreateTopicsResult(Map> fu
 return futures.get(topic).thenApply(TopicMetadataAndConfig::config);
 }
 
+/**
+ * Returns a future that provides topic ID for the topic when the request completes.
+ * 
+ * If broker version doesn't support replication factor in the response, throw
+ * {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
+ * If broker returned an error for topic configs, throw appropriate exception. For example,
+ * {@link org.apache.kafka.common.errors.TopicAuthorizationException} is thrown if user does not
+ * have permission to describe topic configs.
+ */
+public KafkaFuture<Uuid> topicId(String topic) {
+return futures.get(topic).thenApply(TopicMetadataAndConfig::topicId);
+}
+
 /**
  * Returns a future that provides number of partitions in the topic when the request completes.

Review comment:
   Returning `Uuid.ZERO_UUID` should be fine. The comment just seems odd, I 
am not sure why we have it that way in the other methods.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete

2021-01-27 Thread GitBox


rajinisivaram commented on a change in pull request #9684:
URL: https://github.com/apache/kafka/pull/9684#discussion_r565192884



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1930,29 +1932,43 @@ class KafkaApis(val requestChannel: RequestChannel,
 val results = new DeletableTopicResultCollection(deleteTopicRequest.data.topicNames.size)
 val toDelete = mutable.Set[String]()
 if (!controller.isActive) {
-  deleteTopicRequest.data.topicNames.forEach { topic =>
+  deleteTopicRequest.topics().forEach { topic =>
 results.add(new DeletableTopicResult()
-  .setName(topic)
+  .setName(topic.name())
+  .setTopicId(topic.topicId())
   .setErrorCode(Errors.NOT_CONTROLLER.code))
   }
   sendResponseCallback(results)
 } else if (!config.deleteTopicEnable) {
   val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST else Errors.TOPIC_DELETION_DISABLED
-  deleteTopicRequest.data.topicNames.forEach { topic =>
+  deleteTopicRequest.topics().forEach { topic =>
 results.add(new DeletableTopicResult()
-  .setName(topic)
+  .setName(topic.name())
+  .setTopicId(topic.topicId())
   .setErrorCode(error.code))
   }
   sendResponseCallback(results)
 } else {
-  deleteTopicRequest.data.topicNames.forEach { topic =>
+  deleteTopicRequest.topics().forEach { topic =>
+val name = if (topic.topicId().equals(Uuid.ZERO_UUID)) topic.name()
+  else controller.controllerContext.topicNames.getOrElse(topic.topicId(), null)

Review comment:
   Java clients won't specify both, but the protocol allows it, right? Perhaps we could throw InvalidRequestException?






This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #9964: MINOR: remove duplicate code of serializing auto-generated data

2021-01-27 Thread GitBox


dajac commented on a change in pull request #9964:
URL: https://github.com/apache/kafka/pull/9964#discussion_r565222866



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
##
@@ -265,14 +265,7 @@ public ByteBuffer encode() {
 "Should never try to encode a SubscriptionInfo with version [" +
 data.version() + "] > LATEST_SUPPORTED_VERSION [" + LATEST_SUPPORTED_VERSION + "]"
 );
-} else {
-final ObjectSerializationCache cache = new ObjectSerializationCache();
-final ByteBuffer buffer = ByteBuffer.allocate(data.size(cache, (short) data.version()));
-final ByteBufferAccessor accessor = new ByteBufferAccessor(buffer);
-data.write(accessor, cache, (short) data.version());
-buffer.rewind();

Review comment:
   `MessageUtil.toByteBuffer` uses `flip` whereas `rewind` is used here. 
Does it make any difference in this particular case?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9964: MINOR: remove duplicate code of serializing auto-generated data

2021-01-27 Thread GitBox


chia7712 commented on a change in pull request #9964:
URL: https://github.com/apache/kafka/pull/9964#discussion_r565287398



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
##
@@ -265,14 +265,7 @@ public ByteBuffer encode() {
 "Should never try to encode a SubscriptionInfo with version [" +
 data.version() + "] > LATEST_SUPPORTED_VERSION [" + LATEST_SUPPORTED_VERSION + "]"
 );
-} else {
-final ObjectSerializationCache cache = new ObjectSerializationCache();
-final ByteBuffer buffer = ByteBuffer.allocate(data.size(cache, (short) data.version()));
-final ByteBufferAccessor accessor = new ByteBufferAccessor(buffer);
-data.write(accessor, cache, (short) data.version());
-buffer.rewind();

Review comment:
   In this case, `flip` coincides with `rewind`: the buffer is completely filled, so the `limit` is already equal to the `capacity` even though `rewind` does not reset the `limit`.
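
A small self-contained demonstration of the point: once a buffer has been filled to capacity, `flip()` and `rewind()` leave it in the same readable state:

```java
import java.nio.ByteBuffer;

public class FlipVsRewind {
    public static void main(String[] args) {
        // Fill two identical buffers completely (position == capacity == 8).
        ByteBuffer flipped = ByteBuffer.allocate(8).putLong(42L);
        ByteBuffer rewound = ByteBuffer.allocate(8).putLong(42L);

        flipped.flip();   // limit = old position (8), position = 0
        rewound.rewind(); // position = 0, limit untouched (still 8)

        // Same position and limit, so the readable contents are identical.
        System.out.println(flipped.position() + " " + flipped.limit()); // 0 8
        System.out.println(rewound.position() + " " + rewound.limit()); // 0 8
        System.out.println(flipped.equals(rewound)); // true
    }
}
```

If the buffer were only partially filled, `flip()` would shrink the limit to the write position while `rewind()` would leave it at capacity, and the two states would differ.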





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9939: MINOR: fix @link tag in javadoc

2021-01-27 Thread GitBox


chia7712 commented on pull request #9939:
URL: https://github.com/apache/kafka/pull/9939#issuecomment-768277706


   @tang7526 How about using import for this issue? Other references in 
Javadocs use import also.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mdespriee commented on pull request #9738: KAFKA-8744: Update Scala API to give names to processors

2021-01-27 Thread GitBox


mdespriee commented on pull request #9738:
URL: https://github.com/apache/kafka/pull/9738#issuecomment-768282420


   @bbejeck of course. Just rebased on trunk and added a couple of tests.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #9847: KAFKA-10703: Better handling and doc for config defaults of topics

2021-01-27 Thread GitBox


tombentley commented on pull request #9847:
URL: https://github.com/apache/kafka/pull/9847#issuecomment-768284951


   @chia7712 any chance of a 2nd review here?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9977: MINOR: Update zookeeper to 3.5.9

2021-01-27 Thread GitBox


ijuma commented on pull request #9977:
URL: https://github.com/apache/kafka/pull/9977#issuecomment-768290763


   All JUnit tests passed.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #9977: MINOR: Update zookeeper to 3.5.9

2021-01-27 Thread GitBox


ijuma merged pull request #9977:
URL: https://github.com/apache/kafka/pull/9977


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming opened a new pull request #9982: MINOR: remove some explicit type argument in generator

2021-01-27 Thread GitBox


dengziming opened a new pull request #9982:
URL: https://github.com/apache/kafka/pull/9982


   *More detailed description of your change*
   From `ArrayList<Integer> newCollection = new ArrayList<Integer>(arrayLength)` to `ArrayList<Integer> newCollection = new ArrayList<>(arrayLength)`
   
   *Summary of testing strategy (including rationale)*
   QA
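   
   As an illustrative sketch of the kind of change described above (class and variable names hypothetical), the diamond operator lets the compiler infer the type argument:
   
   ```java
   import java.util.ArrayList;
   
   public class DiamondSketch {
       public static void main(String[] args) {
           int arrayLength = 16; // hypothetical initial capacity
   
           // Before: explicit type argument repeated on the right-hand side.
           ArrayList<Integer> before = new ArrayList<Integer>(arrayLength);
   
           // After: the diamond operator <> lets the compiler infer <Integer>.
           ArrayList<Integer> after = new ArrayList<>(arrayLength);
   
           before.add(1);
           after.add(1);
           System.out.println(before.equals(after)); // true
       }
   }
   ```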
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9981: MINOR: Upgrade to Scala 2.12.13

2021-01-27 Thread GitBox


chia7712 commented on pull request #9981:
URL: https://github.com/apache/kafka/pull/9981#issuecomment-768294345


   The failed tests pass on my local machine. Will trigger QA again.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tang7526 commented on pull request #9939: MINOR: fix @link tag in javadoc

2021-01-27 Thread GitBox


tang7526 commented on pull request #9939:
URL: https://github.com/apache/kafka/pull/9939#issuecomment-768321328


   > @tang7526 How about using import for this issue? Other references in Javadocs use import also.
   
   I tried that before and it didn't work.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9689) Automatic broker version detection to initialize stream client

2021-01-27 Thread feyman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17272901#comment-17272901
 ] 

feyman commented on KAFKA-9689:
---

The version detection flow leveraging the versioning system is as described in 
the section: Use case: {{group_coordinator}} feature flag in KIP-584.

The code change mainly contains 3 parts:

1) StreamThread should know if itself is leader in the consumer group, if yes, 
it should periodically query the describeFeatures api to see if there are 
feature metadata updates

2) There should be some place to put the feature metadata in the 
MemberMetadata, either in the assignment(userData) or add a new field in the 
MemberMetadata(which involves public interface change). 

3) the StreamThread should dynamically switch to the new thread producer 
without affecting the existing tasks that

> Automatic broker version detection to initialize stream client
> --
>
> Key: KAFKA-9689
> URL: https://issues.apache.org/jira/browse/KAFKA-9689
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Boyang Chen
>Assignee: feyman
>Priority: Major
>
> Eventually we shall deprecate the flag to suppress EOS thread producer 
> feature, instead we take version detection approach on broker to decide which 
> semantic to use.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] bbejeck commented on a change in pull request #9738: KAFKA-8744: Update Scala API to give names to processors

2021-01-27 Thread GitBox


bbejeck commented on a change in pull request #9738:
URL: https://github.com/apache/kafka/pull/9738#discussion_r565371239



##
File path: streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
##
@@ -391,4 +390,57 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
 
 testDriver.close()
   }
+
+  "setting a name on a filter processor" should "pass the name to the topology" in {
+val builder = new StreamsBuilder()
+val sourceTopic = "source"
+val sinkTopic = "sink"
+
+val table = builder.stream[String, String](sourceTopic).groupBy((key, _) => key).count()
+table
+  .filter((key, value) => key.equals("a") && value == 1, Named.as("my-name"))
+  .toStream
+  .to(sinkTopic)
+
+import scala.jdk.CollectionConverters._
+
+val filterNode = builder.build().describe().subtopologies().asScala.toList(1).nodes().asScala.toList(3)
+filterNode.name() shouldBe "my-name"
+  }
+
+  "setting a name on a count processor" should "pass the name to the topology" in {
+val builder = new StreamsBuilder()
+val sourceTopic = "source"
+val sinkTopic = "sink"
+
+val table = builder.stream[String, String](sourceTopic).groupBy((key, _) => key).count(Named.as("my-name"))
+table.toStream.to(sinkTopic)
+
+import scala.jdk.CollectionConverters._
+
+val filterNode = builder.build().describe().subtopologies().asScala.toList(1).nodes().asScala.toList(1)
+filterNode.name() shouldBe "my-name"

Review comment:
   shouldn't this line verify the `countNode` name?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-9689) Automatic broker version detection to initialize stream client

2021-01-27 Thread feyman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17272901#comment-17272901
 ] 

feyman edited comment on KAFKA-9689 at 1/27/21, 2:57 PM:
-

The version detection flow leveraging the versioning system is as described in 
the section: Use case: {{group_coordinator}} feature flag in KIP-584.

The code change mainly contains 3 parts:

1) StreamThread should know if itself is leader in the consumer group, if yes, 
it should periodically query the describeFeatures api to see if there are 
feature metadata updates

2) There should be some place to put the feature metadata in the 
MemberMetadata, either in the assignment(userData) or add a new field in the 
MemberMetadata(which involves public interface change). The current 
implementation leverages the assignment.

3) the StreamThread should dynamically switch to the new thread producer 
without affecting the existing tasks that


was (Author: feyman):
The version detection flow leveraging the versioning system is as described in 
the section: Use case: {{group_coordinator}} feature flag in KIP-584.

The code change mainly contains 3 parts:

1) StreamThread should know if itself is leader in the consumer group, if yes, 
it should periodically query the describeFeatures api to see if there are 
feature metadata updates

2) There should be some place to put the feature metadata in the 
MemberMetadata, either in the assignment(userData) or add a new field in the 
MemberMetadata(which involves public interface change). 

3) the StreamThread should dynamically switch to the new thread producer 
without affecting the existing tasks that

> Automatic broker version detection to initialize stream client
> --
>
> Key: KAFKA-9689
> URL: https://issues.apache.org/jira/browse/KAFKA-9689
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Boyang Chen
>Assignee: feyman
>Priority: Major
>
> Eventually we shall deprecate the flag to suppress EOS thread producer 
> feature, instead we take version detection approach on broker to decide which 
> semantic to use.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8930) MM2 documentation

2021-01-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17272938#comment-17272938
 ] 

ASF GitHub Bot commented on KAFKA-8930:
---

bbejeck merged pull request #324:
URL: https://github.com/apache/kafka-site/pull/324


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> MM2 documentation
> -
>
> Key: KAFKA-8930
> URL: https://issues.apache.org/jira/browse/KAFKA-8930
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ryanne Dolan
>Assignee: Ryanne Dolan
>Priority: Minor
>
> Expand javadocs for new MirrorMaker (entrypoint) and MirrorMakerConfig 
> classes. Include example usage and example configuration.
> Expand javadocs for MirrorSourceConnector, MirrorCheckpointConnector, and 
> MirrorHeartbeatConnector, including example configuration for running on 
> Connect w/o mm2 driver.





[jira] [Commented] (KAFKA-8930) MM2 documentation

2021-01-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17272942#comment-17272942
 ] 

ASF GitHub Bot commented on KAFKA-8930:
---

omkreddy commented on pull request #324:
URL: https://github.com/apache/kafka-site/pull/324#issuecomment-768372236


   We should also add these docs to `kafka/docs` repo







[GitHub] [kafka] tang7526 edited a comment on pull request #9939: MINOR: fix @link tag in javadoc

2021-01-27 Thread GitBox


tang7526 edited a comment on pull request #9939:
URL: https://github.com/apache/kafka/pull/9939#issuecomment-768321328


   > @tang7526 How about using import for this issue? Other references in 
Javadocs use import also.
   Done







[GitHub] [kafka] jolshan commented on a change in pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete

2021-01-27 Thread GitBox


jolshan commented on a change in pull request #9684:
URL: https://github.com/apache/kafka/pull/9684#discussion_r565448192



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1930,29 +1932,43 @@ class KafkaApis(val requestChannel: RequestChannel,
 val results = new 
DeletableTopicResultCollection(deleteTopicRequest.data.topicNames.size)
 val toDelete = mutable.Set[String]()
 if (!controller.isActive) {
-  deleteTopicRequest.data.topicNames.forEach { topic =>
+  deleteTopicRequest.topics().forEach { topic =>
 results.add(new DeletableTopicResult()
-  .setName(topic)
+  .setName(topic.name())
+  .setTopicId(topic.topicId())
   .setErrorCode(Errors.NOT_CONTROLLER.code))
   }
   sendResponseCallback(results)
 } else if (!config.deleteTopicEnable) {
   val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST 
else Errors.TOPIC_DELETION_DISABLED
-  deleteTopicRequest.data.topicNames.forEach { topic =>
+  deleteTopicRequest.topics().forEach { topic =>
 results.add(new DeletableTopicResult()
-  .setName(topic)
+  .setName(topic.name())
+  .setTopicId(topic.topicId())
   .setErrorCode(error.code))
   }
   sendResponseCallback(results)
 } else {
-  deleteTopicRequest.data.topicNames.forEach { topic =>
+  deleteTopicRequest.topics().forEach { topic =>
+val name = if (topic.topicId().equals(Uuid.ZERO_UUID)) topic.name()
+  else 
controller.controllerContext.topicNames.getOrElse(topic.topicId(), null)

Review comment:
   Good idea.
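The lookup discussed in the hunk above — use the request's topic name when the topic ID is the zero sentinel, otherwise resolve the name from the controller's id-to-name mapping — can be sketched with stdlib types. Here java.util.UUID stands in for Kafka's Uuid, and the method name is illustrative:

```java
import java.util.Map;
import java.util.UUID;

public class TopicIdResolution {
    // Stand-in for Kafka's Uuid.ZERO_UUID sentinel ("no topic id supplied").
    public static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Resolve the topic name: keep the supplied name when no id was given,
    // otherwise look the id up in the controller's mapping (null if unknown,
    // which the caller can then report as an error).
    public static String resolveName(UUID topicId, String name, Map<UUID, String> topicNames) {
        return topicId.equals(ZERO_UUID) ? name : topicNames.get(topicId);
    }
}
```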









[GitHub] [kafka] jolshan commented on a change in pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete

2021-01-27 Thread GitBox


jolshan commented on a change in pull request #9684:
URL: https://github.com/apache/kafka/pull/9684#discussion_r565448514



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java
##
@@ -68,6 +69,19 @@ protected CreateTopicsResult(Map<String, KafkaFuture<TopicMetadataAndConfig>> fu
 return futures.get(topic).thenApply(TopicMetadataAndConfig::config);
 }
 
+/**
+ * Returns a future that provides topic ID for the topic when the request 
completes.
+ * 
+ * If broker version doesn't support replication factor in the response, 
throw
+ * {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
+ * If broker returned an error for topic configs, throw appropriate 
exception. For example,
+ * {@link org.apache.kafka.common.errors.TopicAuthorizationException} is 
thrown if user does not
+ * have permission to describe topic configs.
+ */
+public KafkaFuture<Uuid> topicId(String topic) {
+return futures.get(topic).thenApply(TopicMetadataAndConfig::topicId);
+}
+
 /**
  * Returns a future that provides number of partitions in the topic when 
the request completes.

Review comment:
   I thought it was odd too.
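The pattern in the hunk above — deriving a per-topic result future by mapping over the stored metadata future — can be shown with the stdlib CompletableFuture (Kafka's KafkaFuture offers a similar thenApply). The holder class below is a minimal stand-in, not Kafka's real TopicMetadataAndConfig:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class TopicIdFutureSketch {
    // Minimal stand-in for the per-topic result holder in the hunk above.
    public static class TopicMetadataAndConfig {
        private final String topicId; // Kafka uses Uuid; String keeps the sketch small
        public TopicMetadataAndConfig(String topicId) { this.topicId = topicId; }
        public String topicId() { return topicId; }
    }

    // Mirrors the shape of CreateTopicsResult.topicId(topic): derive a
    // topic-id future from the metadata future without blocking.
    public static CompletableFuture<String> topicId(
            Map<String, CompletableFuture<TopicMetadataAndConfig>> futures, String topic) {
        return futures.get(topic).thenApply(TopicMetadataAndConfig::topicId);
    }
}
```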









[jira] [Commented] (KAFKA-8930) MM2 documentation

2021-01-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17272977#comment-17272977
 ] 

ASF GitHub Bot commented on KAFKA-8930:
---

bbejeck commented on pull request #324:
URL: https://github.com/apache/kafka-site/pull/324#issuecomment-768404929


   > We should also add these docs to kafka/docs repo
   
   @omkreddy, yes a PR for kafka/docs is coming soon







[GitHub] [kafka] miguno opened a new pull request #9983: KAFKA-8930: MirrorMaker v2 documentation (#324)

2021-01-27 Thread GitBox


miguno opened a new pull request #9983:
URL: https://github.com/apache/kafka/pull/9983


   This adds a new user-facing documentation "Geo-replication (Cross-Cluster 
Data Mirroring)" section to the Kafka Operations documentation that covers 
MirrorMaker v2.
   
   Was already merged to `kafka-site` via 
https://github.com/apache/kafka-site/pull/324.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Commented] (KAFKA-8930) MM2 documentation

2021-01-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17272987#comment-17272987
 ] 

ASF GitHub Bot commented on KAFKA-8930:
---

miguno commented on pull request #324:
URL: https://github.com/apache/kafka-site/pull/324#issuecomment-768416513


   kafka/docs PR is up at https://github.com/apache/kafka/pull/9983







[jira] [Assigned] (KAFKA-8930) MM2 documentation

2021-01-27 Thread Michael G. Noll (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael G. Noll reassigned KAFKA-8930:
--

Assignee: Michael G. Noll  (was: Ryanne Dolan)

> MM2 documentation
> -
>
> Key: KAFKA-8930
> URL: https://issues.apache.org/jira/browse/KAFKA-8930
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ryanne Dolan
>Assignee: Michael G. Noll
>Priority: Minor
>
> Expand javadocs for new MirrorMaker (entrypoint) and MirrorMakerConfig 
> classes. Include example usage and example configuration.
> Expand javadocs for MirrorSourceConnector, MirrorCheckpointConnector, and 
> MirrorHeartbeatConnector, including example configuration for running on 
> Connect w/o mm2 driver.





[GitHub] [kafka] bbejeck merged pull request #9983: KAFKA-8930: MirrorMaker v2 documentation (#324)

2021-01-27 Thread GitBox


bbejeck merged pull request #9983:
URL: https://github.com/apache/kafka/pull/9983


   







[GitHub] [kafka] bbejeck commented on pull request #9983: KAFKA-8930: MirrorMaker v2 documentation (#324)

2021-01-27 Thread GitBox


bbejeck commented on pull request #9983:
URL: https://github.com/apache/kafka/pull/9983#issuecomment-768420998


   merged #9983 into trunk







[GitHub] [kafka] junrao commented on a change in pull request #9872: KAFKA-10759: ARM support for Kafka

2021-01-27 Thread GitBox


junrao commented on a change in pull request #9872:
URL: https://github.com/apache/kafka/pull/9872#discussion_r565473406



##
File path: Jenkinsfile
##
@@ -160,5 +160,23 @@ pipeline {
 }
   }
 }
+stage("Arm Build") {
+  agent { label 'arm4' }
+  options {
+timeout(time: 8, unit: 'HOURS')
+timestamps()
+  }
+  environment {

Review comment:
   Do we need to specify the jdk version since scala 2.12 only works on jdk 
8?









[GitHub] [kafka] bbejeck commented on pull request #9983: KAFKA-8930: MirrorMaker v2 documentation (#324)

2021-01-27 Thread GitBox


bbejeck commented on pull request #9983:
URL: https://github.com/apache/kafka/pull/9983#issuecomment-768429896


   cherry-picked to 2.7 and 2.6







[GitHub] [kafka] mdespriee commented on a change in pull request #9738: KAFKA-8744: Update Scala API to give names to processors

2021-01-27 Thread GitBox


mdespriee commented on a change in pull request #9738:
URL: https://github.com/apache/kafka/pull/9738#discussion_r565497044



##
File path: 
streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
##
@@ -391,4 +390,57 @@ class KTableTest extends FlatSpec with Matchers with 
TestDriver {
 
 testDriver.close()
   }
+
+  "setting a name on a filter processor" should "pass the name to the 
topology" in {
+val builder = new StreamsBuilder()
+val sourceTopic = "source"
+val sinkTopic = "sink"
+
+val table = builder.stream[String, String](sourceTopic).groupBy((key, _) 
=> key).count()
+table
+  .filter((key, value) => key.equals("a") && value == 1, 
Named.as("my-name"))
+  .toStream
+  .to(sinkTopic)
+
+import scala.jdk.CollectionConverters._
+
+val filterNode = 
builder.build().describe().subtopologies().asScala.toList(1).nodes().asScala.toList(3)
+filterNode.name() shouldBe "my-name"
+  }
+
+  "setting a name on a count processor" should "pass the name to the topology" 
in {
+val builder = new StreamsBuilder()
+val sourceTopic = "source"
+val sinkTopic = "sink"
+
+val table = builder.stream[String, String](sourceTopic).groupBy((key, _) 
=> key).count(Named.as("my-name"))
+table.toStream.to(sinkTopic)
+
+import scala.jdk.CollectionConverters._
+
+val filterNode = 
builder.build().describe().subtopologies().asScala.toList(1).nodes().asScala.toList(1)
+filterNode.name() shouldBe "my-name"

Review comment:
   bad copy-paste of variable name.









[jira] [Commented] (KAFKA-8930) MM2 documentation

2021-01-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17273030#comment-17273030
 ] 

ASF GitHub Bot commented on KAFKA-8930:
---

bbejeck opened a new pull request #326:
URL: https://github.com/apache/kafka-site/pull/326


   The MM2 docs are already in for 2.7 via 
https://github.com/apache/kafka-site/pull/324, this PR adds them to 2.6







[jira] [Commented] (KAFKA-8930) MM2 documentation

2021-01-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17273055#comment-17273055
 ] 

ASF GitHub Bot commented on KAFKA-8930:
---

bbejeck commented on pull request #326:
URL: https://github.com/apache/kafka-site/pull/326#issuecomment-768461055


   ping @miguno for a +1







[GitHub] [kafka] mjsax commented on a change in pull request #9969: MINOR: updated upgrade and architecture for KIP-663, KIP-696, and KIP-671

2021-01-27 Thread GitBox


mjsax commented on a change in pull request #9969:
URL: https://github.com/apache/kafka/pull/9969#discussion_r565515280



##
File path: docs/streams/upgrade-guide.html
##
@@ -91,6 +91,29 @@ Streams API
 We extended StreamJoined to include the options 
withLoggingEnabled() and withLoggingDisabled() in
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-689%3A+Extend+%60StreamJoined%60+to+allow+more+store+configs";>KIP-689.
 
+
+We added two new methods to Kafka Streams, namely 
addThread() and removeThread() in
+https://cwiki.apache.org/confluence/x/FDd4CQ";>KIP-663.
+These enabled adding a removing StreamThreads to running KafkaStreams 
client.
+   
+
+We deprecated setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler uncaughtExceptionHandler)
+in favor of setUncaughtExceptionHandler(final 
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler)
+in https://cwiki.apache.org/confluence/x/lkN4CQ";>KIP-671.
+The default handler will close the client and the client will transit 
to state ERROR.
+If you implement a custom handler, the new interface allows you to 
return a StreamThreadExceptionResponse,
+which will determine how the application will respond to a thread 
failure.
+
+
+Changes in https://cwiki.apache.org/confluence/x/FDd4CQ";>KIP-663 necessitated 
the KafkaStreams client
+state machine to update, which was done in https://cwiki.apache.org/confluence/x/lCvZCQ";>KIP-696..
+The ERROR state is now terminal with PENDING_ERROR being a 
transitional state where the resources are closing.
+The ERROR state indicates that there something wrong and should not be 
blindly restarted without classifying

Review comment:
   `that there something` -> `that there [is] something` ?
   
   `and should not be blindly restarted` -> `and you should not restarted 
KafkaStreams blindly` ? (or `and [KafkaStreams] should not be blindly 
restarted`) ?
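For context on the API the hunk above documents, the KIP-671 handler change can be modeled with a minimal stand-in. These are illustrative types, not Kafka's real classes, though the enum value names are taken from the KIP:

```java
// Toy model of the KIP-671 uncaught-exception handling described above:
// the handler returns a response that tells the Streams runtime how to
// react to a StreamThread failure, instead of the thread just dying.
public class HandlerSketch {
    public enum StreamThreadExceptionResponse {
        REPLACE_THREAD,        // restart the failed thread
        SHUTDOWN_CLIENT,       // close this KafkaStreams client (ERROR state)
        SHUTDOWN_APPLICATION   // signal all instances to shut down
    }

    public interface StreamsUncaughtExceptionHandler {
        StreamThreadExceptionResponse handle(Throwable exception);
    }

    // Per the doc hunk, the default handler closes the client, which then
    // transitions to the terminal ERROR state.
    public static final StreamsUncaughtExceptionHandler DEFAULT =
        e -> StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
}
```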









[GitHub] [kafka] hachikuji commented on a change in pull request #9979: KAFKA-12238; Implement `DescribeProducers` API from KIP-664

2021-01-27 Thread GitBox


hachikuji commented on a change in pull request #9979:
URL: https://github.com/apache/kafka/pull/9979#discussion_r565517686



##
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##
@@ -511,6 +513,9 @@ public void testSerialization() throws Exception {
 checkRequest(createAlterClientQuotasRequest(), true);
 checkErrorResponse(createAlterClientQuotasRequest(), 
unknownServerException, true);
 checkResponse(createAlterClientQuotasResponse(), 0, true);
+checkRequest(createDescribeProducersRequest(), true);
+checkErrorResponse(createDescribeProducersRequest(), 
unknownServerException, true);
+checkResponse(createDescribeProducersResponse(), 0, true);

Review comment:
   Sounds reasonable. I'm also not super fond of this test pattern.









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9969: MINOR: updated upgrade and architecture for KIP-663, KIP-696, and KIP-671

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9969:
URL: https://github.com/apache/kafka/pull/9969#discussion_r565518232



##
File path: docs/streams/upgrade-guide.html
##
@@ -91,6 +91,29 @@ Streams API
 We extended StreamJoined to include the options 
withLoggingEnabled() and withLoggingDisabled() in
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-689%3A+Extend+%60StreamJoined%60+to+allow+more+store+configs";>KIP-689.
 
+
+We added two new methods to Kafka Streams, namely 
addThread() and removeThread() in
+https://cwiki.apache.org/confluence/x/FDd4CQ";>KIP-663.
+These enabled adding a removing StreamThreads to running KafkaStreams 
client.
+   
+
+We deprecated setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler uncaughtExceptionHandler)
+in favor of setUncaughtExceptionHandler(final 
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler)
+in https://cwiki.apache.org/confluence/x/lkN4CQ";>KIP-671.
+The default handler will close the client and the client will transit 
to state ERROR.
+If you implement a custom handler, the new interface allows you to 
return a StreamThreadExceptionResponse,
+which will determine how the application will respond to a thread 
failure.
+
+
+Changes in https://cwiki.apache.org/confluence/x/FDd4CQ";>KIP-663 necessitated 
the KafkaStreams client
+state machine to update, which was done in https://cwiki.apache.org/confluence/x/lCvZCQ";>KIP-696..
+The ERROR state is now terminal with PENDING_ERROR being a 
transitional state where the resources are closing.
+The ERROR state indicates that there something wrong and should not be 
blindly restarted without classifying

Review comment:
   I went with `is something wrong and the KafkaStreams clinet should not 
be blindly restarted`









[jira] [Commented] (KAFKA-8930) MM2 documentation

2021-01-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17273059#comment-17273059
 ] 

ASF GitHub Bot commented on KAFKA-8930:
---

miguno commented on pull request #326:
URL: https://github.com/apache/kafka-site/pull/326#issuecomment-768466468


   This LGTM, though (1) there were some minor HTML changes not directly 
related to the original PR and (2) I didn't test this PR locally myself to 
ensure proper HTML rendering etc.







[GitHub] [kafka] vvcephei commented on a change in pull request #9107: KAFKA-5488: Add type-safe split() operator

2021-01-27 Thread GitBox


vvcephei commented on a change in pull request #9107:
URL: https://github.com/apache/kafka/pull/9107#discussion_r565519066



##
File path: streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import java.util.Objects;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * The {@code Branched} class is used to define the optional parameters when 
building branches with
+ * {@link BranchedKStream}.
+ *
+ * @param  type of record key
+ * @param  type of record value
+ */
+public class Branched<K, V> implements NamedOperation<Branched<K, V>> {
+
+protected final String name;
+protected final Function<? super KStream<K, V>, ? extends KStream<K, V>> chainFunction;
+protected final Consumer<? super KStream<K, V>> chainConsumer;
+
+protected Branched(final String name,
+   final Function<? super KStream<K, V>, ? extends KStream<K, V>> chainFunction,
+   final Consumer<? super KStream<K, V>> chainConsumer) {
+this.name = name;
+this.chainFunction = chainFunction;
+this.chainConsumer = chainConsumer;
+}
+
+/**
+ * Create an instance of {@code Branched} with provided branch name suffix.
+ *
+ * @param name the branch name suffix to be used. If {@code null}, a 
default branch name suffix will be generated
+ * (see {@link BranchedKStream} description for details)
+ * @param   key type
+ * @param   value type
+ * @return a new instance of {@code Branched}
+ */
+public static <K, V> Branched<K, V> as(final String name) {
+return new Branched<>(name, null, null);

Review comment:
   I agree, it seems like a good idea to check for `null` here.









[GitHub] [kafka] hachikuji commented on a change in pull request #9979: KAFKA-12238; Implement `DescribeProducers` API from KIP-664

2021-01-27 Thread GitBox


hachikuji commented on a change in pull request #9979:
URL: https://github.com/apache/kafka/pull/9979#discussion_r565519812



##
File path: 
clients/src/main/resources/common/message/DescribeProducersResponse.json
##
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 61,
+  "type": "response",
+  "name": "DescribeProducersResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", 
"ignorable": true,
+  "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
+{ "name": "Topics", "type": "[]TopicResponse", "versions": "0+",
+  "about": "Each topic in the response.", "fields": [
+  { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
+"about": "The topic name" },
+  { "name": "Partitions", "type": "[]PartitionResponse", "versions": "0+",
+"about": "Each partition in the response.", "fields": [
+{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
+  "about": "The partition index." },
+{ "name": "ErrorCode", "type": "int16", "versions": "0+",
+  "about": "The partition error code, or 0 if there was no error." },

Review comment:
   I don't feel strongly about it. Perhaps it's better to have it than not. 
I will add it.









[GitHub] [kafka] vvcephei commented on a change in pull request #9107: KAFKA-5488: Add type-safe split() operator

2021-01-27 Thread GitBox


vvcephei commented on a change in pull request #9107:
URL: https://github.com/apache/kafka/pull/9107#discussion_r565519912



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java
##
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import java.util.Map;
+
+/**
+ * Branches the records in the original stream based on the predicates 
supplied for the branch definitions.
+ * 
+ * Branches are defined with {@link BranchedKStream#branch(Predicate, 
Branched)} or
+ * {@link BranchedKStream#defaultBranch(Branched)} methods. Each record is 
evaluated against the predicates
+ * supplied via {@link Branched} parameters, and is routed to the first branch 
for which its respective predicate
+ * evaluates to {@code true}. If a record does not match any predicates, it 
will be routed to the default branch,
+ * or dropped if no default branch is created.
+ * 
+ * Each branch (which is a {@link KStream} instance) then can be processed 
either by
+ * a {@link java.util.function.Function} or a {@link 
java.util.function.Consumer} provided via a {@link Branched}
+ * parameter. If certain conditions are met, it also can be accessed from the 
{@link Map} returned by
+ * {@link BranchedKStream#defaultBranch(Branched)} or {@link 
BranchedKStream#noDefaultBranch()}
+ * (see usage examples).
+ * 
+ * The branching happens on first-match: A record in the original stream is 
assigned to the corresponding result

Review comment:
   Yes, I agree, unless you want to add a noun:
   
   ```suggestion
* The branching happens on a first-match basis: A record in the original 
stream is assigned to the corresponding result
   ```
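
   As a side note, the first-match routing described in the Javadoc above can be sketched as a toy model (this is an illustrative stand-alone sketch, not the actual Kafka Streams `BranchedKStream` implementation):

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.Predicate;

   // Toy model of first-match branching: each record is assigned to the
   // first branch whose predicate evaluates to true, or to the default
   // branch (index -1 here) if no predicate matches.
   class FirstMatchRouter<T> {
       private final List<Predicate<T>> predicates = new ArrayList<>();

       FirstMatchRouter<T> branch(Predicate<T> predicate) {
           predicates.add(predicate);
           return this;
       }

       // Returns the index of the first matching branch, or -1 for the default.
       int route(T record) {
           for (int i = 0; i < predicates.size(); i++) {
               if (predicates.get(i).test(record)) {
                   return i;
               }
           }
           return -1;
       }

       public static void main(String[] args) {
           FirstMatchRouter<Integer> router = new FirstMatchRouter<Integer>()
               .branch(x -> x > 10)
               .branch(x -> x > 0);
           // 20 satisfies both predicates but is routed only to branch 0.
           System.out.println(router.route(20));
       }
   }
   ```

   Note that a record satisfying several predicates still lands in exactly one branch, which is the behavior the "first-match basis" wording is meant to capture.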









[GitHub] [kafka] hachikuji commented on a change in pull request #9979: KAFKA-12238; Implement `DescribeProducers` API from KIP-664

2021-01-27 Thread GitBox


hachikuji commented on a change in pull request #9979:
URL: https://github.com/apache/kafka/pull/9979#discussion_r565525057



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -3365,6 +3366,44 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  def handleDescribeProducersRequest(request: RequestChannel.Request): Unit = {
+val describeProducersRequest = request.body[DescribeProducersRequest]
+
+def partitionError(topicPartition: TopicPartition, error: Errors): 
DescribeProducersResponseData.PartitionResponse = {
+  new DescribeProducersResponseData.PartitionResponse()
+.setPartitionIndex(topicPartition.partition)
+.setErrorCode(error.code)
+}
+
+val response = new DescribeProducersResponseData()
+describeProducersRequest.data.topics.forEach { topicRequest =>
+  val topicResponse = new DescribeProducersResponseData.TopicResponse()
+.setName(topicRequest.name)
+  val topicError = if (!authHelper.authorize(request.context, READ, TOPIC, 
topicRequest.name))
+Some(Errors.TOPIC_AUTHORIZATION_FAILED)
+  else if (!metadataCache.contains(topicRequest.name))
+Some(Errors.UNKNOWN_TOPIC_OR_PARTITION)
+  else
+None
+
+  topicRequest.partitionIndexes.forEach { partitionId =>
+val topicPartition = new TopicPartition(topicRequest.name, partitionId)
+val partitionResponse = topicError match {
+  case Some(error) => partitionError(topicPartition, error)
+  case None => replicaManager.activeProducerState(topicPartition)
+}
+topicResponse.partitions.add(partitionResponse)
+  }
+
+  if (!topicResponse.partitions.isEmpty) {
+response.topics.add(topicResponse)
+  }

Review comment:
   Hmm... It's been a while since I wrote this. I agree it looks a little 
strange. I guess there's probably no harm echoing back the same structure that 
was sent.









[jira] [Commented] (KAFKA-6223) Please delete old releases from mirroring system

2021-01-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17273066#comment-17273066
 ] 

ASF GitHub Bot commented on KAFKA-6223:
---

mimaison merged pull request #322:
URL: https://github.com/apache/kafka-site/pull/322


   





> Please delete old releases from mirroring system
> 
>
> Key: KAFKA-6223
> URL: https://issues.apache.org/jira/browse/KAFKA-6223
> Project: Kafka
>  Issue Type: Bug
> Environment: https://dist.apache.org/repos/dist/release/kafka/
>Reporter: Sebb
>Assignee: Rajini Sivaram
>Priority: Major
>
> To reduce the load on the ASF mirrors, projects are required to delete old 
> releases [1]
> Please can you remove all non-current releases?
> It's unfair to expect the 3rd party mirrors to carry old releases.
> Note that older releases can still be linked from the download page, but such 
> links should use the archive server at:
> https://archive.apache.org/dist/kafka/
> A suggested process is:
> + Change the download page to use archive.a.o for old releases
> + Delete the corresponding directories from 
> {{https://dist.apache.org/repos/dist/release/kafka/}}
> e.g. {{svn delete https://dist.apache.org/repos/dist/release/kafka/0.8.0}}
> Thanks!
> [1] http://www.apache.org/dev/release.html#when-to-archive



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #9107: KAFKA-5488: Add type-safe split() operator

2021-01-27 Thread GitBox


mjsax commented on pull request #9107:
URL: https://github.com/apache/kafka/pull/9107#issuecomment-768478775


   @inponomarev the failing tests seem to be due to a known issue that was 
fixed via https://github.com/apache/kafka/pull/9768
   
   Can you rebase your PR to pickup the fix so we can get a green build?







[jira] [Commented] (KAFKA-8930) MM2 documentation

2021-01-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17273073#comment-17273073
 ] 

ASF GitHub Bot commented on KAFKA-8930:
---

bbejeck commented on pull request #326:
URL: https://github.com/apache/kafka-site/pull/326#issuecomment-768481852


   > I didn't test this PR locally myself to ensure proper HTML rendering etc.
   FWIW I rendered it locally and it seemed fine





> MM2 documentation
> -
>
> Key: KAFKA-8930
> URL: https://issues.apache.org/jira/browse/KAFKA-8930
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ryanne Dolan
>Assignee: Michael G. Noll
>Priority: Minor
>
> Expand javadocs for new MirrorMaker (entrypoint) and MirrorMakerConfig 
> classes. Include example usage and example configuration.
> Expand javadocs for MirrorSourceConnector, MirrorCheckpointConnector, and 
> MirrorHeartbeatConnector, including example configuration for running on 
> Connect w/o mm2 driver.





[jira] [Commented] (KAFKA-8930) MM2 documentation

2021-01-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17273075#comment-17273075
 ] 

ASF GitHub Bot commented on KAFKA-8930:
---

bbejeck merged pull request #326:
URL: https://github.com/apache/kafka-site/pull/326


   










[jira] [Commented] (KAFKA-8930) MM2 documentation

2021-01-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17273074#comment-17273074
 ] 

ASF GitHub Bot commented on KAFKA-8930:
---

bbejeck edited a comment on pull request #326:
URL: https://github.com/apache/kafka-site/pull/326#issuecomment-768481852


   > I didn't test this PR locally myself to ensure proper HTML rendering etc.
   
   FWIW I rendered it locally and it seemed fine










[GitHub] [kafka] jolshan commented on a change in pull request #9769: KAFKA-10774; Support Describe topic using topic IDs

2021-01-27 Thread GitBox


jolshan commented on a change in pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#discussion_r565575532



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1223,7 +1251,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 Set.empty[MetadataResponseTopic]
   else
 unauthorizedForDescribeTopics.map(topic =>
-  metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, 
false, util.Collections.emptyList()))
+  metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, 
Uuid.ZERO_UUID, false, util.Collections.emptyList()))

Review comment:
   So will we never reach this code path when using topic IDs? I think we 
are using topics to decide authorization. So in the case where we use ids and 
the name exists, then we will expose the name and return a zero ID?









[GitHub] [kafka] jolshan commented on a change in pull request #9769: KAFKA-10774; Support Describe topic using topic IDs

2021-01-27 Thread GitBox


jolshan commented on a change in pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#discussion_r565575532



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1223,7 +1251,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 Set.empty[MetadataResponseTopic]
   else
 unauthorizedForDescribeTopics.map(topic =>
-  metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, 
false, util.Collections.emptyList()))
+  metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, 
Uuid.ZERO_UUID, false, util.Collections.emptyList()))

Review comment:
   So will we never reach this code path when using topic IDs? I think we 
are using topics to decide authorization. So in the case where we use ids and 
the name exists, then we will expose the name and return a zero ID? Might be 
useful to create an authorizer integration test with topic IDs to ensure 
correctness.









[GitHub] [kafka] jolshan commented on a change in pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete

2021-01-27 Thread GitBox


jolshan commented on a change in pull request #9684:
URL: https://github.com/apache/kafka/pull/9684#discussion_r565591713



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1930,29 +1932,43 @@ class KafkaApis(val requestChannel: RequestChannel,
 val results = new 
DeletableTopicResultCollection(deleteTopicRequest.data.topicNames.size)
 val toDelete = mutable.Set[String]()
 if (!controller.isActive) {
-  deleteTopicRequest.data.topicNames.forEach { topic =>
+  deleteTopicRequest.topics().forEach { topic =>
 results.add(new DeletableTopicResult()
-  .setName(topic)
+  .setName(topic.name())
+  .setTopicId(topic.topicId())
   .setErrorCode(Errors.NOT_CONTROLLER.code))
   }
   sendResponseCallback(results)
 } else if (!config.deleteTopicEnable) {
   val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST 
else Errors.TOPIC_DELETION_DISABLED
-  deleteTopicRequest.data.topicNames.forEach { topic =>
+  deleteTopicRequest.topics().forEach { topic =>
 results.add(new DeletableTopicResult()
-  .setName(topic)
+  .setName(topic.name())
+  .setTopicId(topic.topicId())
   .setErrorCode(error.code))
   }
   sendResponseCallback(results)
 } else {
-  deleteTopicRequest.data.topicNames.forEach { topic =>
+  deleteTopicRequest.topics().forEach { topic =>
+val name = if (topic.topicId().equals(Uuid.ZERO_UUID)) topic.name()
+  else 
controller.controllerContext.topicNames.getOrElse(topic.topicId(), null)
 results.add(new DeletableTopicResult()
-  .setName(topic))
+  .setName(name)
+  .setTopicId(topic.topicId()))
   }
   val authorizedTopics = authHelper.filterByAuthorized(request.context, 
DELETE, TOPIC,
 results.asScala)(_.name)
   results.forEach { topic =>
- if (!authorizedTopics.contains(topic.name))
+ val foundTopicId = !topic.topicId().equals(Uuid.ZERO_UUID) && 
topic.name() != null
+ val topicIdSpecified = !topic.topicId().equals(Uuid.ZERO_UUID)

Review comment:
   Thinking on this more, I can simplify the line. If topic name is null, 
then we didn't have a valid topic ID.









[GitHub] [kafka] jolshan commented on a change in pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete

2021-01-27 Thread GitBox


jolshan commented on a change in pull request #9684:
URL: https://github.com/apache/kafka/pull/9684#discussion_r565591713



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1930,29 +1932,43 @@ class KafkaApis(val requestChannel: RequestChannel,
 val results = new 
DeletableTopicResultCollection(deleteTopicRequest.data.topicNames.size)
 val toDelete = mutable.Set[String]()
 if (!controller.isActive) {
-  deleteTopicRequest.data.topicNames.forEach { topic =>
+  deleteTopicRequest.topics().forEach { topic =>
 results.add(new DeletableTopicResult()
-  .setName(topic)
+  .setName(topic.name())
+  .setTopicId(topic.topicId())
   .setErrorCode(Errors.NOT_CONTROLLER.code))
   }
   sendResponseCallback(results)
 } else if (!config.deleteTopicEnable) {
   val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST 
else Errors.TOPIC_DELETION_DISABLED
-  deleteTopicRequest.data.topicNames.forEach { topic =>
+  deleteTopicRequest.topics().forEach { topic =>
 results.add(new DeletableTopicResult()
-  .setName(topic)
+  .setName(topic.name())
+  .setTopicId(topic.topicId())
   .setErrorCode(error.code))
   }
   sendResponseCallback(results)
 } else {
-  deleteTopicRequest.data.topicNames.forEach { topic =>
+  deleteTopicRequest.topics().forEach { topic =>
+val name = if (topic.topicId().equals(Uuid.ZERO_UUID)) topic.name()
+  else 
controller.controllerContext.topicNames.getOrElse(topic.topicId(), null)
 results.add(new DeletableTopicResult()
-  .setName(topic))
+  .setName(name)
+  .setTopicId(topic.topicId()))
   }
   val authorizedTopics = authHelper.filterByAuthorized(request.context, 
DELETE, TOPIC,
 results.asScala)(_.name)
   results.forEach { topic =>
- if (!authorizedTopics.contains(topic.name))
+ val foundTopicId = !topic.topicId().equals(Uuid.ZERO_UUID) && 
topic.name() != null
+ val topicIdSpecified = !topic.topicId().equals(Uuid.ZERO_UUID)

Review comment:
   Thinking on this more, I can simplify the line. Especially if I make 
changes with the code above.









[GitHub] [kafka] vvcephei commented on pull request #9836: KAFKA-10866: Add metadata to ConsumerRecords

2021-01-27 Thread GitBox


vvcephei commented on pull request #9836:
URL: https://github.com/apache/kafka/pull/9836#issuecomment-768546193


   Most of those failures were known flaky tests, but one was an EasyMock 
error. I'm not able to repro it locally after a rebase, though. Rebased, 
pushed, and trying one more time to get a clean build.







[GitHub] [kafka] wcarlson5 opened a new pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 opened a new pull request #9984:
URL: https://github.com/apache/kafka/pull/9984


   add timeout and static group rebalance to remove thread
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565607533



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -610,17 +610,32 @@ public void setStreamsUncaughtExceptionHandler(final 
java.util.function.Consumer
 this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
 }
 
-public void waitOnThreadState(final StreamThread.State targetState) {
+public boolean waitOnThreadState(final StreamThread.State targetState, 
long timeoutMs) {
+if (timeoutMs < 0) {

Review comment:
   for the non timeout uses

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -1147,6 +1162,10 @@ public String toString(final String indent) {
 return indent + "\tStreamsThread threadId: " + getName() + "\n" + 
taskManager.toString(indent);
 }
 
+public String getGroupInstanceID(){
+return mainConsumer.groupMetadata().groupInstanceId().orElse("");

Review comment:
   It seems easier to get it from here than from the config. It looked like I 
might have had to manipulate strings in that case.

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1005,11 +1007,56 @@ private StreamThread createAndAddStreamThread(final 
long cacheSizePerThread, fin
 || threads.size() == 1)) {
 streamThread.shutdown();
 if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-
streamThread.waitOnThreadState(StreamThread.State.DEAD);
+
streamThread.waitOnThreadState(StreamThread.State.DEAD, -1);
 }
 threads.remove(streamThread);
 final long cacheSizePerThread = 
getCacheSizePerThread(threads.size());
 resizeThreadCache(cacheSizePerThread);
+Collection membersToRemove = 
Collections.singletonList(new 
MemberToRemove(streamThread.getGroupInstanceID()));

Review comment:
   I ended up getting the `group.instance.id` from the streamThread









[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-01-27 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r565612146



##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -186,23 +241,37 @@ public String toString() {
  * incremental fetch requests (see below).
  */
 private LinkedHashMap next;
+private Map topicIds;
+private Map topicNames;
+private Map partitionsPerTopic;
 private final boolean copySessionPartitions;
 
 Builder() {
 this.next = new LinkedHashMap<>();
+this.topicIds = new HashMap<>();
+this.topicNames = new HashMap<>();
+this.partitionsPerTopic = new HashMap<>();
 this.copySessionPartitions = true;
 }
 
 Builder(int initialSize, boolean copySessionPartitions) {
 this.next = new LinkedHashMap<>(initialSize);
+this.topicIds = new HashMap<>(initialSize);
+this.topicNames = new HashMap<>(initialSize);
+this.partitionsPerTopic = new HashMap<>(initialSize);
 this.copySessionPartitions = copySessionPartitions;
 }
 
 /**
  * Mark that we want data from this partition in the upcoming fetch.
  */
-public void add(TopicPartition topicPartition, PartitionData data) {
-next.put(topicPartition, data);
+public void add(TopicPartition topicPartition, Uuid id, PartitionData 
data) {
+if (next.put(topicPartition, data) == null)
+partitionsPerTopic.merge(topicPartition.topic(), 1, (prev, 
next) -> prev + next);

Review comment:
   I think I may want to do this in a simpler way. I want to keep track of 
whether we have IDs for all the topics, and I'm not sure if there is a better 
way to figure out when a topic is no longer in a session besides checking all 
the topic partitions.
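
   One option is a per-topic reference count, so the session knows a topic is gone as soon as its last partition is removed, without rescanning. This is a stand-alone sketch of that idea (illustrative names, not the actual `FetchSessionHandler` code):

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Sketch: count partitions per topic; a topic is "in the session" exactly
   // while its count is non-zero, so membership checks need no full scan.
   class TopicPartitionCounter {
       private final Map<String, Integer> partitionsPerTopic = new HashMap<>();

       void addPartition(String topic) {
           partitionsPerTopic.merge(topic, 1, Integer::sum);
       }

       void removePartition(String topic) {
           // Decrement, and drop the entry entirely when no partitions remain
           // (computeIfPresent removes the mapping when the function returns null).
           partitionsPerTopic.computeIfPresent(
               topic, (t, count) -> count == 1 ? null : count - 1);
       }

       boolean containsTopic(String topic) {
           return partitionsPerTopic.containsKey(topic);
       }
   }
   ```

   The trade-off is an extra map to keep consistent with `next`, in exchange for O(1) topic-membership checks.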









[GitHub] [kafka] rajinisivaram commented on a change in pull request #9769: KAFKA-10774; Support Describe topic using topic IDs

2021-01-27 Thread GitBox


rajinisivaram commented on a change in pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#discussion_r565615638



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1223,7 +1251,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 Set.empty[MetadataResponseTopic]
   else
 unauthorizedForDescribeTopics.map(topic =>
-  metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, 
false, util.Collections.emptyList()))
+  metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, 
Uuid.ZERO_UUID, false, util.Collections.emptyList()))

Review comment:
   Good point. If not authorized for describe when using topic ids, we need 
to make sure we don't return the topic or information about the existence of 
a topic - i.e. we can't return TOPIC_AUTHORIZATION_FAILED. Perhaps 
UNKNOWN_TOPIC_ID would be more suitable.
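
   The decision being discussed can be sketched as a small truth table (the error names are Kafka's, but the helper and its logic here are a hypothetical illustration, not the actual KafkaApis code):

   ```java
   // When a topic is addressed by id and the caller lacks DESCRIBE, answer as
   // if the id were unknown, so the response does not leak that the topic exists.
   enum TopicError { NONE, TOPIC_AUTHORIZATION_FAILED, UNKNOWN_TOPIC_ID }

   class TopicIdAuthCheck {
       static TopicError errorFor(boolean byTopicId, boolean authorized, boolean exists) {
           if (byTopicId && !authorized) {
               return TopicError.UNKNOWN_TOPIC_ID; // hide existence from unauthorized callers
           }
           if (!authorized) {
               // By-name requests already reveal nothing new: the caller chose the name.
               return TopicError.TOPIC_AUTHORIZATION_FAILED;
           }
           return exists ? TopicError.NONE : TopicError.UNKNOWN_TOPIC_ID;
       }
   }
   ```

   With this shape, an unauthorized by-id request and a request for a genuinely unknown id are indistinguishable to the client, which is the point.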









[GitHub] [kafka] cadonna commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


cadonna commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565615976



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -1147,6 +1162,10 @@ public String toString(final String indent) {
 return indent + "\tStreamsThread threadId: " + getName() + "\n" + 
taskManager.toString(indent);
 }
 
+public String getGroupInstanceID() {

Review comment:
   Why not an `Optional`?
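
   For comparison, an `Optional`-returning accessor avoids the empty-string sentinel entirely (a minimal stand-alone sketch with hypothetical class/field names, not the actual StreamThread code):

   ```java
   import java.util.Optional;

   // Sketch: expose the group instance id as Optional<String> so callers must
   // handle the "not a static member" case explicitly, instead of checking "".
   class GroupInstanceIdExample {
       private final Optional<String> groupInstanceId;

       GroupInstanceIdExample(Optional<String> groupInstanceId) {
           this.groupInstanceId = groupInstanceId;
       }

       Optional<String> getGroupInstanceId() {
           return groupInstanceId;
       }

       public static void main(String[] args) {
           GroupInstanceIdExample staticMember =
               new GroupInstanceIdExample(Optional.of("instance-1"));
           GroupInstanceIdExample dynamicMember =
               new GroupInstanceIdExample(Optional.empty());

           System.out.println(staticMember.getGroupInstanceId().orElse("<none>"));
           System.out.println(dynamicMember.getGroupInstanceId().orElse("<none>"));
       }
   }
   ```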









[GitHub] [kafka] cadonna commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


cadonna commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565616503



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -1147,6 +1162,10 @@ public String toString(final String indent) {
 return indent + "\tStreamsThread threadId: " + getName() + "\n" + 
taskManager.toString(indent);
 }
 
+public String getGroupInstanceID(){
+return mainConsumer.groupMetadata().groupInstanceId().orElse("");

Review comment:
   I would do it the same way.









[GitHub] [kafka] ableegoldman commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565611900



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -91,6 +93,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;

Review comment:
   There's actually a kafka-specific version of `TimeoutException` that you 
should use to keep in line with other kafka APIs. It's 
`org.apache.kafka.common.errors.TimeoutException`

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1005,11 +1008,60 @@ private StreamThread createAndAddStreamThread(final 
long cacheSizePerThread, fin
 || threads.size() == 1)) {
 streamThread.shutdown();
 if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-
streamThread.waitOnThreadState(StreamThread.State.DEAD);
+
streamThread.waitOnThreadState(StreamThread.State.DEAD, -1);
 }
 threads.remove(streamThread);
 final long cacheSizePerThread = 
getCacheSizePerThread(threads.size());
 resizeThreadCache(cacheSizePerThread);
+final Collection membersToRemove = 
Collections.singletonList(new 
MemberToRemove(streamThread.getGroupInstanceID()));
+
adminClient.removeMembersFromConsumerGroup(config.getString(StreamsConfig.APPLICATION_ID_CONFIG),
 new RemoveMembersFromConsumerGroupOptions(membersToRemove));
+return Optional.of(streamThread.getName());
+}
+}
+}
+log.warn("There are no threads eligible for removal");
+} else {
+log.warn("Cannot remove a stream thread when Kafka Streams client 
is in state  " + state());
+}
+return Optional.empty();
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this 
Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not 
specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches 
in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream 
threads equals the total
+ * cache size specified in configuration {@link 
StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The the length of time to wait for the thread to shutdown
+ * @throws TimeoutException if the thread does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread 
could not be removed because
+ * no stream threads are alive
+ */
+public Optional removeStreamThread(final Duration timeout) throws 
TimeoutException {
+final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, 
"timeout");
+final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
+if (isRunningOrRebalancing()) {
+synchronized (changeThreadCount) {
+// make a copy of threads to avoid holding lock
+for (final StreamThread streamThread : new 
ArrayList<>(threads)) {
+if (streamThread.isAlive() && 
(!streamThread.getName().equals(Thread.currentThread().getName())
+|| threads.size() == 1)) {
+streamThread.shutdown();
+if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
+if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + 
" did not stop in the allotted time");
+throw new TimeoutException("Thread " + 
streamThread.getName() + " did not stop in the allotted time");
+}
+}
+threads.remove(streamThread);
+final long cacheSizePerThread = 
getCacheSizePerThread(threads.size());
+resizeThreadCache(cacheSizePerThread);
+Collection membersToRemove = 
Collections.singletonList(new 
MemberToRemove(streamThread.getGroupInstanceID()));

Review comment:
   I'm not sure how `removeMembersFromConsumerGroup` would behave if you 
passed in `""` as the `group.instance.id`, do you know? If not then let's just 
be safe and check what `streamThread.getGroupInstanceID()` returns, and skip 
this call if there is no group.instance.id (ie if not static)
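
The guard suggested in this comment can be sketched as follows. `membersToRemove` is a hypothetical helper using plain JDK types rather than the actual Kafka admin classes, under the assumption that an empty `Optional` means dynamic membership, in which case the broker drops the member itself once the session times out and the `removeMembersFromConsumerGroup` call can be skipped:

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

public class RemoveGuard {
    // Only build a removal list when the thread actually has a
    // group.instance.id (static membership); otherwise return an empty
    // list so the caller skips the admin-client call entirely.
    static List<String> membersToRemove(Optional<String> groupInstanceId) {
        return groupInstanceId
            .map(Collections::singletonList)
            .orElse(Collections.emptyList());
    }

    public static void main(String[] args) {
        System.out.println(membersToRemove(Optional.of("thread-1"))); // [thread-1]
        System.out.println(membersToRemove(Optional.empty()));        // []
    }
}
```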

##
File path: streams/src/

[GitHub] [kafka] bbejeck commented on pull request #9738: KAFKA-8744: Update Scala API to give names to processors

2021-01-27 Thread GitBox


bbejeck commented on pull request #9738:
URL: https://github.com/apache/kafka/pull/9738#issuecomment-768565772


   Java 11 and 15 passed
   
   Java 8 failures unrelated
   ```
   org.apache.kafka.clients.consumer.KafkaConsumerTest.testCloseWithTimeUnit()
   
org.apache.kafka.clients.consumer.internals.FetcherTest.testEarlierOffsetResetArrivesLate()
   kafka.api.TransactionsBounceTest.testWithGroupMetadata()
   ```
   Kicking off tests again to try and get a green build



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565621384



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -1147,6 +1162,10 @@ public String toString(final String indent) {
 return indent + "\tStreamsThread threadId: " + getName() + "\n" + 
taskManager.toString(indent);
 }
 
+public String getGroupInstanceID() {

Review comment:
   Either way we need to deal with it. I thought it would be easier to just 
do it once. But It probably better practice to handle it later. I will change 
it to Optional









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565622163



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -91,6 +93,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;

Review comment:
   I did not know that. good catch









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565622524



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -610,17 +610,32 @@ public void setStreamsUncaughtExceptionHandler(final 
java.util.function.Consumer
 this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
 }
 
-public void waitOnThreadState(final StreamThread.State targetState) {
+public boolean waitOnThreadState(final StreamThread.State targetState, 
long timeoutMs) {
+if (timeoutMs < 0) {
+timeoutMs = 0;
+} else if (timeoutMs == 0) {
+timeoutMs = Long.MAX_VALUE;

Review comment:
   ah yeah, I had to fix this when I was writing my test
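
The timeout handling discussed here (where `timeoutMs == 0` accidentally mapped to `Long.MAX_VALUE`, i.e. "wait forever") can be sketched as a deadline-based loop. `waitUntil` is a hypothetical stand-in for `waitOnThreadState`, with `0` meaning "no wait" rather than "unbounded":

```java
import java.util.function.BooleanSupplier;

public class WaitSketch {
    // Wait until the condition holds or the deadline passes. A timeout of 0
    // checks the condition once and returns immediately.
    static boolean waitUntil(BooleanSupplier condition, long timeoutMs) {
        final long deadline = System.currentTimeMillis() + Math.max(0, timeoutMs);
        while (!condition.getAsBoolean()) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false; // timed out before the condition held
            }
            try {
                Thread.sleep(Math.min(remaining, 10));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false; // treat interruption as a failed wait
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(waitUntil(() -> true, 0));   // true
        System.out.println(waitUntil(() -> false, 30)); // false
    }
}
```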









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565628241



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1005,11 +1008,60 @@ private StreamThread createAndAddStreamThread(final 
long cacheSizePerThread, fin
 || threads.size() == 1)) {
 streamThread.shutdown();
 if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-
streamThread.waitOnThreadState(StreamThread.State.DEAD);
+
streamThread.waitOnThreadState(StreamThread.State.DEAD, -1);

Review comment:
   good idea, I reworked that a bit









[GitHub] [kafka] aloknnikhil opened a new pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-27 Thread GitBox


aloknnikhil opened a new pull request #9985:
URL: https://github.com/apache/kafka/pull/9985


   With KIP-595, we expect the RaftConfig to specify the quorum voter endpoints 
upfront on startup. In the general case, this works fine. However, for testing 
we need a lazier approach that discovers the other voters in the quorum 
after startup (i.e. after the controller port is bound). This approach also lends itself well 
to cases where we might have an observer that discovers the voter endpoints 
from, say, a `DescribeQuorum` event.
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565628682



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1005,11 +1008,60 @@ private StreamThread createAndAddStreamThread(final 
long cacheSizePerThread, fin
 || threads.size() == 1)) {
 streamThread.shutdown();
 if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-
streamThread.waitOnThreadState(StreamThread.State.DEAD);
+
streamThread.waitOnThreadState(StreamThread.State.DEAD, -1);
 }
 threads.remove(streamThread);
 final long cacheSizePerThread = 
getCacheSizePerThread(threads.size());
 resizeThreadCache(cacheSizePerThread);
+final Collection<MemberToRemove> membersToRemove = Collections.singletonList(new MemberToRemove(streamThread.getGroupInstanceID()));
+
adminClient.removeMembersFromConsumerGroup(config.getString(StreamsConfig.APPLICATION_ID_CONFIG),
 new RemoveMembersFromConsumerGroupOptions(membersToRemove));
+return Optional.of(streamThread.getName());
+}
+}
+}
+log.warn("There are no threads eligible for removal");
+} else {
+log.warn("Cannot remove a stream thread when Kafka Streams client 
is in state  " + state());
+}
+return Optional.empty();
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this 
Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not 
specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches 
in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream 
threads equals the total
+ * cache size specified in configuration {@link 
StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The length of time to wait for the thread to shut down
+ * @throws TimeoutException if the thread does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread 
could not be removed because
+ * no stream threads are alive
+ */
+public Optional<String> removeStreamThread(final Duration timeout) throws 
TimeoutException {
+final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, 
"timeout");
+final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
+if (isRunningOrRebalancing()) {
+synchronized (changeThreadCount) {
+// make a copy of threads to avoid holding lock
+for (final StreamThread streamThread : new 
ArrayList<>(threads)) {
+if (streamThread.isAlive() && 
(!streamThread.getName().equals(Thread.currentThread().getName())
+|| threads.size() == 1)) {
+streamThread.shutdown();
+if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
+if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + 
" did not stop in the allotted time");
+throw new TimeoutException("Thread " + 
streamThread.getName() + " did not stop in the allotted time");
+}
+}
+threads.remove(streamThread);
+final long cacheSizePerThread = 
getCacheSizePerThread(threads.size());
+resizeThreadCache(cacheSizePerThread);
+Collection<MemberToRemove> membersToRemove = Collections.singletonList(new MemberToRemove(streamThread.getGroupInstanceID()));

Review comment:
   that works









[jira] [Updated] (KAFKA-10716) Streams processId is unstable across restarts resulting in task mass migration

2021-01-27 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-10716:
---
Fix Version/s: 2.6.2

> Streams processId is unstable across restarts resulting in task mass migration
> --
>
> Key: KAFKA-10716
> URL: https://issues.apache.org/jira/browse/KAFKA-10716
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.8.0, 2.7.1, 2.6.2
>
>
> The new high availability feature of KIP-441 relies on deterministic 
> assignment to produce an eventually-stable assignment. The 
> HighAvailabilityTaskAssignor assigns tasks based on the unique processId 
> assigned to each client, so if the same set of Kafka Streams applications 
> participate in a rebalance it should generate the same task assignment every 
> time.
> Unfortunately the processIds aren't stable across restarts. We generate a 
> random UUID in the KafkaStreams constructor, so each time the process starts 
> up it would be assigned a completely different processId. Unless this new 
> processId happens to be in exactly the same order as the previous one, a 
> single bounce or crash/restart can result in a large scale shuffling of tasks 
> based on a completely different eventual assignment.
> Ultimately we should fix this via KAFKA-10121, but that's a nontrivial 
> undertaking and this bug merits some immediate relief if we don't intend to 
> tackle the larger problem in the upcoming releases 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
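
In the spirit of the ticket, one way to make the processId stable across restarts is to persist it on first start and reuse it afterwards. This is a hedged sketch with a hypothetical helper and file name, not the actual Streams fix:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

public class StableProcessId {
    // Read the persisted process id if it exists; otherwise generate one and
    // write it so subsequent starts of the same instance reuse it.
    static UUID getOrCreate(Path file) {
        try {
            if (Files.exists(file)) {
                return UUID.fromString(Files.readString(file).trim());
            }
            UUID id = UUID.randomUUID();
            Files.writeString(file, id.toString());
            return id;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempDirectory("state").resolve("process.id");
        UUID first = getOrCreate(file);
        UUID second = getOrCreate(file); // simulated restart reads the same id
        System.out.println(first.equals(second)); // true
    }
}
```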


[jira] [Updated] (KAFKA-10716) Streams processId is unstable across restarts resulting in task mass migration

2021-01-27 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-10716:
---
Priority: Critical  (was: Major)

> Streams processId is unstable across restarts resulting in task mass migration
> --
>
> Key: KAFKA-10716
> URL: https://issues.apache.org/jira/browse/KAFKA-10716
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Critical
> Fix For: 2.8.0, 2.7.1, 2.6.2
>
>
> The new high availability feature of KIP-441 relies on deterministic 
> assignment to produce an eventually-stable assignment. The 
> HighAvailabilityTaskAssignor assigns tasks based on the unique processId 
> assigned to each client, so if the same set of Kafka Streams applications 
> participate in a rebalance it should generate the same task assignment every 
> time.
> Unfortunately the processIds aren't stable across restarts. We generate a 
> random UUID in the KafkaStreams constructor, so each time the process starts 
> up it would be assigned a completely different processId. Unless this new 
> processId happens to be in exactly the same order as the previous one, a 
> single bounce or crash/restart can result in a large scale shuffling of tasks 
> based on a completely different eventual assignment.
> Ultimately we should fix this via KAFKA-10121, but that's a nontrivial 
> undertaking and this bug merits some immediate relief if we don't intend to 
> tackle the larger problem in the upcoming releases 





[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565635667



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1005,11 +1008,60 @@ private StreamThread createAndAddStreamThread(final 
long cacheSizePerThread, fin
 || threads.size() == 1)) {
 streamThread.shutdown();
 if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-
streamThread.waitOnThreadState(StreamThread.State.DEAD);
+
streamThread.waitOnThreadState(StreamThread.State.DEAD, -1);
 }
 threads.remove(streamThread);
 final long cacheSizePerThread = 
getCacheSizePerThread(threads.size());
 resizeThreadCache(cacheSizePerThread);
+final Collection<MemberToRemove> membersToRemove = Collections.singletonList(new MemberToRemove(streamThread.getGroupInstanceID()));
+
adminClient.removeMembersFromConsumerGroup(config.getString(StreamsConfig.APPLICATION_ID_CONFIG),
 new RemoveMembersFromConsumerGroupOptions(membersToRemove));
+return Optional.of(streamThread.getName());
+}
+}
+}
+log.warn("There are no threads eligible for removal");
+} else {
+log.warn("Cannot remove a stream thread when Kafka Streams client 
is in state  " + state());
+}
+return Optional.empty();
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this 
Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not 
specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches 
in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream 
threads equals the total
+ * cache size specified in configuration {@link 
StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The length of time to wait for the thread to shut down
+ * @throws TimeoutException if the thread does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread 
could not be removed because
+ * no stream threads are alive
+ */
+public Optional<String> removeStreamThread(final Duration timeout) throws 
TimeoutException {
+final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, 
"timeout");
+final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
+if (isRunningOrRebalancing()) {
+synchronized (changeThreadCount) {
+// make a copy of threads to avoid holding lock
+for (final StreamThread streamThread : new 
ArrayList<>(threads)) {
+if (streamThread.isAlive() && 
(!streamThread.getName().equals(Thread.currentThread().getName())
+|| threads.size() == 1)) {
+streamThread.shutdown();
+if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
+if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + 
" did not stop in the allotted time");
+throw new TimeoutException("Thread " + 
streamThread.getName() + " did not stop in the allotted time");
+}
+}
+threads.remove(streamThread);
+final long cacheSizePerThread = 
getCacheSizePerThread(threads.size());
+resizeThreadCache(cacheSizePerThread);
+Collection<MemberToRemove> membersToRemove = Collections.singletonList(new MemberToRemove(streamThread.getGroupInstanceID()));
+
adminClient.removeMembersFromConsumerGroup(config.getString(StreamsConfig.APPLICATION_ID_CONFIG),
 new RemoveMembersFromConsumerGroupOptions(membersToRemove));

Review comment:
   Sounds good. How should you handle the `ExecutionException`?









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565640687



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1005,11 +1036,28 @@ private StreamThread createAndAddStreamThread(final 
long cacheSizePerThread, fin
 || threads.size() == 1)) {
 streamThread.shutdown();
 if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-
streamThread.waitOnThreadState(StreamThread.State.DEAD);
+if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + 
" did not stop in the allotted time");
+throw new TimeoutException("Thread " + 
streamThread.getName() + " did not stop in the allotted time");
+}
 }
 threads.remove(streamThread);
 final long cacheSizePerThread = 
getCacheSizePerThread(threads.size());
 resizeThreadCache(cacheSizePerThread);
+if (streamThread.getGroupInstanceID().isPresent()) {
+final MemberToRemove memberToRemove = new 
MemberToRemove(streamThread.getGroupInstanceID().get());
+final Collection<MemberToRemove> membersToRemove = Collections.singletonList(memberToRemove);
+final RemoveMembersFromConsumerGroupResult 
removeMembersFromConsumerGroupResult = 
adminClient.removeMembersFromConsumerGroup(config.getString(StreamsConfig.APPLICATION_ID_CONFIG),
 new RemoveMembersFromConsumerGroupOptions(membersToRemove));
+try {
+
removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(timeoutMs 
- begin, TimeUnit.MILLISECONDS);
+} catch (final 
java.util.concurrent.TimeoutException e) {

Review comment:
   have to make this a kafkaTimeout









[GitHub] [kafka] hachikuji commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-27 Thread GitBox


hachikuji commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r565634630



##
File path: core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
##
@@ -34,6 +34,7 @@ import scala.collection.mutable
 
 object KafkaNetworkChannel {
 
+  val nonRoutableAddress = new InetSocketAddress("0.0.0.0", 0)

Review comment:
   Let me suggest an alternative for the sake of argument. Currently, 
`RaftConfig.parseVoterConnections` return `Map`. 
This works for the case we're interested in, but there is a risk of our 
sentinel non-routable address leaking into unexpected cases (a common source of 
bugs in Kafkaland). Alternatively, what if we add something like this to 
`RaftConfig`:
   
   ```java
   public class RaftConfig {
 ...
   
  public Map<Integer, AddressSpec> quorumVoterConnections();
   
 public static interface AddressSpec {
 }
   
 public static class InetAddressSpec implements AddressSpec {
   final InetSocketAddress address;
 }
   
 public static class UnknownAddressSpec implements AddressSpec {
 }
   }
   ```
   
   The advantage is that this lets the type checker help us ensure that we are 
checking for a sentinel. 

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -208,8 +209,9 @@ public KafkaRaftClient(
 int fetchMaxWaitMs,
 OptionalInt nodeId,
 LogContext logContext,
-Random random
-) {
+Random random,
+RaftConfig raftConfig
+) throws IOException {

Review comment:
   Is anything in here throwing `IOException`?

##
File path: raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java
##
@@ -25,20 +25,25 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 public class MockNetworkChannel implements NetworkChannel {
 private final AtomicInteger correlationIdCounter;
+private final Map<Integer, InetSocketAddress> addressCache;

Review comment:
   I don't think we are using the address here. Can we use `Set<Integer>`? 
Potentially we could even get rid of this collection. It was more useful when 
the RaftClient itself was expected to discover the voter endpoints.

##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -36,7 +36,9 @@
 public static final String QUORUM_VOTERS_CONFIG = QUORUM_PREFIX + "voters";
 public static final String QUORUM_VOTERS_DOC = "Map of id/endpoint 
information for " +
 "the set of voters in a comma-separated list of `{id}@{host}:{port}` 
entries. " +
-"For example: `1@localhost:9092,2@localhost:9093,3@localhost:9094`";
+"For example: `1@localhost:9092,2@localhost:9093,3@localhost:9094.`" +
+"If the voter endpoints are not known at startup, a non-routable 
address can be provided instead." +

Review comment:
   Perhaps we can keep this as an internal feature for now. It is not 
something that a user would be able to leverage. We can document it in the 
class javadoc perhaps. 
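
For reference, the `{id}@{host}:{port}` format described in `QUORUM_VOTERS_DOC` can be parsed along these lines. `VoterParser` is an illustrative sketch, not the actual `RaftConfig.parseVoterConnections`:

```java
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

public class VoterParser {
    // Split a comma-separated list of {id}@{host}:{port} entries into a map
    // from voter id to socket address.
    static Map<Integer, InetSocketAddress> parse(String voters) {
        Map<Integer, InetSocketAddress> result = new HashMap<>();
        for (String entry : voters.split(",")) {
            int at = entry.indexOf('@');
            int colon = entry.lastIndexOf(':');
            int id = Integer.parseInt(entry.substring(0, at));
            String host = entry.substring(at + 1, colon);
            int port = Integer.parseInt(entry.substring(colon + 1));
            // createUnresolved avoids a DNS lookup, which also permits the
            // non-routable 0.0.0.0:0 sentinel discussed in this thread
            result.put(id, InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, InetSocketAddress> m =
            parse("1@localhost:9092,2@localhost:9093,3@localhost:9094");
        System.out.println(m.get(2).getPort()); // 9093
    }
}
```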









[GitHub] [kafka] hachikuji commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-27 Thread GitBox


hachikuji commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r565643848



##
File path: raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java
##
@@ -25,20 +25,25 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 public class MockNetworkChannel implements NetworkChannel {
 private final AtomicInteger correlationIdCounter;
+private final Map<Integer, InetSocketAddress> addressCache;

Review comment:
   On second thought, it seems worth keeping this as a set. It helps us 
ensure that no requests are sent to non-voters. I would just change the field 
name to `Set<Integer> voterIds`.









[jira] [Commented] (KAFKA-12169) Consumer cannot know about partition changes when the client leader restarts with the static membership protocol

2021-01-27 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17273158#comment-17273158
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12169:


hey [~boyang] any thoughts?

> Consumer cannot know about partition changes when the client leader restarts 
> with the static membership protocol
> 
>
> Key: KAFKA-12169
> URL: https://issues.apache.org/jira/browse/KAFKA-12169
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.5.1, 2.6.1
>Reporter: zou shengfu
>Priority: Major
>
> Background: 
>  Kafka consumer services run with static membership and cooperative rebalance 
> protocol on kubernetes, and services often restart because of operation. When 
> we added partitions from 1000 to 2000 for the topic while the client leader 
> restarted with an unknown member id, we found the consumers did not trigger a 
> rebalance and still consumed 1000 partitions
>  





[GitHub] [kafka] skaundinya15 commented on a change in pull request #9589: KAFKA-10710 - Mirror Maker 2 - Create herders only if source->target.enabled=true

2021-01-27 Thread GitBox


skaundinya15 commented on a change in pull request #9589:
URL: https://github.com/apache/kafka/pull/9589#discussion_r565656131



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
##
@@ -89,11 +89,25 @@ public MirrorMakerConfig(Map<String, String> props) {
 public List<SourceAndTarget> clusterPairs() {
 List<SourceAndTarget> pairs = new ArrayList<>();
 Set<String> clusters = clusters();
+Map<String, String> originalStrings = originalsStrings();
+boolean globalHeartbeatsEnabled = 
MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED_DEFAULT;
+if 
(originalStrings.containsKey(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+globalHeartbeatsEnabled = 
Boolean.valueOf(originalStrings.get(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+}
+
 for (String source : clusters) {
 for (String target : clusters) {
-SourceAndTarget sourceAndTarget = new SourceAndTarget(source, 
target);
 if (!source.equals(target)) {
-pairs.add(sourceAndTarget);
+String clusterPairConfigPrefix = source + "->" + target + 
".";
+boolean clusterPairEnabled = 
Boolean.valueOf(originalStrings.getOrDefault(clusterPairConfigPrefix + 
"enabled", "false"));
+boolean clusterPairHeartbeatsEnabled = 
globalHeartbeatsEnabled;
+if (originalStrings.containsKey(clusterPairConfigPrefix + 
MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+clusterPairHeartbeatsEnabled = 
Boolean.valueOf(originalStrings.get(clusterPairConfigPrefix + 
MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+}
+
+if (clusterPairEnabled || clusterPairHeartbeatsEnabled) {

Review comment:
   Thanks for the explanation @twobeeb, this makes sense. It would be good 
to add some comments explaining this in the code as this isn't immediately 
obvious. Other than that it looks good to me overall.
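
The precedence being reviewed here, where a per-pair `source->target.` prefix overrides the global setting, which in turn falls back to a default, can be sketched as follows. `heartbeatsEnabled` is a hypothetical helper; the property names mirror the diff:

```java
import java.util.Map;

public class PairConfig {
    // Resolve emit.heartbeats.enabled for one cluster pair: the per-pair
    // "source->target."-prefixed value wins over the global value, which in
    // turn wins over the hard-coded default.
    static boolean heartbeatsEnabled(Map<String, String> props,
                                     String source, String target,
                                     boolean defaultValue) {
        String global = props.get("emit.heartbeats.enabled");
        boolean enabled = global != null ? Boolean.parseBoolean(global) : defaultValue;
        String perPair = props.get(source + "->" + target + ".emit.heartbeats.enabled");
        return perPair != null ? Boolean.parseBoolean(perPair) : enabled;
    }

    public static void main(String[] args) {
        Map<String, String> props = Map.of(
            "emit.heartbeats.enabled", "false",
            "A->B.emit.heartbeats.enabled", "true");
        System.out.println(heartbeatsEnabled(props, "A", "B", true)); // true
        System.out.println(heartbeatsEnabled(props, "B", "A", true)); // false
    }
}
```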









[GitHub] [kafka] vvcephei commented on pull request #9840: KAFKA-10867: Improved task idling

2021-01-27 Thread GitBox


vvcephei commented on pull request #9840:
URL: https://github.com/apache/kafka/pull/9840#issuecomment-768599882


   There was a merge conflict with trunk. Rebased and pushed.







[GitHub] [kafka] mumrah opened a new pull request #9986: JUnit extensions for integration tests

2021-01-27 Thread GitBox


mumrah opened a new pull request #9986:
URL: https://github.com/apache/kafka/pull/9986


   TBD
   







[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-27 Thread GitBox


aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r565672756



##
File path: raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java
##
@@ -25,20 +25,25 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 public class MockNetworkChannel implements NetworkChannel {
 private final AtomicInteger correlationIdCounter;
+private final Map<Integer, InetSocketAddress> addressCache;

Review comment:
   Ack. I considered it. Figured it might be useful to have the endpoints 
for any future tests. Looks like the MockNetworkChannel doesn't test anything 
endpoint specific. Will remove









[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-27 Thread GitBox


aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r565673305



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -208,8 +209,9 @@ public KafkaRaftClient(
 int fetchMaxWaitMs,
 OptionalInt nodeId,
 LogContext logContext,
-Random random
-) {
+Random random,
+RaftConfig raftConfig
+) throws IOException {

Review comment:
   Yea. This is an artifact from the `quorumState.initialize` change. Since 
that's moved down to the `client.initialize`, we can remove it.









[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-27 Thread GitBox


aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r565673491



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -36,7 +36,9 @@
 public static final String QUORUM_VOTERS_CONFIG = QUORUM_PREFIX + "voters";
 public static final String QUORUM_VOTERS_DOC = "Map of id/endpoint 
information for " +
 "the set of voters in a comma-separated list of `{id}@{host}:{port}` 
entries. " +
-"For example: `1@localhost:9092,2@localhost:9093,3@localhost:9094`";
+"For example: `1@localhost:9092,2@localhost:9093,3@localhost:9094.`" +
+"If the voter endpoints are not known at startup, a non-routable 
address can be provided instead." +

Review comment:
   Fair enough. I can move it there.









[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-27 Thread GitBox


aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r565673883



##
File path: core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
##
@@ -34,6 +34,7 @@ import scala.collection.mutable
 
 object KafkaNetworkChannel {
 
+  val nonRoutableAddress = new InetSocketAddress("0.0.0.0", 0)

Review comment:
   Yea, good catch. The AddressSpec makes sense.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante opened a new pull request #9987: KAFKA-10895: Gracefully handle invalid JAAS configs

2021-01-27 Thread GitBox


C0urante opened a new pull request #9987:
URL: https://github.com/apache/kafka/pull/9987


   Follow-up to https://github.com/apache/kafka/pull/9806
   
   If an invalid JAAS config is present on the worker, invoking 
`Configuration::getConfiguration` throws an exception. The changes from #9806 
cause that exception to be thrown during plugin scanning, which causes the 
worker to fail even if it is not configured to use the basic auth extension at 
all.
   
   This follow-up handles invalid JAAS configurations more gracefully, and only 
throws them if the worker is actually configured to use the basic auth 
extension, at the time that the extension is instantiated and configured.
   
   Two unit tests are added to test the green-path and red-path behavior of the 
extension when it encounters well-formed and ill-formed JAAS configurations, 
respectively.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-27 Thread GitBox


junrao commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r564872645



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating 
point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot 
be changed.
+ * We handle divergences between the current state and historical state by 
copying a
+ * reference to elements that have been deleted or overwritten into the 
snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two 
tiers: the
+ * current tier, and the tier associated with the snapshot from epoch E.  This 
design
+ * makes snapshot reads a little faster and simpler, at the cost of requiring 
us to copy
+ * references into multiple snapshot tiers sometimes when altering the current 
state.
+ * In general, we don't expect there to be many snapshots at any given point 
in time,
+ * though.  We expect to use about 2 snapshots at most.
+ *
+ * The current tier's data is stored in the fields inherited from 
BaseHashTable.  It
+ * would be conceptually simpler to have a separate BaseHashTable object, but 
since Java
+ * doesn't have value types, subclassing is the only way to avoid another 
pointer
+ * indirection and the associated extra memory cost.
+ *
+ * In contrast, the data for snapshot tiers is stored in the Snapshot object 
itself.
+ * We access it by looking up our object reference in the Snapshot's 
IdentityHashMap.
+ * This design ensures that we can remove snapshots in O(1) time, simply by 
deleting the
+ * Snapshot object from the SnapshotRegistry.
+ *
+ * As mentioned before, an element only exists in a snapshot tier if the 
element was
+ * overwritten or removed from a later tier.  If there are no changes between 
then and
+ * now, there is no data at all stored for the tier.  We don't even store a 
hash table
+ * object for a tier unless there is at least one change between then and now.
+ *
+ * The class hierarchy looks like this:
+ *
+ *Revertable   BaseHashTable
+ *  ↑  ↑
+ *   SnapshottableHashTable → SnapshotRegistry → Snapshot
+ *   ↑ ↑
+ *   TimelineHashSet   TimelineHashMap
+ *
+ * BaseHashTable is a simple hash table that uses separate chaining.  The 
interface is
+ * pretty bare-bones since this class is not intended to be used directly by 
end-users.
+ *
+ * This class, SnapshottableHashTable, has the logic for snapshotting and 
iterating over
+ * snapshots.  This is the core of the snapshotted hash table code and handles 
the
+ * tiering.
+ *
+ * TimelineHashSet and TimelineHashMap are mostly wrappers around this
+ * SnapshottableHashTable class.  They implement standard Java APIs for Set 
and Map,
+ * respectively.  There's a fair amount of boilerplate for this, but it's 
necessary so
+ * that timeline data structures can be used while writing idiomatic Java code.
+ * The accessor APIs have two versions -- one that looks at the current state, 
and one
+ * that looks at a historical snapshotted state.  Mutation APIs only ever 
mutate thte

Review comment:
   typo thte

##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by ap

[GitHub] [kafka] gharris1727 commented on a change in pull request #9987: KAFKA-10895: Gracefully handle invalid JAAS configs

2021-01-27 Thread GitBox


gharris1727 commented on a change in pull request #9987:
URL: https://github.com/apache/kafka/pull/9987#discussion_r565694653



##
File path: 
connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java
##
@@ -80,7 +95,8 @@ public void close() throws IOException {
 
 @Override
 public void configure(Map<String, ?> configs) {
-
+// If we failed to retrieve a JAAS configuration during startup, throw 
that exception now
+CONFIGURATION.get();

Review comment:
   Could you add a test which confirms that we're propagating the exception 
here? At the moment, the test verifies that the wrapping method works, but 
doesn't verify that it's used by the rest extension during the loading phase.

##
File path: 
connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtensionTest.java
##
@@ -63,4 +69,25 @@ public void testJaasConfigurationNotOverwritten() {
 assertNotEquals(overwrittenConfiguration, 
jaasFilter.getValue().configuration,
 "Overwritten JAAS configuration should not be used by basic auth 
REST extension");
 }
+
+@Test
+public void testBadJaasConfiguration() {
+SecurityException jaasConfigurationException = new 
SecurityException(new IOException("Bad JAAS config is bad"));
+Supplier<Configuration> configuration = 
BasicAuthSecurityRestExtension.initializeConfiguration(() -> {
+throw jaasConfigurationException;
+});
+
+ConnectException thrownException = 
assertThrows(ConnectException.class, configuration::get);
+assertEquals(jaasConfigurationException, thrownException.getCause());
+}
+
+@Test
+public void testGoodJaasConfiguration() {
+Configuration mockConfiguration = EasyMock.mock(Configuration.class);

Review comment:
   The identity function could pass this test, but wouldn't have the 
behavior we need in the BasicAuthSecurityRestExtension. I wonder if there's a 
way to confirm that the mockConfiguration has been evaluated prior to calling 
`get()` on the returned supplier.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
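The fix discussed in this thread — evaluating `Configuration::getConfiguration` once at startup and deferring any failure until the extension is actually configured — boils down to a memoizing supplier that stores either a value or an exception. A minimal, self-contained sketch of that idea (class and method names here are illustrative, not the actual Connect code):

```java
import java.util.function.Supplier;

// Sketch: wrap a supplier so that a failure raised eagerly at startup is
// remembered and rethrown lazily on first use, instead of failing the worker.
public final class DeferredConfigSketch {
    public static <T> Supplier<T> initializeConfiguration(Supplier<T> source) {
        T value = null;
        RuntimeException error = null;
        try {
            value = source.get();          // evaluated exactly once, eagerly
        } catch (RuntimeException e) {
            error = new RuntimeException("Bad config detected at startup", e);
        }
        final T v = value;
        final RuntimeException err = error;
        return () -> {
            if (err != null) {
                throw err;                 // surfaces only if the extension is used
            }
            return v;
        };
    }
}
```

Because the source supplier is evaluated eagerly, a well-formed configuration is read exactly once at startup, while a malformed one only surfaces if `get()` is ever called — matching the intent of only failing workers that actually use the basic auth extension.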




[GitHub] [kafka] twobeeb commented on a change in pull request #9589: KAFKA-10710 - Mirror Maker 2 - Create herders only if source->target.enabled=true

2021-01-27 Thread GitBox


twobeeb commented on a change in pull request #9589:
URL: https://github.com/apache/kafka/pull/9589#discussion_r565704218



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
##
@@ -89,11 +89,25 @@ public MirrorMakerConfig(Map<String, String> props) {
 public List<SourceAndTarget> clusterPairs() {
 List<SourceAndTarget> pairs = new ArrayList<>();
 Set<String> clusters = clusters();
+Map<String, String> originalStrings = originalsStrings();
+boolean globalHeartbeatsEnabled = 
MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED_DEFAULT;
+if 
(originalStrings.containsKey(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+globalHeartbeatsEnabled = 
Boolean.valueOf(originalStrings.get(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+}
+
 for (String source : clusters) {
 for (String target : clusters) {
-SourceAndTarget sourceAndTarget = new SourceAndTarget(source, 
target);
 if (!source.equals(target)) {
-pairs.add(sourceAndTarget);
+String clusterPairConfigPrefix = source + "->" + target + 
".";
+boolean clusterPairEnabled = 
Boolean.valueOf(originalStrings.getOrDefault(clusterPairConfigPrefix + 
"enabled", "false"));
+boolean clusterPairHeartbeatsEnabled = 
globalHeartbeatsEnabled;
+if (originalStrings.containsKey(clusterPairConfigPrefix + 
MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+clusterPairHeartbeatsEnabled = 
Boolean.valueOf(originalStrings.get(clusterPairConfigPrefix + 
MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+}
+
+if (clusterPairEnabled || clusterPairHeartbeatsEnabled) {

Review comment:
   @skaundinya15 Thanks for your review.
   I'm having a hard time phrasing this properly, suggestions would be welcome.
   Is this proposed comment aligned with what you had in mind?
   ```suggestion
   // By default, all source->target Herder combinations 
are created even if `x->y.enabled=false`
   // Unless `emit.heartbeats.enabled=false` or 
`x->y.emit.heartbeats.enabled=false`
   // Reason for this behavior: for a given replication 
flow A->B with heartbeats, 2 herders are required :
   // B->A for the MirrorHeartbeatConnector (emits 
heartbeats into A)
   // A->B for the MirrorSourceConnector (actual 
replication flow)
   if (clusterPairEnabled || clusterPairHeartbeatsEnabled) {
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] twobeeb commented on a change in pull request #9589: KAFKA-10710 - Mirror Maker 2 - Create herders only if source->target.enabled=true

2021-01-27 Thread GitBox


twobeeb commented on a change in pull request #9589:
URL: https://github.com/apache/kafka/pull/9589#discussion_r565704218



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
##
@@ -89,11 +89,25 @@ public MirrorMakerConfig(Map<String, String> props) {
 public List<SourceAndTarget> clusterPairs() {
 List<SourceAndTarget> pairs = new ArrayList<>();
 Set<String> clusters = clusters();
+Map<String, String> originalStrings = originalsStrings();
+boolean globalHeartbeatsEnabled = 
MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED_DEFAULT;
+if 
(originalStrings.containsKey(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+globalHeartbeatsEnabled = 
Boolean.valueOf(originalStrings.get(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+}
+
 for (String source : clusters) {
 for (String target : clusters) {
-SourceAndTarget sourceAndTarget = new SourceAndTarget(source, 
target);
 if (!source.equals(target)) {
-pairs.add(sourceAndTarget);
+String clusterPairConfigPrefix = source + "->" + target + 
".";
+boolean clusterPairEnabled = 
Boolean.valueOf(originalStrings.getOrDefault(clusterPairConfigPrefix + 
"enabled", "false"));
+boolean clusterPairHeartbeatsEnabled = 
globalHeartbeatsEnabled;
+if (originalStrings.containsKey(clusterPairConfigPrefix + 
MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+clusterPairHeartbeatsEnabled = 
Boolean.valueOf(originalStrings.get(clusterPairConfigPrefix + 
MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+}
+
+if (clusterPairEnabled || clusterPairHeartbeatsEnabled) {

Review comment:
   Thanks for your review @skaundinya15.
   I'm having a hard time phrasing this properly, suggestions would be welcome.
   Is this proposed comment aligned with what you had in mind?
   ```suggestion
   // By default, all source->target Herder combinations 
are created even if `x->y.enabled=false`
   // Unless `emit.heartbeats.enabled=false` or 
`x->y.emit.heartbeats.enabled=false`
   // Reason for this behavior: for a given replication 
flow A->B with heartbeats, 2 herders are required :
   // B->A for the MirrorHeartbeatConnector (emits 
heartbeats into A)
   // A->B for the MirrorSourceConnector (actual 
replication flow)
   if (clusterPairEnabled || clusterPairHeartbeatsEnabled) {
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
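The enable/heartbeat interplay described in the suggested comment can be condensed into a single predicate: a source->target herder pair is materialized if the pair is enabled explicitly, or if heartbeats are enabled for the pair (or globally), since heartbeats for a flow A->B require the reverse B->A herder. The sketch below is a simplified illustration of that decision; the property keys follow the MM2 naming used in the diff, but the helper itself is hypothetical:

```java
import java.util.Map;

// Sketch of the herder-pair decision from the review discussion:
// create a herder for source->target when the pair is enabled, or when
// heartbeats are enabled (per-pair override wins over the global setting).
public final class ClusterPairSketch {
    static boolean pairCreatesHerder(Map<String, String> props, String source, String target,
                                     boolean defaultHeartbeats) {
        String prefix = source + "->" + target + ".";
        boolean enabled = Boolean.parseBoolean(props.getOrDefault(prefix + "enabled", "false"));
        boolean heartbeats = Boolean.parseBoolean(
            props.getOrDefault(prefix + "emit.heartbeats.enabled",
                props.getOrDefault("emit.heartbeats.enabled", Boolean.toString(defaultHeartbeats))));
        return enabled || heartbeats;
    }
}
```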




[GitHub] [kafka] hachikuji commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-27 Thread GitBox


hachikuji commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r565722046



##
File path: core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala
##
@@ -139,6 +145,25 @@ class KafkaNetworkChannelTest {
 }
   }
 
+  @Test
+  def testNonRoutableAddressUpdateRequest(): Unit = {
+val destinationId = 2
+assertThrows(classOf[IllegalArgumentException],
+  () => new InetAddressSpec(new InetSocketAddress("0.0.0.0", 0)))

Review comment:
   Can we move this to `RaftConfigTest`? It's not really part of the 
behavior of `KafkaNetworkChannel`.

##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -76,7 +87,48 @@
 private final int electionBackoffMaxMs;
 private final int fetchTimeoutMs;
 private final int appendLingerMs;
-private final Map<Integer, InetSocketAddress> voterConnections;
+private final Map<Integer, AddressSpec> voterConnections;
+
+public static abstract class AddressSpec {
+   public abstract InetSocketAddress address();

Review comment:
   Do we need this in the abstract class? I was thinking we would only be 
able to access `InetSocketAddress` if the type is `InetAddressSpec`. Otherwise 
the type protection from `AddressSpec` loses its bite.

##
File path: core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala
##
@@ -139,6 +145,25 @@ class KafkaNetworkChannelTest {
 }
   }
 
+  @Test
+  def testNonRoutableAddressUpdateRequest(): Unit = {
+val destinationId = 2
+assertThrows(classOf[IllegalArgumentException],
+  () => new InetAddressSpec(new InetSocketAddress("0.0.0.0", 0)))
+
+// Update channel with a valid endpoint

Review comment:
   Not sure there's much value in the rest of this test. Seems effectively 
the same as `testSendAndReceiveOutboundRequest`.

##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -76,7 +87,48 @@
 private final int electionBackoffMaxMs;
 private final int fetchTimeoutMs;
 private final int appendLingerMs;
-private final Map<Integer, InetSocketAddress> voterConnections;
+private final Map<Integer, AddressSpec> voterConnections;
+
+public static abstract class AddressSpec {
+   public abstract InetSocketAddress address();
+
+@Override
+public boolean equals(Object obj) {
+if (this == obj) {
+return true;
+}
+
+if (obj == null || getClass() != obj.getClass()) {
+return false;
+}
+
+final AddressSpec that = (AddressSpec) obj;
+return that.address().equals(address());
+}
+}
+
+public static class InetAddressSpec extends AddressSpec {
+private final InetSocketAddress address;
+
+public InetAddressSpec(InetSocketAddress address) {
+if (address.equals(UNROUTABLE_ADDRESS)) {
+throw new IllegalArgumentException("Address not routable");
+}
+this.address = address;
+}
+
+@Override
+public InetSocketAddress address() {
+return address;
+}
+}
+
+public static class UnknownAddressSpec extends AddressSpec {

Review comment:
   A common pattern for classes like this without any state is to create a 
static instance.
   ```java
 public static final UnknownAddressSpec INSTANCE = new UnknownAddressSpec();
   ```

##
File path: core/src/main/scala/kafka/raft/RaftManager.scala
##
@@ -118,9 +119,20 @@ class KafkaRaftManager[T](
   private val raftIoThread = new RaftIoThread(raftClient)
 
   def startup(): Unit = {
+// Update the voter endpoints (if valid) with what's in RaftConfig
+val voterAddresses: util.Map[Integer, AddressSpec] = 
raftConfig.quorumVoterConnections
+for (voterAddressEntry <- voterAddresses.entrySet.asScala) {
+  voterAddressEntry.getValue match {
+case spec: InetAddressSpec => {
+  netChannel.updateEndpoint(voterAddressEntry.getKey, spec)
+}
+case invalid: AddressSpec => {
+  logger.warn(s"Skipping channel update for destination ID: 
${voterAddressEntry.getKey} " +

Review comment:
   This could be `info` I think in the case of `UnknownAddressSpec`. It is 
expected behavior to skip the update. We could add a third case for unexpected 
`AddressSpec` types.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
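The pattern under review — a closed `AddressSpec` hierarchy with a routable variant, a stateless placeholder variant exposed as a static instance, and a startup loop that only updates the channel for routable specs — can be sketched roughly as follows. Names mirror the review discussion, but the details are an illustration, not the final Kafka code:

```java
import java.net.InetSocketAddress;

// Illustrative sketch of the AddressSpec hierarchy from the review:
// only InetAddressSpec carries a socket address, so callers must match on
// the concrete type before attempting to dial out.
public final class AddressSpecSketch {
    static final InetSocketAddress UNROUTABLE_ADDRESS = new InetSocketAddress("0.0.0.0", 0);

    public abstract static class AddressSpec { }

    public static final class InetAddressSpec extends AddressSpec {
        final InetSocketAddress address;

        public InetAddressSpec(InetSocketAddress address) {
            if (address.equals(UNROUTABLE_ADDRESS)) {
                throw new IllegalArgumentException("Address not routable");
            }
            this.address = address;
        }
    }

    public static final class UnknownAddressSpec extends AddressSpec {
        // Stateless, so a single shared instance suffices.
        public static final UnknownAddressSpec INSTANCE = new UnknownAddressSpec();
        private UnknownAddressSpec() { }
    }

    // Mirrors the startup loop in RaftManager: only routable specs update the channel.
    static boolean shouldUpdateEndpoint(AddressSpec spec) {
        return spec instanceof InetAddressSpec;
    }
}
```

The type-level split keeps the "unknown endpoint" case from ever reaching the network layer, which is the protection the reviewer asks to preserve by not exposing `address()` on the abstract class.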




[jira] [Commented] (KAFKA-10847) Avoid spurious left/outer join results in stream-stream join

2021-01-27 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17273206#comment-17273206
 ] 

Matthias J. Sax commented on KAFKA-10847:
-

Thanks [~spena] – overall, using a second store might be the simplest solution, and if we can get some perf results we can make a better decision on whether the performance is acceptable.

The only thing I tend to object to is the usage of _wall-clock_ time punctuation, because it would introduce non-determinism. And if we use stream-time punctuations, we could even avoid punctuations altogether and "piggy-back" emitting left/outer join results each time we process an input record.

Also take into account the grace period, i.e., we should only emit left/outer join results after a window closes (not when a window ends): window close = window end + grace period.

> Avoid spurious left/outer join results in stream-stream join 
> -
>
> Key: KAFKA-10847
> URL: https://issues.apache.org/jira/browse/KAFKA-10847
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Sergio Peña
>Priority: Major
>
> KafkaStreams follows an eager execution model, i.e., it never buffers input 
> records but processes them right away. For left/outer stream-stream join, 
> this implies that a left/outer join result might be emitted before the window 
> end (or window close) time is reached. Thus, a record that will be part of an 
> inner-join result might first produce an eager (and spurious) left/outer join 
> result.
> We should change the implementation of the join to not emit eager left/outer 
> join results, but instead delay their emission until after the window grace 
> period has passed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
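The emission rule sketched in the comment above — only produce a left/outer result once the window has closed, i.e. stream time has passed window end plus grace period — reduces to a simple predicate. A minimal illustration (the helper name is hypothetical, not the Streams implementation):

```java
// Sketch: a record may safely be emitted as a left/outer join result only
// once stream time passes the window close, where
//   window close = window end + grace period.
public final class WindowCloseSketch {
    static boolean windowClosed(long windowEndMs, long graceMs, long streamTimeMs) {
        return streamTimeMs > windowEndMs + graceMs;
    }
}
```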


[GitHub] [kafka] ableegoldman commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565720574



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long 
cacheSizePerThread, fin
  * no stream threads are alive
  */
 public Optional<String> removeStreamThread() {
+return removeStreamThread(Long.MAX_VALUE);
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this 
Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not 
specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches 
in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream 
threads equals the total
+ * cache size specified in configuration {@link 
StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The length of time to wait for the thread to shut down
+ * @throws TimeoutException if the thread does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread 
could not be removed because
+ * no stream threads are alive
+ */
+public Optional<String> removeStreamThread(final Duration timeout) throws 
TimeoutException {

Review comment:
   We generally don't explicitly make this part of the API, and just inform 
users through the javadocs as you've done
   ```suggestion
    public Optional<String> removeStreamThread(final Duration timeout) {
   ```

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long 
cacheSizePerThread, fin
  * no stream threads are alive
  */
 public Optional<String> removeStreamThread() {
+return removeStreamThread(Long.MAX_VALUE);
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this 
Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not 
specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches 
in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream 
threads equals the total
+ * cache size specified in configuration {@link 
StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The length of time to wait for the thread to shut down
+ * @throws TimeoutException if the thread does not stop in time

Review comment:
   ```suggestion
* @throws org.apache.kafka.common.errors.TimeoutException if the thread 
does not stop in time
   ```

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -88,9 +91,11 @@
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.errors.TimeoutException;

Review comment:
   nit: move the import to the other `o.a.k.*` imports

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1005,11 +1036,28 @@ private StreamThread createAndAddStreamThread(final 
long cacheSizePerThread, fin
 || threads.size() == 1)) {
 streamThread.shutdown();
 if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-
streamThread.waitOnThreadState(StreamThread.State.DEAD);
+if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + 
" did not stop in the allotted time");
+throw new TimeoutException("Thread " + 
streamThread.getName() + " did not stop in the allotted time");
+}
 }
 threads.remove(streamThread);
 final long cacheSizePerThread = 
getCacheSizePerThread(threads.size());
 resizeThreadCache(cacheSizePerThread);
+if (streamThread.getGroupInstanceID().isPresent()) {
+final MemberToRemove memberToRemove = new 
MemberToRemove(streamThread.getGroupInstanceID().get());
+final Collection<MemberToRemove> membersToRemove = 
Collections.singletonList(memberToRemove);
+final RemoveMembersFromConsumerGroupResult 
removeMembersFromConsumerGrou

[GitHub] [kafka] satishd commented on a change in pull request #9980: MINOR: Reduce size of the ProducerStateEntry batchMetadata queue.

2021-01-27 Thread GitBox


satishd commented on a change in pull request #9980:
URL: https://github.com/apache/kafka/pull/9980#discussion_r565730154



##
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##
@@ -63,7 +63,7 @@ private[log] object ProducerStateEntry {
   private[log] val NumBatchesToRetain = 5
 
   def empty(producerId: Long) = new ProducerStateEntry(producerId,
-batchMetadata = mutable.Queue[BatchMetadata](),
+batchMetadata = new mutable.Queue[BatchMetadata](5),

Review comment:
   minor: you may want to have it as `new 
mutable.Queue[BatchMetadata](NumBatchesToRetain)`  instead of harcoding 
directly. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
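The review point above — pre-sizing the queue with the `NumBatchesToRetain` constant instead of a hard-coded 5 — matters because the structure only ever retains that many batch entries. A hedged Java analogue of this bounded-retention pattern (the class and fields are illustrative, not the Scala `ProducerStateEntry`):

```java
import java.util.ArrayDeque;

// Sketch: retain only the last N batch-metadata entries, pre-sizing the
// deque to the retention constant (mirrors NumBatchesToRetain = 5).
public final class BoundedBatchesSketch {
    static final int NUM_BATCHES_TO_RETAIN = 5;

    private final ArrayDeque<Long> batchMetadata = new ArrayDeque<>(NUM_BATCHES_TO_RETAIN);

    void addBatch(long lastOffset) {
        if (batchMetadata.size() == NUM_BATCHES_TO_RETAIN) {
            batchMetadata.removeFirst();   // evict the oldest batch
        }
        batchMetadata.addLast(lastOffset);
    }

    int size() {
        return batchMetadata.size();
    }
}
```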




[GitHub] [kafka] vvcephei commented on pull request #9836: KAFKA-10866: Add metadata to ConsumerRecords

2021-01-27 Thread GitBox


vvcephei commented on pull request #9836:
URL: https://github.com/apache/kafka/pull/9836#issuecomment-768664736


   Flaky test failures:
   
   ```
   Build / JDK 11 / 
org.apache.kafka.clients.consumer.internals.FetcherTest.testEarlierOffsetResetArrivesLate()
   Build / JDK 11 / 
org.apache.kafka.clients.producer.KafkaProducerTest.testHeadersWithExtendedClasses()
   Build / JDK 15 / 
kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete()
   ```
   
   The most concerning one is the FetcherTest, but it's also failing on trunk.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei merged pull request #9836: KAFKA-10866: Add metadata to ConsumerRecords

2021-01-27 Thread GitBox


vvcephei merged pull request #9836:
URL: https://github.com/apache/kafka/pull/9836


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565732778



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long 
cacheSizePerThread, fin
  * no stream threads are alive
  */
 public Optional<String> removeStreamThread() {
+return removeStreamThread(Long.MAX_VALUE);
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this 
Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not 
specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches 
in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream 
threads equals the total
+ * cache size specified in configuration {@link 
StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The length of time to wait for the thread to shut down
+ * @throws TimeoutException if the thread does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread 
could not be removed because
+ * no stream threads are alive
+ */
+public Optional<String> removeStreamThread(final Duration timeout) throws 
TimeoutException {
+final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, 
"timeout");
+final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
+return removeStreamThread(timeoutMs);
+}
+
+private Optional<String> removeStreamThread(final long timeoutMs) throws 
TimeoutException {
+final long begin = time.milliseconds();
 if (isRunningOrRebalancing()) {
 synchronized (changeThreadCount) {
 // make a copy of threads to avoid holding lock
 for (final StreamThread streamThread : new 
ArrayList<>(threads)) {
 if (streamThread.isAlive() && 
(!streamThread.getName().equals(Thread.currentThread().getName())
 || threads.size() == 1)) {
+final Optional<String> groupInstanceID = 
streamThread.getGroupInstanceID();
 streamThread.shutdown();
 if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-
streamThread.waitOnThreadState(StreamThread.State.DEAD);
+if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + 
" did not stop in the allotted time");
+throw new TimeoutException("Thread " + 
streamThread.getName() + " did not stop in the allotted time");
+}
 }
 threads.remove(streamThread);
 final long cacheSizePerThread = 
getCacheSizePerThread(threads.size());
 resizeThreadCache(cacheSizePerThread);
+if (groupInstanceID.isPresent()) {

Review comment:
   Ok, so we just do something like `if (groupInstanceID.isPresent() && 
!streamThread.getName().equals(Thread.currentThread().getName()))` when deciding 
whether to remove it from the group?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
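One subtlety in the `removeStreamThread(timeout)` discussion is that a single user-supplied timeout has to cover two sequential blocking steps (waiting for the thread to reach DEAD, then waiting on the admin client's removal future), so each step should receive the remaining budget rather than the full timeout. A hedged sketch of that bookkeeping (names are illustrative, not the KafkaStreams code):

```java
// Sketch: deadline bookkeeping when one user-supplied timeout spans
// several sequential blocking calls (thread shutdown, then group removal).
public final class DeadlineSketch {
    static long deadlineMs(long startMs, long timeoutMs) {
        // Guard against overflow for Long.MAX_VALUE-style "no timeout" values.
        long deadline = startMs + timeoutMs;
        return deadline < 0 ? Long.MAX_VALUE : deadline;
    }

    static long remainingMs(long deadlineMs, long nowMs) {
        // Never hand a negative wait to a blocking call; 0 means "already expired".
        return Math.max(0, deadlineMs - nowMs);
    }
}
```

Computing the deadline once and passing `remainingMs` to each blocking call avoids the trap of letting every step consume the full timeout on its own.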




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565734101



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1005,11 +1036,28 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
     || threads.size() == 1)) {
     streamThread.shutdown();
     if (!streamThread.getName().equals(Thread.currentThread().getName())) {
-        streamThread.waitOnThreadState(StreamThread.State.DEAD);
+        if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+            log.warn("Thread " + streamThread.getName() + " did not stop in the allotted time");
+            throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time");
+        }
     }
     threads.remove(streamThread);
     final long cacheSizePerThread = getCacheSizePerThread(threads.size());
     resizeThreadCache(cacheSizePerThread);
+    if (streamThread.getGroupInstanceID().isPresent()) {
+        final MemberToRemove memberToRemove = new MemberToRemove(streamThread.getGroupInstanceID().get());
+        final Collection<MemberToRemove> membersToRemove = Collections.singletonList(memberToRemove);
+        final RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroupResult =
+            adminClient.removeMembersFromConsumerGroup(config.getString(StreamsConfig.APPLICATION_ID_CONFIG),
+                new RemoveMembersFromConsumerGroupOptions(membersToRemove));
+        try {
+            removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(timeoutMs - begin, TimeUnit.MILLISECONDS);
+        } catch (final java.util.concurrent.TimeoutException e) {

Review comment:
   We should. And I think maybe we should log the original stack trace
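A sketch of what logging and preserving the original stack trace could look like here: catch the JDK's `java.util.concurrent.TimeoutException` and rethrow a domain timeout with the original attached as the cause. `StreamsTimeoutException` and `awaitRemoval` are stand-ins invented for this sketch so it compiles without Kafka on the classpath; the real code would use `org.apache.kafka.common.errors.TimeoutException`.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class TimeoutTranslation {

    // Stand-in for org.apache.kafka.common.errors.TimeoutException.
    public static class StreamsTimeoutException extends RuntimeException {
        public StreamsTimeoutException(final String message, final Throwable cause) {
            super(message, cause); // the cause keeps the original stack trace
        }
    }

    // Wait on the member-removal future, translating the JDK timeout into the
    // domain exception without discarding where the wait actually timed out.
    public static void awaitRemoval(final Future<?> memberResult, final long remainingMs) {
        try {
            memberResult.get(remainingMs, TimeUnit.MILLISECONDS);
        } catch (final java.util.concurrent.TimeoutException e) {
            // wrap rather than discard, so logs show the full chain
            throw new StreamsTimeoutException("Could not remove member in time", e);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (final ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
    }
}
```

Logging frameworks that accept a `Throwable` argument (e.g. `log.warn(msg, e)`) would then print the wrapped trace in full.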









[GitHub] [kafka] ableegoldman commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565731661



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -319,9 +319,9 @@ private void prepareStreamThread(final StreamThread thread, final boolean termin
 StreamThread.State.PARTITIONS_ASSIGNED);
 return null;
 }).anyTimes();
+EasyMock.expect(thread.getGroupInstanceID()).andReturn(Optional.empty()).anyTimes();

Review comment:
   ```suggestion
   EasyMock.expect(thread.getGroupInstanceID()).andStubReturn(Optional.empty());
   ```

##
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -180,6 +182,19 @@ public void shouldRemoveStreamThread() throws Exception {
 }
 }
 
+@Test
+public void shouldnNotRemoveStreamThreadWithTimeout() throws Exception {
+    try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+        addStreamStateChangeListener(kafkaStreams);
+        startStreamsAndWaitForRunning(kafkaStreams);
+
+        final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+        stateTransitionHistory.clear();
+        assertThrows(TimeoutException.class, () -> kafkaStreams.removeStreamThread(Duration.ZERO.minus(DEFAULT_DURATION)));

Review comment:
   It's a bit weird to test this by passing in a negative timeout but I 
don't have any good ideas for forcing it to exceed the timeout 😕 

##
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -180,6 +182,19 @@ public void shouldRemoveStreamThread() throws Exception {
 }
 }
 
+@Test
+public void shouldnNotRemoveStreamThreadWithTimeout() throws Exception {

Review comment:
   ```suggestion
   public void shouldNotRemoveStreamThreadWithinTimeout() throws Exception {
   ```

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
  * no stream threads are alive
  */
 public Optional<String> removeStreamThread() {
+    return removeStreamThread(Long.MAX_VALUE);
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+ * <p>
+ * The removed stream thread is gracefully shut down. This method does not specify which stream
+ * thread is shut down.
+ * <p>
+ * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+ * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The length of time to wait for the thread to shut down
+ * @throws TimeoutException if the thread does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread could not be removed because
+ *         no stream threads are alive
+ */
+public Optional<String> removeStreamThread(final Duration timeout) throws TimeoutException {
+    final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout");
+    final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
+    return removeStreamThread(timeoutMs);
+}
+
+private Optional<String> removeStreamThread(final long timeoutMs) throws TimeoutException {
+    final long begin = time.milliseconds();
     if (isRunningOrRebalancing()) {
         synchronized (changeThreadCount) {
             // make a copy of threads to avoid holding lock
             for (final StreamThread streamThread : new ArrayList<>(threads)) {
                 if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName())
                     || threads.size() == 1)) {
+                    final Optional<String> groupInstanceID = streamThread.getGroupInstanceID();
                     streamThread.shutdown();
                     if (!streamThread.getName().equals(Thread.currentThread().getName())) {
-                        streamThread.waitOnThreadState(StreamThread.State.DEAD);
+                        if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+                            log.warn("Thread " + streamThread.getName() + " did not stop in the allotted time");
+                            throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time");
Review comment:
   Hm actually now that I think about it, we should probably continue with 
the cleanup to leave the app

[GitHub] [kafka] ableegoldman commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565740953



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
  * no stream threads are alive
  */
 public Optional<String> removeStreamThread() {
+    return removeStreamThread(Long.MAX_VALUE);
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+ * <p>
+ * The removed stream thread is gracefully shut down. This method does not specify which stream
+ * thread is shut down.
+ * <p>
+ * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+ * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The length of time to wait for the thread to shut down
+ * @throws TimeoutException if the thread does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread could not be removed because
+ *         no stream threads are alive
+ */
+public Optional<String> removeStreamThread(final Duration timeout) throws TimeoutException {
+    final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout");
+    final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
+    return removeStreamThread(timeoutMs);
+}
+
+private Optional<String> removeStreamThread(final long timeoutMs) throws TimeoutException {
+    final long begin = time.milliseconds();
     if (isRunningOrRebalancing()) {
         synchronized (changeThreadCount) {
             // make a copy of threads to avoid holding lock
             for (final StreamThread streamThread : new ArrayList<>(threads)) {
                 if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName())
                     || threads.size() == 1)) {
+                    final Optional<String> groupInstanceID = streamThread.getGroupInstanceID();
                     streamThread.shutdown();
                     if (!streamThread.getName().equals(Thread.currentThread().getName())) {
-                        streamThread.waitOnThreadState(StreamThread.State.DEAD);
+                        if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+                            log.warn("Thread " + streamThread.getName() + " did not stop in the allotted time");
+                            throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time");
+                        }
                     }
                     threads.remove(streamThread);
                     final long cacheSizePerThread = getCacheSizePerThread(threads.size());
                     resizeThreadCache(cacheSizePerThread);
+                    if (groupInstanceID.isPresent()) {

Review comment:
   Yeah I think that makes sense here
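The guard these comments converge on can be isolated as a tiny predicate: only issue a leave-group for a removed thread when it is a static member (has a `group.instance.id`) and it is not the thread invoking the removal on itself. `shouldRemoveFromGroup` is a hypothetical helper written for this sketch, not Kafka code:

```java
import java.util.Optional;

public class RemovalGuard {
    // True only for a static member being removed by a different thread;
    // a thread removing itself, or a dynamic member, is skipped.
    public static boolean shouldRemoveFromGroup(final Optional<String> groupInstanceId,
                                                final String threadName,
                                                final String callerThreadName) {
        return groupInstanceId.isPresent() && !threadName.equals(callerThreadName);
    }
}
```

Extracting the condition like this also makes it straightforward to unit-test the three cases (static member removed by another thread, dynamic member, self-removal) without spinning up a Streams client.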









[GitHub] [kafka] ableegoldman commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565741567



##
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -180,6 +182,19 @@ public void shouldRemoveStreamThread() throws Exception {
 }

Review comment:
   One last thing, can you add a version of the `shouldRemoveStreamThread()` test that uses static membership?
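A static-membership variant of the test would mainly differ in configuration. A minimal sketch of building such properties; the literal `consumer.`-prefixed string keys are assumptions standing in for `StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG)` so the sketch stays self-contained:

```java
import java.util.Properties;

public class StaticMembershipProps {
    // Copy the base test properties and layer static-membership settings on top.
    public static Properties withStaticMembership(final Properties base, final String instanceId) {
        final Properties props = new Properties();
        props.putAll(base);
        // the "consumer." prefix routes the setting to the embedded consumers
        props.put("consumer.group.instance.id", instanceId);
        // a longer session timeout is typical with static membership, so a
        // restarted member can rejoin without triggering a rebalance
        props.put("consumer.session.timeout.ms", "30000");
        return props;
    }
}
```

The static variant of `shouldRemoveStreamThread()` would then construct its `KafkaStreams` with these properties and additionally assert that removing a thread triggers the admin-side member removal discussed above.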








