[GitHub] [kafka] chia7712 commented on a change in pull request #9778: KAFKA-10874 Fix flaky ClientQuotasRequestTest.testAlterIpQuotasRequest

2020-12-28 Thread GitBox


chia7712 commented on a change in pull request #9778:
URL: https://github.com/apache/kafka/pull/9778#discussion_r549598781



##
File path: core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
##
@@ -212,7 +214,9 @@ class ClientQuotasRequestTest extends BaseRequestTest {
   InetAddress.getByName(unknownHost)
 else
   InetAddress.getByName(entityName)
-assertEquals(expectedMatches(entity), 
servers.head.socketServer.connectionQuotas.connectionRateForIp(entityIp), 0.01)
+TestUtils.waitUntilTrue(
+  () => expectedMatches(entity) - 
servers.head.socketServer.connectionQuotas.connectionRateForIp(entityIp) < 0.01
+  ,"Broker didn't update prop from Zookeeper")

Review comment:
   Could we rename "prop" to "quotas"?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] itantiger commented on a change in pull request #9790: Some parameters will be overwritten which was configured in consumer.config.

2020-12-28 Thread GitBox


itantiger commented on a change in pull request #9790:
URL: https://github.com/apache/kafka/pull/9790#discussion_r549591650



##
File path: core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala
##
@@ -96,6 +97,28 @@ class ConsumerPerformanceTest {
 assertEquals("test", config.topic)
 assertEquals(10, config.numMessages)
   }
+  
+  @Test
+  def testConfigWithRecognizedOptionOverride(): Unit = {
+val propsFile = TestUtils.tempFile()
+val propsStream = Files.newOutputStream(propsFile.toPath)
+propsStream.write("group.id=test_group_id\n".getBytes())
+propsStream.write("client.id=test_client_id".getBytes())
+propsStream.close()

Review comment:
   Okey, I finished the local test and submitted the code :)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9790: Some parameters will be overwritten which was configured in consumer.config.

2020-12-28 Thread GitBox


chia7712 commented on a change in pull request #9790:
URL: https://github.com/apache/kafka/pull/9790#discussion_r549589413



##
File path: core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala
##
@@ -96,6 +97,28 @@ class ConsumerPerformanceTest {
 assertEquals("test", config.topic)
 assertEquals(10, config.numMessages)
   }
+  
+  @Test
+  def testConfigWithRecognizedOptionOverride(): Unit = {
+val propsFile = TestUtils.tempFile()
+val propsStream = Files.newOutputStream(propsFile.toPath)
+propsStream.write("group.id=test_group_id\n".getBytes())
+propsStream.write("client.id=test_client_id".getBytes())
+propsStream.close()

Review comment:
   Or could we add ```group.id``` to command-line arguments to make sure it 
(from file) is NOT override by command-line args?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9781: MINOR: Use top-level error in `UpdateFeaturesRequest.getErrorResponse`

2020-12-28 Thread GitBox


chia7712 commented on a change in pull request #9781:
URL: https://github.com/apache/kafka/pull/9781#discussion_r549580987



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java
##
@@ -45,16 +44,18 @@ public UpdateFeaturesResponse(UpdateFeaturesResponseData 
data) {
 this.data = data;
 }
 
-public Map errors() {
-return data.results().valuesSet().stream().collect(
-Collectors.toMap(
-result -> result.feature(),
-result -> new ApiError(Errors.forCode(result.errorCode()), 
result.errorMessage(;
+public ApiError topLevelError() {
+return new ApiError(Errors.forCode(data.errorCode()), 
data.errorMessage());
 }
 
 @Override
 public Map errorCounts() {
-return apiErrorCounts(errors());
+Map errorCounts = new HashMap<>();
+updateErrorCounts(errorCounts, Errors.forCode(data.errorCode()));
+for (UpdatableFeatureResult result : data.results().valuesSet()) {

Review comment:
   Is ```valuesSet``` necessary? 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] itantiger commented on a change in pull request #9790: Some parameters will be overwritten which was configured in consumer.config.

2020-12-28 Thread GitBox


itantiger commented on a change in pull request #9790:
URL: https://github.com/apache/kafka/pull/9790#discussion_r549583876



##
File path: core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala
##
@@ -96,6 +97,28 @@ class ConsumerPerformanceTest {
 assertEquals("test", config.topic)
 assertEquals(10, config.numMessages)
   }
+  
+  @Test
+  def testConfigWithRecognizedOptionOverride(): Unit = {
+val propsFile = TestUtils.tempFile()
+val propsStream = Files.newOutputStream(propsFile.toPath)
+propsStream.write("group.id=test_group_id\n".getBytes())
+propsStream.write("client.id=test_client_id".getBytes())
+propsStream.close()

Review comment:
   So, if config file has the param `topic=test`, It will not take effect.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] itantiger commented on a change in pull request #9790: Some parameters will be overwritten which was configured in consumer.config.

2020-12-28 Thread GitBox


itantiger commented on a change in pull request #9790:
URL: https://github.com/apache/kafka/pull/9790#discussion_r549583198



##
File path: core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala
##
@@ -96,6 +97,28 @@ class ConsumerPerformanceTest {
 assertEquals("test", config.topic)
 assertEquals(10, config.numMessages)
   }
+  
+  @Test
+  def testConfigWithRecognizedOptionOverride(): Unit = {
+val propsFile = TestUtils.tempFile()
+val propsStream = Files.newOutputStream(propsFile.toPath)
+propsStream.write("group.id=test_group_id\n".getBytes())
+propsStream.write("client.id=test_client_id".getBytes())
+propsStream.close()

Review comment:
   `--topic` is a `REQUIRED` argument in command-line, I thank Its value is 
determined by the command line. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9790: Some parameters will be overwritten which was configured in consumer.config.

2020-12-28 Thread GitBox


chia7712 commented on a change in pull request #9790:
URL: https://github.com/apache/kafka/pull/9790#discussion_r549578153



##
File path: core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala
##
@@ -96,6 +97,28 @@ class ConsumerPerformanceTest {
 assertEquals("test", config.topic)
 assertEquals(10, config.numMessages)
   }
+  
+  @Test
+  def testConfigWithRecognizedOptionOverride(): Unit = {
+val propsFile = TestUtils.tempFile()
+val propsStream = Files.newOutputStream(propsFile.toPath)
+propsStream.write("group.id=test_group_id\n".getBytes())
+propsStream.write("client.id=test_client_id".getBytes())
+propsStream.close()

Review comment:
   Could you test ```--topic``` to make sure it is not overrided by 
command-line arguments.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9789: Force the validation control request,isolate other data requests.Prot…

2020-12-28 Thread GitBox


showuon commented on pull request #9789:
URL: https://github.com/apache/kafka/pull/9789#issuecomment-751957476


   > @showuon Thank you very much for your suggestions and comments,In order to 
meet the requirement of submitting PR, do I need to submit a PR again?I'm going 
to create a new jIRA ID for this PR and add test cases.
   
   No, just update in this PR directly.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9761: KAFKA-10768 Add a test for ByteBufferInputStream to ByteBufferLogInputStreamTest

2020-12-28 Thread GitBox


chia7712 commented on a change in pull request #9761:
URL: https://github.com/apache/kafka/pull/9761#discussion_r549576728



##
File path: 
clients/src/test/java/org/apache/kafka/common/utils/ByteBufferInputStreamTest.java
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import static org.junit.Assert.assertEquals;

Review comment:
   We have a pending PR offering a consistent import order (see 
https://github.com/apache/kafka/pull/8404#discussion_r535791013). Could you 
please follow the rules (although it is not merged)?
   
   1. kafka, org.apache.kafka
   1. com, net, org
   1. java, javax





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9520: MINOR: replace test "expected" parameter by assertThrows

2020-12-28 Thread GitBox


chia7712 commented on pull request #9520:
URL: https://github.com/apache/kafka/pull/9520#issuecomment-751954761


   @ijuma Thanks for your great reviewing. I have addressed most review comment 
in latest commit.
   
   > It would be good to clearly mention the cases where the translation wasn't 
mechanism as they are the most likely sources of issues.
   
   Pardon me. I failed to catch your point.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9520: MINOR: replace test "expected" parameter by assertThrows

2020-12-28 Thread GitBox


chia7712 commented on a change in pull request #9520:
URL: https://github.com/apache/kafka/pull/9520#discussion_r549569836



##
File path: 
connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java
##
@@ -293,16 +295,13 @@ public void testEmptyStruct() {
 new Struct(emptyStructSchema);
 }
 
-@Test(expected = SchemaBuilderException.class)
+@Test
 public void testDuplicateFields() {
-final Schema schema = SchemaBuilder.struct()
-.name("testing")
-.field("id", SchemaBuilder.string().doc("").build())
-.field("id", SchemaBuilder.string().doc("").build())
-.build();
-final Struct struct = new Struct(schema)
-.put("id", "testing");
-struct.validate();
+assertThrows(SchemaBuilderException.class, () -> SchemaBuilder.struct()
+.name("testing")
+.field("id", SchemaBuilder.string().doc("").build())
+.field("id", SchemaBuilder.string().doc("").build())
+.build());

Review comment:
   ```StructTest``` has test cases for ```validate``` method.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9520: MINOR: replace test "expected" parameter by assertThrows

2020-12-28 Thread GitBox


chia7712 commented on a change in pull request #9520:
URL: https://github.com/apache/kafka/pull/9520#discussion_r549568729



##
File path: 
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
##
@@ -504,20 +510,21 @@ public void writePastLimit() {
 }
 }
 
-@Test(expected = IllegalArgumentException.class)
+@Test
 public void testAppendAtInvalidOffset() {
 ByteBuffer buffer = ByteBuffer.allocate(1024);
 buffer.position(bufferOffset);
 
 long logAppendTime = System.currentTimeMillis();
-MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, 
RecordBatch.MAGIC_VALUE_V1, compressionType,
+MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, 
RecordBatch.MAGIC_VALUE_V2, compressionType,

Review comment:
   magic 1 does not support ```ZStandard```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9520: MINOR: replace test "expected" parameter by assertThrows

2020-12-28 Thread GitBox


chia7712 commented on a change in pull request #9520:
URL: https://github.com/apache/kafka/pull/9520#discussion_r549567477



##
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
##
@@ -444,34 +444,30 @@ public void 
testAddPartitionToTransactionRetainsRetryBackoffWhenPartitionsAlread
 assertEquals(DEFAULT_RETRY_BACKOFF_MS, handler.retryBackoffMs());
 }
 
-@Test(expected = IllegalStateException.class)
+@Test
 public void testMaybeAddPartitionToTransactionBeforeInitTransactions() {
-transactionManager.failIfNotReadyForSend();
-transactionManager.maybeAddPartitionToTransaction(new 
TopicPartition("foo", 0));
+assertThrows(IllegalStateException.class, () -> 
transactionManager.failIfNotReadyForSend());
 }
 
-@Test(expected = IllegalStateException.class)
+@Test
 public void testMaybeAddPartitionToTransactionBeforeBeginTransaction() {
 doInitTransactions();
-transactionManager.failIfNotReadyForSend();
-transactionManager.maybeAddPartitionToTransaction(new 
TopicPartition("foo", 0));
+assertThrows(IllegalStateException.class, () -> 
transactionManager.failIfNotReadyForSend());
 }
 
-@Test(expected = KafkaException.class)
+@Test
 public void testMaybeAddPartitionToTransactionAfterAbortableError() {
 doInitTransactions();
 transactionManager.beginTransaction();
 transactionManager.transitionToAbortableError(new KafkaException());
-transactionManager.failIfNotReadyForSend();
-transactionManager.maybeAddPartitionToTransaction(new 
TopicPartition("foo", 0));
+assertThrows(KafkaException.class, () -> 
transactionManager.failIfNotReadyForSend());
 }
 
-@Test(expected = KafkaException.class)
+@Test
 public void testMaybeAddPartitionToTransactionAfterFatalError() {
 doInitTransactions();
 transactionManager.transitionToFatalError(new KafkaException());
-transactionManager.failIfNotReadyForSend();
-transactionManager.maybeAddPartitionToTransaction(new 
TopicPartition("foo", 0));
+assertThrows(KafkaException.class, () -> 
transactionManager.failIfNotReadyForSend());

Review comment:
   
https://github.com/apache/kafka/commit/717c55be971df862c55f55d245b9997f1d6f998c 
moved the check of state out of ```maybeAddPartitionToTransaction``` and it 
added ```failIfNotReadyForSend``` to all test cases to make them throw 
exception. I will update the test cases name to avoid confusion.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wenbingshen commented on pull request #9789: Force the validation control request,isolate other data requests.Prot…

2020-12-28 Thread GitBox


wenbingshen commented on pull request #9789:
URL: https://github.com/apache/kafka/pull/9789#issuecomment-751941992


   @showuon  Thank you very much for your suggestions and comments,In order to 
meet the requirement of submitting PR, do I need to submit a PR again?I'm going 
to create a new jIRA ID for this PR and add test cases.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #9789: Force the validation control request,isolate other data requests.Prot…

2020-12-28 Thread GitBox


showuon commented on a change in pull request #9789:
URL: https://github.com/apache/kafka/pull/9789#discussion_r549561892



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -988,60 +990,79 @@ private[kafka] class Processor(val id: Int,
 }
   }
 
+  protected def isControlRequest(header: RequestHeader): Boolean = {
+if (isControlPlane) {
+  header.apiKey() match {
+case ApiKeys.LEADER_AND_ISR => true
+case ApiKeys.STOP_REPLICA => true
+case ApiKeys.UPDATE_METADATA => true
+case ApiKeys.CONTROLLED_SHUTDOWN => true
+case _ => false
+  }
+}else {

Review comment:
   Not sure if this is better=:
   ```scala
 header.apiKey() match {
   case ApiKeys.LEADER_AND_ISR | ApiKeys.STOP_REPLICA | ... | ... => true
   case _ => false
 }
   ```
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #9789: Force the validation control request,isolate other data requests.Prot…

2020-12-28 Thread GitBox


showuon commented on a change in pull request #9789:
URL: https://github.com/apache/kafka/pull/9789#discussion_r549561892



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -988,60 +990,79 @@ private[kafka] class Processor(val id: Int,
 }
   }
 
+  protected def isControlRequest(header: RequestHeader): Boolean = {
+if (isControlPlane) {
+  header.apiKey() match {
+case ApiKeys.LEADER_AND_ISR => true
+case ApiKeys.STOP_REPLICA => true
+case ApiKeys.UPDATE_METADATA => true
+case ApiKeys.CONTROLLED_SHUTDOWN => true
+case _ => false
+  }
+}else {

Review comment:
   Not sure if this is better=:
   ```scala
 header.apiKey() match {
   case ApiKeys.LEADER_AND_ISR | ApiKeys.STOP_REPLICA | ... | ... if 
(isControlPlane) => true
   case _ => false
 }
   ```
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #9789: Force the validation control request,isolate other data requests.Prot…

2020-12-28 Thread GitBox


showuon commented on a change in pull request #9789:
URL: https://github.com/apache/kafka/pull/9789#discussion_r549561892



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -988,60 +990,79 @@ private[kafka] class Processor(val id: Int,
 }
   }
 
+  protected def isControlRequest(header: RequestHeader): Boolean = {
+if (isControlPlane) {
+  header.apiKey() match {
+case ApiKeys.LEADER_AND_ISR => true
+case ApiKeys.STOP_REPLICA => true
+case ApiKeys.UPDATE_METADATA => true
+case ApiKeys.CONTROLLED_SHUTDOWN => true
+case _ => false
+  }
+}else {

Review comment:
   Not sure if this is better (so no `else` needed):
   ```scala
   protected def isControlRequest(header: RequestHeader): Boolean = {
 header.apiKey() match {
   case ApiKeys.LEADER_AND_ISR | ApiKeys.STOP_REPLICA | ... | ... if 
(isControlPlane) => true
   case _ => false
 }
   }
   ```
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon edited a comment on pull request #9789: Force the validation control request,isolate other data requests.Prot…

2020-12-28 Thread GitBox


showuon edited a comment on pull request #9789:
URL: https://github.com/apache/kafka/pull/9789#issuecomment-751936350


   @wenbingshen , thanks for the PR. Some comments left, but most importantly, 
we don't know which JIRA ticket you're fixing. Please check the [contributing 
code 
change](https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes)
 page. Thank you.
   > The PR title should usually be of the form KAFKA-: Title, where 
KAFKA- is the relevant JIRA id and Title may be the JIRA's title or a more 
specific title describing the PR itself. For trivial cases where a JIRA is not 
required (see JIRA section for more details) MINOR: or HOTFIX: can be used as 
the PR title prefix.
   
   Also, please add tests. Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #9789: Force the validation control request,isolate other data requests.Prot…

2020-12-28 Thread GitBox


showuon commented on a change in pull request #9789:
URL: https://github.com/apache/kafka/pull/9789#discussion_r549561892



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -988,60 +990,79 @@ private[kafka] class Processor(val id: Int,
 }
   }
 
+  protected def isControlRequest(header: RequestHeader): Boolean = {
+if (isControlPlane) {
+  header.apiKey() match {
+case ApiKeys.LEADER_AND_ISR => true
+case ApiKeys.STOP_REPLICA => true
+case ApiKeys.UPDATE_METADATA => true
+case ApiKeys.CONTROLLED_SHUTDOWN => true
+case _ => false
+  }
+}else {

Review comment:
   Not sure if this is better:
   ```scala
   header.apiKey() match {
 case ApiKeys.LEADER_AND_ISR | ApiKeys.STOP_REPLICA | ... | ... if 
(isControlPlane) => true
 case _ => false
   }
   ```
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #9789: Force the validation control request,isolate other data requests.Prot…

2020-12-28 Thread GitBox


showuon commented on a change in pull request #9789:
URL: https://github.com/apache/kafka/pull/9789#discussion_r549558749



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -988,60 +990,79 @@ private[kafka] class Processor(val id: Int,
 }
   }
 
+  protected def isControlRequest(header: RequestHeader): Boolean = {

Review comment:
   could we use `private` here?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #9789: Force the validation control request,isolate other data requests.Prot…

2020-12-28 Thread GitBox


showuon commented on a change in pull request #9789:
URL: https://github.com/apache/kafka/pull/9789#discussion_r549558597



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -988,60 +990,79 @@ private[kafka] class Processor(val id: Int,
 }
   }
 
+  protected def isControlRequest(header: RequestHeader): Boolean = {
+if (isControlPlane) {
+  header.apiKey() match {
+case ApiKeys.LEADER_AND_ISR => true
+case ApiKeys.STOP_REPLICA => true
+case ApiKeys.UPDATE_METADATA => true
+case ApiKeys.CONTROLLED_SHUTDOWN => true
+case _ => false
+  }
+}else {

Review comment:
   formatting is not correct. `} else {`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #9789: Force the validation control request,isolate other data requests.Prot…

2020-12-28 Thread GitBox


showuon commented on a change in pull request #9789:
URL: https://github.com/apache/kafka/pull/9789#discussion_r549558489



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -988,60 +990,79 @@ private[kafka] class Processor(val id: Int,
 }
   }
 
+  protected def isControlRequest(header: RequestHeader): Boolean = {
+if (isControlPlane) {
+  header.apiKey() match {
+case ApiKeys.LEADER_AND_ISR => true
+case ApiKeys.STOP_REPLICA => true
+case ApiKeys.UPDATE_METADATA => true
+case ApiKeys.CONTROLLED_SHUTDOWN => true
+case _ => false
+  }
+}else {
+  true
+}
+  }
+
   private def processCompletedReceives(): Unit = {
 selector.completedReceives.forEach { receive =>
   try {
 openOrClosingChannel(receive.source) match {
   case Some(channel) =>
 val header = parseRequestHeader(receive.payload)
-if (header.apiKey == ApiKeys.SASL_HANDSHAKE && 
channel.maybeBeginServerReauthentication(receive,
-  () => time.nanoseconds()))
-  trace(s"Begin re-authentication: $channel")
-else {
-  val nowNanos = time.nanoseconds()
-  if (channel.serverAuthenticationSessionExpired(nowNanos)) {
-// be sure to decrease connection count and drop any in-flight 
responses
-debug(s"Disconnecting expired channel: $channel : $header")
-close(channel.id)
-expiredConnectionsKilledCount.record(null, 1, 0)
-  } else {
-val connectionId = receive.source
-val context = new RequestContext(header, connectionId, 
channel.socketAddress,
-  channel.principal, listenerName, securityProtocol,
-  channel.channelMetadataRegistry.clientInformation, 
isPrivilegedListener, channel.principalSerde)
-
-var req = new RequestChannel.Request(processor = id, context = 
context,
-  startTimeNanos = nowNanos, memoryPool, receive.payload, 
requestChannel.metrics, None)
-
-if (req.header.apiKey == ApiKeys.ENVELOPE) {
-  // Override the request context with the forwarded request 
context.
-  // The envelope's context will be preserved in the forwarded 
context
-
-  req = parseForwardedPrincipal(req, 
channel.principalSerde.asScala) match {
-case Some(forwardedPrincipal) =>
-  buildForwardedRequestContext(req, forwardedPrincipal)
-
-case None =>
-  val envelopeResponse = new 
EnvelopeResponse(Errors.PRINCIPAL_DESERIALIZATION_FAILURE)
-  sendEnvelopeResponse(req, envelopeResponse)
-  null
+if (!isControlRequest(header)) {
+  info(s"Current plane is control-plan, disconnecting non 
controller channel: $channel : $header")
+  close(channel.id)
+}else{

Review comment:
   formatting is not correct. `} else {`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #9789: Force the validation control request,isolate other data requests.Prot…

2020-12-28 Thread GitBox


showuon commented on a change in pull request #9789:
URL: https://github.com/apache/kafka/pull/9789#discussion_r549557939



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -988,60 +990,79 @@ private[kafka] class Processor(val id: Int,
 }
   }
 
+  protected def isControlRequest(header: RequestHeader): Boolean = {
+if (isControlPlane) {
+  header.apiKey() match {
+case ApiKeys.LEADER_AND_ISR => true
+case ApiKeys.STOP_REPLICA => true
+case ApiKeys.UPDATE_METADATA => true
+case ApiKeys.CONTROLLED_SHUTDOWN => true
+case _ => false
+  }
+}else {
+  true
+}
+  }
+
   private def processCompletedReceives(): Unit = {
 selector.completedReceives.forEach { receive =>
   try {
 openOrClosingChannel(receive.source) match {
   case Some(channel) =>
 val header = parseRequestHeader(receive.payload)
-if (header.apiKey == ApiKeys.SASL_HANDSHAKE && 
channel.maybeBeginServerReauthentication(receive,
-  () => time.nanoseconds()))
-  trace(s"Begin re-authentication: $channel")
-else {
-  val nowNanos = time.nanoseconds()
-  if (channel.serverAuthenticationSessionExpired(nowNanos)) {
-// be sure to decrease connection count and drop any in-flight 
responses
-debug(s"Disconnecting expired channel: $channel : $header")
-close(channel.id)
-expiredConnectionsKilledCount.record(null, 1, 0)
-  } else {
-val connectionId = receive.source
-val context = new RequestContext(header, connectionId, 
channel.socketAddress,
-  channel.principal, listenerName, securityProtocol,
-  channel.channelMetadataRegistry.clientInformation, 
isPrivilegedListener, channel.principalSerde)
-
-var req = new RequestChannel.Request(processor = id, context = 
context,
-  startTimeNanos = nowNanos, memoryPool, receive.payload, 
requestChannel.metrics, None)
-
-if (req.header.apiKey == ApiKeys.ENVELOPE) {
-  // Override the request context with the forwarded request 
context.
-  // The envelope's context will be preserved in the forwarded 
context
-
-  req = parseForwardedPrincipal(req, 
channel.principalSerde.asScala) match {
-case Some(forwardedPrincipal) =>
-  buildForwardedRequestContext(req, forwardedPrincipal)
-
-case None =>
-  val envelopeResponse = new 
EnvelopeResponse(Errors.PRINCIPAL_DESERIALIZATION_FAILURE)
-  sendEnvelopeResponse(req, envelopeResponse)
-  null
+if (!isControlRequest(header)) {
+  info(s"Current plane is control-plan, disconnecting non 
controller channel: $channel : $header")

Review comment:
   typo: `control-plane`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9789: Force the validation control request,isolate other data requests.Prot…

2020-12-28 Thread GitBox


showuon commented on pull request #9789:
URL: https://github.com/apache/kafka/pull/9789#issuecomment-751936350


   @wenbingshen , thanks for the PR. Some comments left, but most importantly, 
we don't know which JIRA ticket you're fixing. Please check the [contributing 
code 
change](https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes)
 page. Thank you.
   > The PR title should usually be of the form KAFKA-: Title, where 
KAFKA- is the relevant JIRA id and Title may be the JIRA's title or a more 
specific title describing the PR itself. For trivial cases where a JIRA is not 
required (see JIRA section for more details) MINOR: or HOTFIX: can be used as 
the PR title prefix.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9107: KAFKA-5488: Add type-safe branch() operator

2020-12-28 Thread GitBox


mjsax commented on a change in pull request #9107:
URL: https://github.com/apache/kafka/pull/9107#discussion_r549555914



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/BranchedInternal.java
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Branched;
+import org.apache.kafka.streams.kstream.KStream;
+
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+class BranchedInternal extends Branched {
+BranchedInternal(final Branched branched) {
+super(branched);
+}
+
+BranchedInternal() {
+super(null, null, null);
+}
+
+static  BranchedInternal empty() {
+return new BranchedInternal<>();
+}
+
+String getName() {

Review comment:
   It's a naming convention in the whole Kafka code base, to omit the `get` 
prefix for all getter methods, ie, this should be `name()`. (Similar below for 
the other getters.)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app

2020-12-28 Thread GitBox


mjsax commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r549546234



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##
@@ -289,4 +289,17 @@ Cancellable schedule(final Duration interval,
  */
 Map appConfigsWithPrefix(final String prefix);
 
+/**
+ * Returns current cached wall-clock system timestamp in milliseconds.

Review comment:
   nit: `Return` without `s` -- we use imperative to write JavaDocs.
   
   I would remove `cached` and add a dedicated second sentence about it. Also 
`wall-clock time` and `system time` are synonymous and thus `wall-clock system 
time` sounds a little odd.
   
   What about:
   ```
   Return the current system timestamp (also called wall-clock time) in 
milliseconds.
   
   Note: this method returns the internally cached system timestamp from the 
Kafka Stream runtime. Thus, it may return a different value compared to 
`System.currentTimeMillis()`.
   
   For a global processor, Kafka Streams does not cache system time and thus 
calling this method will return the same value as `System.currentTimeMillis()`.
   
   @return the current system timestamp in milliseconds
   ```

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
##
@@ -166,4 +166,14 @@ public long timestamp() {
 public Map appConfigsWithPrefix(final String prefix) {
 return delegate.appConfigsWithPrefix(prefix);
 }
+
+@Override
+public long currentSystemTimeMs() {
+throw new UnsupportedOperationException("this method is not supported 
in ForwardingDisabledProcessor context");

Review comment:
   Why do we through here? It seems to be safe to get the time from 
`delegate` object? We only disable `forward` but nothing else. (same below)

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
##
@@ -115,6 +116,16 @@ public void commit() {
 //no-op
 }
 
+@Override
+public long currentSystemTimeMs() {
+return Time.SYSTEM.milliseconds();
+}
+
+@Override
+public long currentStreamTimeMs() {
+throw new UnsupportedOperationException("this method is not supported 
in global processor context.");

Review comment:
   This makes sense, but we might want to document this somewhere else, ie, 
in the corresponding docs. To be fair, not sure atm, if we have much content 
about it and/or where it add it?
   
   We should for sure document it in the JavaDocs, ie, the `ProcessorContext` 
interface. (cf my comment above)
   
   About the error message:
   ```
   throw new UnsupportedOperationException("There is no concept of stream-time 
for a global processor.");
   ```

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##
@@ -289,4 +289,17 @@ Cancellable schedule(final Duration interval,
  */
 Map appConfigsWithPrefix(final String prefix);
 
+/**
+ * Returns current cached wall-clock system timestamp in milliseconds.
+ *
+ * @return the current cached wall-clock system timestamp in milliseconds
+ */
+long currentSystemTimeMs();
+
+/**
+ * Returns the maximum timestamp of any record yet processed by the task.

Review comment:
   What about:
   ```
   Return the current stream-time in milliseconds.
   
   Stream-time is the maximum observed {@link TimestampExtractor record 
timestamp} so far
   (including the currently processed record), i.e., it can be considered a 
high-watermark.
   Stream-time is tracked on a per-task basis and is preserved across restarts 
and during task migration.
   
   Note: this method is not supported for global processors (cf. {@link 
Topology#addGlobalStore(...)}
   and {@link StreamsBuilder#addGlobalStore(...)}, because there is no concept 
of stream-time for this case.
   Calling this method in a global processor with result in an {@link 
UnsupportedOperationException}.
   
   @return the current stream-time in milliseconds
   ```

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreToProcessorContextAdapter.java
##
@@ -160,4 +160,14 @@ public long timestamp() {
 public Map appConfigsWithPrefix(final String prefix) {
 return delegate.appConfigsWithPrefix(prefix);
 }
+
+@Override
+public long currentSystemTimeMs() {
+throw new UnsupportedOperationException("this method is not supported 
in StoreToProcessorContextAdapter");

Review comment:
   We should align the error message to the ones from above: `"StateStores 
can't access system time."` (same below).

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -,13 +,21 @@ RecordCollector recordCollector() {
 

[GitHub] [kafka] mjsax commented on pull request #9107: KAFKA-5488: Add type-safe branch() operator

2020-12-28 Thread GitBox


mjsax commented on pull request #9107:
URL: https://github.com/apache/kafka/pull/9107#issuecomment-751926120


   @inponomarev -- Can you also update the docs for Kafka Streams and the 2.8 
upgrade guide in this PR.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji merged pull request #9553: KAFKA-10427: Fetch snapshot

2020-12-28 Thread GitBox


hachikuji merged pull request #9553:
URL: https://github.com/apache/kafka/pull/9553


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #9553: KAFKA-10427: Fetch snapshot

2020-12-28 Thread GitBox


hachikuji commented on pull request #9553:
URL: https://github.com/apache/kafka/pull/9553#issuecomment-751924971


   I manually reran tests since it has been a week since the last update. Here 
is a link to the results since they have not been automatically linked: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-9553/14/.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] g1geordie commented on a change in pull request #9778: KAFKA-10874 Fix flaky ClientQuotasRequestTest.testAlterIpQuotasRequest

2020-12-28 Thread GitBox


g1geordie commented on a change in pull request #9778:
URL: https://github.com/apache/kafka/pull/9778#discussion_r549545680



##
File path: core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
##
@@ -212,7 +214,10 @@ class ClientQuotasRequestTest extends BaseRequestTest {
   InetAddress.getByName(unknownHost)
 else
   InetAddress.getByName(entityName)
-assertEquals(expectedMatches(entity), 
servers.head.socketServer.connectionQuotas.connectionRateForIp(entityIp), 0.01)
+TestUtils.retry(1L) {

Review comment:
   Thank you





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9107: KAFKA-5488: Add type-safe branch() operator

2020-12-28 Thread GitBox


mjsax commented on pull request #9107:
URL: https://github.com/apache/kafka/pull/9107#issuecomment-751922810


   About the original comment: 
https://github.com/apache/kafka/pull/9107#issuecomment-666749809
   
   I am fine with those changes.
   
   About https://github.com/apache/kafka/pull/9107#issuecomment-751261181 -- 
that is a good point. Thanks for explaining. I guess it's a "philosophical" 
question if we want to allow this pattern though, or if we want to require that 
either `defaultBranch()` or `noDefaultBranch()` is called? -- I did consider 
calling `branch()` like a builder pattern, and the final `[noD|d]efaultBranch` 
call is basically `build()`?
   
   Curious to hear what @vvcephei thinks about it.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10890) Broker just stated Ignoring LeaderAndIsr request from controller

2020-12-28 Thread GeoffreyStark (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

GeoffreyStark updated KAFKA-10890:
--
Description: 
In auto.leader.rebalance.enabled=true 

Taking a Broker 1013 (with many leader partitions) offline;

Then, after the leaders of these subdivisions have been elected from other ISRs,

In theory, 1013 should once again become the leader of those partitions

Indeed, if you look at the describe command and the information of the 
corresponding partition in ZK, you can see that the leader has changed back to 
1013.

But if you look at the Controller log and the 1013 log at this point, you'll 
see that there are some warning messages,

 

 

It looks like the controller failed to send a LeaderAndIsr request to a 1013 
node after it restarted, Then the 1013 node‘s log has been

 
{code:java}
"Ignoring LeaderAndIsr request from controller 1017 with the correlation id 33 
epoch 6 fro partition sp since its associated leader 5 is not who the current 
leader epoch 5 (state. Change. Logger). "
{code}
 

 

After a while,

the producer reports an error.
{code:java}
"The server is not The leader for that topically -- partition.. Going to 
request metadata update now"
{code}
 

  was:
In auto.leader.rebalance.enabled=true 

Taking a Broker 1013 (with many leader partitions) offline;

Then, after the leaders of these subdivisions have been elected from other ISRs,

In theory, 1013 should once again become the leader of those partitions

Indeed, if you look at the describe command and the information of the 
corresponding partition in ZK, you can see that the leader has changed back to 
1013.

But if you look at the Controller log and the 1013 log at this point, you'll 
see that there are some warning messages, and after a while,

the producer reports an error"

 The server is not the leader for that topic-partition..Going to request 
metadata update now

"

 

It looks like the controller failed to send a LeaderAndIsr request to a 1013 
node after it restarted, Then the 1013 node‘s log has been

 
{code:java}
"Ignoring LeaderAndIsr request from controller 1017 with the correlation id 33 
epoch 6 fro partition sp since its associated leader 5 is not who the current 
leader epoch 5 (state. Change. Logger). "
{code}
 

 

After a while, The producer reports an exception.
{code:java}
"The server is not The leader for that topically -- partition.. Going to 
request metadata update now"
{code}
 


>  Broker just stated Ignoring LeaderAndIsr request from controller
> -
>
> Key: KAFKA-10890
> URL: https://issues.apache.org/jira/browse/KAFKA-10890
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.0.0
> Environment: kfk 2.0
> 74 brokers
> 3 replica-factors
>Reporter: GeoffreyStark
>Priority: Major
> Attachments: image-2020-12-28-16-59-03-492.png, 
> jstack-1013broker-1228-1312, kafka元数据混乱.docx
>
>
> In auto.leader.rebalance.enabled=true 
> Taking a Broker 1013 (with many leader partitions) offline;
> Then, after the leaders of these subdivisions have been elected from other 
> ISRs,
> In theory, 1013 should once again become the leader of those partitions
> Indeed, if you look at the describe command and the information of the 
> corresponding partition in ZK, you can see that the leader has changed back 
> to 1013.
> But if you look at the Controller log and the 1013 log at this point, you'll 
> see that there are some warning messages,
>  
>  
> It looks like the controller failed to send a LeaderAndIsr request to a 1013 
> node after it restarted, Then the 1013 node‘s log has been
>  
> {code:java}
> "Ignoring LeaderAndIsr request from controller 1017 with the correlation id 
> 33 epoch 6 fro partition sp since its associated leader 5 is not who the 
> current leader epoch 5 (state. Change. Logger). "
> {code}
>  
>  
> After a while,
> the producer reports an error.
> {code:java}
> "The server is not The leader for that topically -- partition.. Going to 
> request metadata update now"
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on a change in pull request #9107: KAFKA-5488: Add type-safe branch() operator

2020-12-28 Thread GitBox


mjsax commented on a change in pull request #9107:
URL: https://github.com/apache/kafka/pull/9107#discussion_r549543943



##
File path: streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * The {@code Branched} class is used to define the optional parameters when 
building branches with
+ * {@link BranchedKStream}.
+ *
+ * @param  type of record key
+ * @param  type of record value
+ */
+public class Branched implements NamedOperation> {
+
+protected final String name;
+protected final Function,
+? extends KStream> chainFunction;
+protected final Consumer> chainConsumer;
+
+protected Branched(final String name,
+   final Function, ? extends 
KStream> chainFunction,
+   final Consumer> chainConsumer) {
+this.name = name;
+this.chainFunction = chainFunction;
+this.chainConsumer = chainConsumer;
+}
+
+/**
+ * Create an instance of {@link Branched} from an existing instance.
+ *
+ * @param branched the instance of {@link Branched} to copy
+ */
+protected Branched(final Branched branched) {
+this(branched.name, branched.chainFunction, branched.chainConsumer);
+}
+
+/**
+ * Configure the instance of {@link Branched} with a branch name postfix.
+ *
+ * @param name the branch name postfix to be used. If {@code null} a 
default branch name postfix will be generated (see
+ * {@link BranchedKStream} description for details)
+ * @return {@code this}
+ */
+@Override
+public Branched withName(final String name) {
+return new Branched<>(name, chainFunction, chainConsumer);
+}
+
+/**
+ * Create an instance of {@link Branched} with provided branch name 
postfix.
+ *
+ * @param name the branch name postfix to be used. If {@code null}, a 
default branch name postfix will be generated
+ * (see {@link BranchedKStream} description for details)
+ * @param   key type
+ * @param   value type
+ * @return a new instance of {@link Branched}
+ */
+public static  Branched as(final String name) {
+return new Branched<>(name, null, null);
+}
+
+/**
+ * Create an instance of {@link Branched} with provided chain function.
+ *
+ * @param chain A function that will be applied to the branch. If {@code 
null}, the identity
+ *  {@code kStream -> kStream} function will be supposed. If 
this function returns
+ *  {@code null}, its result is ignored, otherwise it is added 
to the {@code Map} returned
+ *  by {@link BranchedKStream#defaultBranch()} or {@link 
BranchedKStream#noDefaultBranch()} (see
+ *  {@link BranchedKStream} description for details).
+ * @paramkey type
+ * @paramvalue type
+ * @return a new instance of {@link Branched}
+ */
+public static  Branched withFunction(
+final Function,
+? extends KStream> chain) {
+return new Branched<>(null, chain, null);
+}
+
+/**
+ * Create an instance of {@link Branched} with provided chain consumer.
+ *
+ * @param chain A consumer to which the branch will be sent. If a non-null 
branch is provided here,
+ *  the respective branch will not be added to the resulting 
{@code Map} returned
+ *  by {@link BranchedKStream#defaultBranch()} or {@link 
BranchedKStream#noDefaultBranch()} (see
+ *  {@link BranchedKStream} description for details). If 
{@code null}, a no-op consumer will be supposed

Review comment:
   SGTM. In general, even if possible, I don't like to give `null` 
semantics :) 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, 

[GitHub] [kafka] mjsax commented on a change in pull request #9107: KAFKA-5488: Add type-safe branch() operator

2020-12-28 Thread GitBox


mjsax commented on a change in pull request #9107:
URL: https://github.com/apache/kafka/pull/9107#discussion_r549543247



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/BranchedKStreamImpl.java
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Branched;
+import org.apache.kafka.streams.kstream.BranchedKStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
+import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class BranchedKStreamImpl implements BranchedKStream {
+
+private static final String BRANCH_NAME = "KSTREAM-BRANCH-";
+
+private final KStreamImpl source;
+private final boolean repartitionRequired;
+private final String splitterName;
+private final Map> result = new HashMap<>();
+
+private final List> predicates = new 
ArrayList<>();
+private final List childNames = new ArrayList<>();
+private final ProcessorGraphNode splitterNode;
+
+BranchedKStreamImpl(final KStreamImpl source, final boolean 
repartitionRequired, final NamedInternal named) {
+this.source = source;
+this.repartitionRequired = repartitionRequired;
+this.splitterName = named.orElseGenerateWithPrefix(source.builder, 
BRANCH_NAME);
+
+// predicates and childNames are passed by reference so when the user 
adds a branch they get added to

Review comment:
   Just saw your other comment: 
https://github.com/apache/kafka/pull/9107#issuecomment-751261181





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9107: KAFKA-5488: Add type-safe branch() operator

2020-12-28 Thread GitBox


mjsax commented on a change in pull request #9107:
URL: https://github.com/apache/kafka/pull/9107#discussion_r549542995



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/BranchedKStreamImpl.java
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Branched;
+import org.apache.kafka.streams.kstream.BranchedKStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
+import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class BranchedKStreamImpl implements BranchedKStream {
+
+private static final String BRANCH_NAME = "KSTREAM-BRANCH-";
+
+private final KStreamImpl source;
+private final boolean repartitionRequired;
+private final String splitterName;
+private final Map> result = new HashMap<>();
+
+private final List> predicates = new 
ArrayList<>();
+private final List childNames = new ArrayList<>();
+private final ProcessorGraphNode splitterNode;
+
+BranchedKStreamImpl(final KStreamImpl source, final boolean 
repartitionRequired, final NamedInternal named) {
+this.source = source;
+this.repartitionRequired = repartitionRequired;
+this.splitterName = named.orElseGenerateWithPrefix(source.builder, 
BRANCH_NAME);
+
+// predicates and childNames are passed by reference so when the user 
adds a branch they get added to

Review comment:
   Would love to learn about it. -- In general, it's easier to follow the 
same pattern throughout the code base. It easier to reason about the code that 
way, and also easier for people to learn the code base.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9107: KAFKA-5488: Add type-safe branch() operator

2020-12-28 Thread GitBox


mjsax commented on a change in pull request #9107:
URL: https://github.com/apache/kafka/pull/9107#discussion_r549542855



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/BranchedKStreamImpl.java
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Branched;
+import org.apache.kafka.streams.kstream.BranchedKStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
+import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class BranchedKStreamImpl implements BranchedKStream {
+
+private static final String BRANCH_NAME = "KSTREAM-BRANCH-";
+
+private final KStreamImpl source;
+private final boolean repartitionRequired;
+private final String splitterName;
+private final Map> result = new HashMap<>();
+
+private final List> predicates = new 
ArrayList<>();
+private final List childNames = new ArrayList<>();
+private final ProcessorGraphNode splitterNode;
+
+BranchedKStreamImpl(final KStreamImpl source, final boolean 
repartitionRequired, final NamedInternal named) {
+this.source = source;
+this.repartitionRequired = repartitionRequired;
+this.splitterName = named.orElseGenerateWithPrefix(source.builder, 
BRANCH_NAME);
+
+// predicates and childNames are passed by reference so when the user 
adds a branch they get added to
+final ProcessorParameters processorParameters =
+new ProcessorParameters<>(new KStreamBranch<>(predicates, 
childNames), splitterName);
+splitterNode = new ProcessorGraphNode<>(splitterName, 
processorParameters);
+source.builder.addGraphNode(source.streamsGraphNode, splitterNode);
+}
+
+@Override
+public BranchedKStream branch(final Predicate 
predicate) {
+return branch(predicate, BranchedInternal.empty());
+}
+
+@Override
+public BranchedKStream branch(final Predicate 
predicate, final Branched branched) {
+predicates.add(predicate);
+createBranch(branched, predicates.size());
+return this;
+}
+
+@Override
+public Map> defaultBranch() {
+return defaultBranch(BranchedInternal.empty());
+}
+
+@Override
+public Map> defaultBranch(final Branched 
branched) {
+createBranch(branched, 0);

Review comment:
   I guess it's fine both ways. -- The point about the index is a good one 
that I missed. But would still be doable I guess.
   
   I don't think that there would be any measurable runtime difference if you 
use a "default predicate" (what we also do in the current implementation) -- 
the code is just a little "cleaner" as we don't need an extra "if" at the end 
-- but it's also not the end of the world as the `process` method is fairly 
simply anyway.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10779) Reassignment tool sets throttles incorrectly when overriding a reassignment

2020-12-28 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming reassigned KAFKA-10779:
--

Assignee: dengziming

> Reassignment tool sets throttles incorrectly when overriding a reassignment
> ---
>
> Key: KAFKA-10779
> URL: https://issues.apache.org/jira/browse/KAFKA-10779
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: dengziming
>Priority: Major
>
> The logic in `ReassignPartitionsCommand.calculateProposedMoveMap` assumes 
> that adding replicas are not included in the replica set returned from 
> `Metadata` or `ListPartitionReassignments`.  This is evident in the test case 
> `ReassignPartitionsUnitTest.testMoveMap`. Because of this incorrect 
> assumption, the move map is computed incorrectly which can result in the 
> wrong throttles being applied. As far as I can tell, this is only an issue 
> when overriding an existing reassignment. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on a change in pull request #9107: KAFKA-5488: Add type-safe branch() operator

2020-12-28 Thread GitBox


mjsax commented on a change in pull request #9107:
URL: https://github.com/apache/kafka/pull/9107#discussion_r547605439



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/BranchedKStreamImpl.java
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Branched;
+import org.apache.kafka.streams.kstream.BranchedKStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
+import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class BranchedKStreamImpl implements BranchedKStream {
+
+private static final String BRANCH_NAME = "KSTREAM-BRANCH-";
+
+private final KStreamImpl source;
+private final boolean repartitionRequired;
+private final String splitterName;
+private final Map> result = new HashMap<>();
+
+private final List> predicates = new 
ArrayList<>();
+private final List childNames = new ArrayList<>();
+private final ProcessorGraphNode splitterNode;
+
+BranchedKStreamImpl(final KStreamImpl source, final boolean 
repartitionRequired, final NamedInternal named) {
+this.source = source;
+this.repartitionRequired = repartitionRequired;
+this.splitterName = named.orElseGenerateWithPrefix(source.builder, 
BRANCH_NAME);
+
+// predicates and childNames are passed by reference so when the user 
adds a branch they get added to
+final ProcessorParameters processorParameters =
+new ProcessorParameters<>(new KStreamBranch<>(predicates, 
childNames), splitterName);
+splitterNode = new ProcessorGraphNode<>(splitterName, 
processorParameters);
+source.builder.addGraphNode(source.streamsGraphNode, splitterNode);
+}
+
+@Override
+public BranchedKStream branch(final Predicate 
predicate) {
+return branch(predicate, BranchedInternal.empty());

Review comment:
   Missing `null` check for `predicate` (similar below for other methods 
and parameters).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9107: KAFKA-5488: Add type-safe branch() operator

2020-12-28 Thread GitBox


mjsax commented on a change in pull request #9107:
URL: https://github.com/apache/kafka/pull/9107#discussion_r547602314



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/BranchedInternal.java
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Branched;
+import org.apache.kafka.streams.kstream.KStream;
+
+import java.util.Map;
+
+class BranchedInternal extends Branched {
+BranchedInternal(final Branched branched) {
+super(branched);
+}
+
+BranchedInternal() {
+super(null, null, null);
+}
+
+static  BranchedInternal empty() {
+return new BranchedInternal<>();
+}
+
+String branchProcessorName(final String prefix, final int index) {
+if (name == null) {
+return prefix + index;
+} else {
+return prefix + name;
+}
+}
+
+public void process(final KStreamImpl newStream, final String 
branchChildName, final Map> result) {
+final KStream transformedStream;
+if (chainFunction == null) {
+transformedStream = newStream;
+} else {
+transformedStream = chainFunction.apply(newStream);
+}
+if (transformedStream == null) {
+return;
+}
+if (chainConsumer != null) {
+chainConsumer.accept(transformedStream);
+return;
+} else {
+result.put(branchChildName, transformedStream);
+}
+}

Review comment:
   I think this method is hard to read. Proposal:
   ```
   if (chainFunction != null) {
 final KStream transformedStream = chainFunction.apply(newStream);
 if (transformedStream != null) {
   result.put(branchChildName, transformedStream);
 }
   } else if (chainConsumer != null) {
 chainConsumer.accept(transformedStream);
   } else {
 result.put(branchChildName, newStream);
   }
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9107: KAFKA-5488: Add type-safe branch() operator

2020-12-28 Thread GitBox


mjsax commented on a change in pull request #9107:
URL: https://github.com/apache/kafka/pull/9107#discussion_r547599107



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java
##
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import java.util.Map;
+
+/**
+ * Branches the records in the original stream based on the predicates 
supplied for the branch definitions.
+ * 
+ * Branches are defined with {@link BranchedKStream#branch(Predicate, 
Branched)} or
+ * {@link BranchedKStream#defaultBranch(Branched)} methods. Each record is 
evaluated against the predicates
+ * supplied via {@link Branched} parameters, and is routed to the first branch 
for which its respective predicate
+ * evaluates to {@code true}.
+ * 
+ * Each branch (which is a {@link KStream} instance) then can be processed 
either by
+ * a {@link java.util.function.Function} or a {@link 
java.util.function.Consumer} provided via a {@link Branched}
+ * parameter. It also can be accessed from the {@link Map} returned by {@link 
BranchedKStream#defaultBranch(Branched)} or
+ * {@link BranchedKStream#noDefaultBranch()} (see usage 
examples).
+ * 
+ * The branching happens on first-match: A record in the original stream is 
assigned to the corresponding result
+ * stream for the first predicate that evaluates to true, and is assigned to 
this stream only. If you need
+ * to route a record to multiple streams, you can use {@link 
KStream#filter(Predicate)} for each predicate instead
+ * of branching.
+ * 
+ * The process of routing the records to different branches is a stateless 
record-by-record operation.
+ * Rules of forming the resulting map
+ * The keys of the {@code Map>} entries returned by 
{@link BranchedKStream#defaultBranch(Branched)} or
+ * {@link BranchedKStream#noDefaultBranch()} are defined by the following 
rules:
+ * 
+ * 
+ * If {@link Named} parameter was provided for {@link 
KStream#split(Named)}, its value is used as
+ * a prefix for each key. By default, no prefix is used
+ * If a name is provided for the {@link 
BranchedKStream#branch(Predicate, Branched)} via
+ * {@link Branched} parameter, its value is appended to the prefix to form 
the {@code Map} key
+ * If a name is not provided for the branch, then the key defaults to 
{@code prefix + position} of the branch
+ * as a decimal number, starting from {@code "1"}
+ * If a name is not provided for the {@link 
BranchedKStream#defaultBranch()} call, then the key defaults
+ * to {@code prefix + "0"}
+ * 
+ * 
+ * The values of the respective {@code Map>} entries are 
formed as following:
+ * 
+ * 
+ * If no chain function or consumer is provided {@link 
BranchedKStream#branch(Predicate, Branched)} via
+ * {@link Branched} parameter, then the value is the branch itself (which 
is equivalent to {@code ks -> ks}
+ * identity chain function)
+ * If a chain function is provided and returns a non-null value for a 
given branch, then the value is
+ * the result returned by this function
+ * If a chain function returns {@code null} for a given branch, then 
the respective entry is not put to the map.
+ * If a consumer is provided for a given branch, then the the 
respective entry is not put to the map
+ * 
+ * 
+ * For example:
+ *  {@code
+ * Map> result =
+ *   source.split(Named.as("foo-"))
+ * .branch(predicate1, Branched.as("bar"))// "foo-bar"
+ * .branch(predicate2, Branched.withConsumer(ks->ks.to("A"))  // no entry: 
a Consumer is provided
+ * .branch(predicate3, Branched.withFunction(ks->null))   // no entry: 
chain function returns null
+ * .branch(predicate4)// "foo-4": 
name defaults to the branch position
+ * .defaultBranch()   // "foo-0": 
"0" is the default name for the default branch
+ * }
+ *
+ * Usage examples
+ *
+ * Direct Branch Consuming
+ * In many cases we do not need to have a single scope for all the branches, 
each branch being processed completely
+ * independently from others. Then we can use 'consuming' 

[GitHub] [kafka] mjsax commented on a change in pull request #9107: KAFKA-5488: Add type-safe branch() operator

2020-12-28 Thread GitBox


mjsax commented on a change in pull request #9107:
URL: https://github.com/apache/kafka/pull/9107#discussion_r549538598



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java
##
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import java.util.Map;
+
+/**
+ * Branches the records in the original stream based on the predicates 
supplied for the branch definitions.
+ * 
+ * Branches are defined with {@link BranchedKStream#branch(Predicate, 
Branched)} or
+ * {@link BranchedKStream#defaultBranch(Branched)} methods. Each record is 
evaluated against the predicates
+ * supplied via {@link Branched} parameters, and is routed to the first branch 
for which its respective predicate
+ * evaluates to {@code true}.
+ * 
+ * Each branch (which is a {@link KStream} instance) then can be processed 
either by
+ * a {@link java.util.function.Function} or a {@link 
java.util.function.Consumer} provided via a {@link Branched}
+ * parameter. It also can be accessed from the {@link Map} returned by {@link 
BranchedKStream#defaultBranch(Branched)} or
+ * {@link BranchedKStream#noDefaultBranch()} (see usage 
examples).
+ * 
+ * The branching happens on first-match: A record in the original stream is 
assigned to the corresponding result
+ * stream for the first predicate that evaluates to true, and is assigned to 
this stream only. If you need
+ * to route a record to multiple streams, you can use {@link 
KStream#filter(Predicate)} for each predicate instead
+ * of branching.
+ * 
+ * The process of routing the records to different branches is a stateless 
record-by-record operation.
+ * Rules of forming the resulting map
+ * The keys of the {@code Map>} entries returned by 
{@link BranchedKStream#defaultBranch(Branched)} or
+ * {@link BranchedKStream#noDefaultBranch()} are defined by the following 
rules:
+ * 
+ * 
+ * If {@link Named} parameter was provided for {@link 
KStream#split(Named)}, its value is used as
+ * a prefix for each key. By default, no prefix is used
+ * If a name is provided for the {@link 
BranchedKStream#branch(Predicate, Branched)} via
+ * {@link Branched} parameter, its value is appended to the prefix to form 
the {@code Map} key

Review comment:
   Also my second language... I think, it is mostly a matter of style. 
Personally, I prefer to use `.` at the end, because a bullet point is still a 
sentence from my POV, and actually, a bullet point could be multiple sentences. 
You also capitalized the first word of each bullet point, so it seems to be a 
sentence. -- I guess some people don't capitalize the first word and also don't 
use a `.` at the en, ie, us a "no-sentence" bullet point style. But this only 
work for "single sentence" bullet points.
   
   It's really a nit. I just raised my personal (an obviously very subjective) 
preference.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9107: KAFKA-5488: Add type-safe branch() operator

2020-12-28 Thread GitBox


mjsax commented on a change in pull request #9107:
URL: https://github.com/apache/kafka/pull/9107#discussion_r547591511



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java
##
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import java.util.Map;
+
+/**
+ * Branches the records in the original stream based on the predicates 
supplied for the branch definitions.
+ * 
+ * Branches are defined with {@link BranchedKStream#branch(Predicate, 
Branched)} or
+ * {@link BranchedKStream#defaultBranch(Branched)} methods. Each record is 
evaluated against the predicates
+ * supplied via {@link Branched} parameters, and is routed to the first branch 
for which its respective predicate
+ * evaluates to {@code true}.
+ * 
+ * Each branch (which is a {@link KStream} instance) then can be processed 
either by
+ * a {@link java.util.function.Function} or a {@link 
java.util.function.Consumer} provided via a {@link Branched}
+ * parameter. It also can be accessed from the {@link Map} returned by {@link 
BranchedKStream#defaultBranch(Branched)} or
+ * {@link BranchedKStream#noDefaultBranch()} (see usage 
examples).
+ * 
+ * The branching happens on first-match: A record in the original stream is 
assigned to the corresponding result
+ * stream for the first predicate that evaluates to true, and is assigned to 
this stream only. If you need
+ * to route a record to multiple streams, you can use {@link 
KStream#filter(Predicate)} for each predicate instead

Review comment:
   `you can apply [multiple] {#filter} [operators], one for each predicate, 
instead`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9107: KAFKA-5488: Add type-safe branch() operator

2020-12-28 Thread GitBox


mjsax commented on a change in pull request #9107:
URL: https://github.com/apache/kafka/pull/9107#discussion_r547597683



##
File path: streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * The {@code Branched} class is used to define the optional parameters when 
building branches with
+ * {@link BranchedKStream}.
+ *
+ * @param  type of record key
+ * @param  type of record value
+ */
+public class Branched implements NamedOperation> {
+
+protected final String name;
+protected final Function,
+? extends KStream> chainFunction;
+protected final Consumer> chainConsumer;
+
+protected Branched(final String name,
+   final Function, ? extends 
KStream> chainFunction,
+   final Consumer> chainConsumer) {
+this.name = name;
+this.chainFunction = chainFunction;
+this.chainConsumer = chainConsumer;
+}
+
+/**
+ * Create an instance of {@link Branched} from an existing instance.
+ *
+ * @param branched the instance of {@link Branched} to copy
+ */
+protected Branched(final Branched branched) {
+this(branched.name, branched.chainFunction, branched.chainConsumer);
+}
+
+/**
+ * Configure the instance of {@link Branched} with a branch name postfix.
+ *
+ * @param name the branch name postfix to be used. If {@code null} a 
default branch name postfix will be generated (see
+ * {@link BranchedKStream} description for details)
+ * @return {@code this}
+ */
+@Override
+public Branched withName(final String name) {
+return new Branched<>(name, chainFunction, chainConsumer);
+}
+
+/**
+ * Create an instance of {@link Branched} with provided branch name 
postfix.
+ *
+ * @param name the branch name postfix to be used. If {@code null}, a 
default branch name postfix will be generated
+ * (see {@link BranchedKStream} description for details)
+ * @param   key type
+ * @param   value type
+ * @return a new instance of {@link Branched}
+ */
+public static  Branched as(final String name) {
+return new Branched<>(name, null, null);
+}
+
+/**
+ * Create an instance of {@link Branched} with provided chain function.
+ *
+ * @param chain A function that will be applied to the branch. If {@code 
null}, the identity
+ *  {@code kStream -> kStream} function will be supposed. If 
this function returns
+ *  {@code null}, its result is ignored, otherwise it is added 
to the {@code Map} returned
+ *  by {@link BranchedKStream#defaultBranch()} or {@link 
BranchedKStream#noDefaultBranch()} (see
+ *  {@link BranchedKStream} description for details).
+ * @paramkey type
+ * @paramvalue type
+ * @return a new instance of {@link Branched}
+ */
+public static  Branched withFunction(
+final Function,
+? extends KStream> chain) {
+return new Branched<>(null, chain, null);
+}
+
+/**
+ * Create an instance of {@link Branched} with provided chain consumer.
+ *
+ * @param chain A consumer to which the branch will be sent. If a non-null 
branch is provided here,
+ *  the respective branch will not be added to the resulting 
{@code Map} returned
+ *  by {@link BranchedKStream#defaultBranch()} or {@link 
BranchedKStream#noDefaultBranch()} (see
+ *  {@link BranchedKStream} description for details). If 
{@code null}, a no-op consumer will be supposed

Review comment:
   I tried to read up on the KIP discussion thread, and I am wondering if 
we did agree to this behavior? My understanding was that if a consumer is use, 
there won't be any entry in the `Map` for this branch?





This is an automated message from the Apache Git Service.
To respond to 

[GitHub] [kafka] hachikuji commented on a change in pull request #9781: MINOR: Use top-level error in `UpdateFeaturesRequest.getErrorResponse`

2020-12-28 Thread GitBox


hachikuji commented on a change in pull request #9781:
URL: https://github.com/apache/kafka/pull/9781#discussion_r549426896



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesRequest.java
##
@@ -56,19 +54,11 @@ public UpdateFeaturesRequest(UpdateFeaturesRequestData 
data, short version) {
 
 @Override
 public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-final ApiError apiError = ApiError.fromThrowable(e);
-final UpdatableFeatureResultCollection results = new 
UpdatableFeatureResultCollection();
-for (FeatureUpdateKey update : this.data.featureUpdates().valuesSet()) 
{
-final UpdatableFeatureResult result = new UpdatableFeatureResult()
-.setFeature(update.feature())
-.setErrorCode(apiError.error().code())
-.setErrorMessage(apiError.message());
-results.add(result);
-}
-final UpdateFeaturesResponseData responseData = new 
UpdateFeaturesResponseData()
-.setThrottleTimeMs(throttleTimeMs)
-.setResults(results);
-return new UpdateFeaturesResponse(responseData);
+return UpdateFeaturesResponse.createWithErrors(
+ApiError.fromThrowable(e),
+Collections.emptyMap(),

Review comment:
   @chia7712 Good catch. Pushed an update with some test cases.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9520: MINOR: replace test "expected" parameter by assertThrows

2020-12-28 Thread GitBox


ijuma commented on a change in pull request #9520:
URL: https://github.com/apache/kafka/pull/9520#discussion_r549419585



##
File path: 
clients/src/test/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPoolTest.java
##
@@ -23,68 +23,74 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.junit.Assert.assertThrows;
+
 
 public class GarbageCollectedMemoryPoolTest {
 
-@Test(expected = IllegalArgumentException.class)
-public void testZeroSize() throws Exception {
-new GarbageCollectedMemoryPool(0, 7, true, null);
+@Test
+public void testZeroSize() {
+assertThrows(IllegalArgumentException.class,
+() -> new GarbageCollectedMemoryPool(0, 7, true, null));
 }
 
-@Test(expected = IllegalArgumentException.class)
-public void testNegativeSize() throws Exception {
-new GarbageCollectedMemoryPool(-1, 7, false, null);
+@Test
+public void testNegativeSize() {
+assertThrows(IllegalArgumentException.class,
+() -> new GarbageCollectedMemoryPool(-1, 7, false, null));
 }
 
-@Test(expected = IllegalArgumentException.class)
-public void testZeroMaxAllocation() throws Exception {
-new GarbageCollectedMemoryPool(100, 0, true, null);
+@Test
+public void testZeroMaxAllocation() {
+assertThrows(IllegalArgumentException.class,
+() -> new GarbageCollectedMemoryPool(100, 0, true, null));
 }
 
-@Test(expected = IllegalArgumentException.class)
-public void testNegativeMaxAllocation() throws Exception {
-new GarbageCollectedMemoryPool(100, -1, false, null);
+@Test
+public void testNegativeMaxAllocation() {
+assertThrows(IllegalArgumentException.class,
+() -> new GarbageCollectedMemoryPool(100, -1, false, null));
 }
 
-@Test(expected = IllegalArgumentException.class)
-public void testMaxAllocationLargerThanSize() throws Exception {
-new GarbageCollectedMemoryPool(100, 101, true, null);
+@Test
+public void testMaxAllocationLargerThanSize() {
+assertThrows(IllegalArgumentException.class,
+() -> new GarbageCollectedMemoryPool(100, 101, true, null));
 }
 
-@Test(expected = IllegalArgumentException.class)
-public void testAllocationOverMaxAllocation() throws Exception {
+@Test
+public void testAllocationOverMaxAllocation() {
 GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 
10, false, null);
-pool.tryAllocate(11);
+assertThrows(IllegalArgumentException.class, () -> 
pool.tryAllocate(11));
 }
 
-@Test(expected = IllegalArgumentException.class)
-public void testAllocationZero() throws Exception {
+@Test
+public void testAllocationZero() {
 GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 
10, true, null);
-pool.tryAllocate(0);
+assertThrows(IllegalArgumentException.class, () -> 
pool.tryAllocate(0));
 }
 
-@Test(expected = IllegalArgumentException.class)
-public void testAllocationNegative() throws Exception {
+@Test
+public void testAllocationNegative() {
 GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 
10, false, null);
-pool.tryAllocate(-1);
+assertThrows(IllegalArgumentException.class, () -> 
pool.tryAllocate(-1));
 }
 
-@Test(expected = IllegalArgumentException.class)
-public void testReleaseNull() throws Exception {
+@Test
+public void testReleaseNull() {
 GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 
10, true, null);
-pool.release(null);
+assertThrows(IllegalArgumentException.class, () -> pool.release(null));
 }
 
-@Test(expected = IllegalArgumentException.class)
-public void testReleaseForeignBuffer() throws Exception {
+@Test
+public void testReleaseForeignBuffer() {
 GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 
10, true, null);
 ByteBuffer fellOffATruck = ByteBuffer.allocate(1);
-pool.release(fellOffATruck);
-pool.close();

Review comment:
   Do we still need to call `close` to avoid a leak?

##
File path: 
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
##
@@ -504,20 +510,21 @@ public void writePastLimit() {
 }
 }
 
-@Test(expected = IllegalArgumentException.class)
+@Test
 public void testAppendAtInvalidOffset() {
 ByteBuffer buffer = ByteBuffer.allocate(1024);
 buffer.position(bufferOffset);
 
 long logAppendTime = System.currentTimeMillis();
-MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, 
RecordBatch.MAGIC_VALUE_V1, compressionType,
+MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, 
RecordBatch.MAGIC_VALUE_V2, compressionType,

Review comment:
   Why did 

[GitHub] [kafka] ijuma commented on a change in pull request #9520: MINOR: replace test "expected" parameter by assertThrows

2020-12-28 Thread GitBox


ijuma commented on a change in pull request #9520:
URL: https://github.com/apache/kafka/pull/9520#discussion_r549417155



##
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##
@@ -192,11 +192,12 @@ public void testConstructorWithSerializers() {
 new KafkaProducer<>(producerProps, new ByteArraySerializer(), new 
ByteArraySerializer()).close();
 }
 
-@Test(expected = ConfigException.class)
+@Test
 public void testNoSerializerProvided() {
 Properties producerProps = new Properties();
 producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9000");
-new KafkaProducer(producerProps);
+

Review comment:
   Was this new line intentional?

##
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
##
@@ -444,34 +444,30 @@ public void 
testAddPartitionToTransactionRetainsRetryBackoffWhenPartitionsAlread
 assertEquals(DEFAULT_RETRY_BACKOFF_MS, handler.retryBackoffMs());
 }
 
-@Test(expected = IllegalStateException.class)
+@Test
 public void testMaybeAddPartitionToTransactionBeforeInitTransactions() {
-transactionManager.failIfNotReadyForSend();
-transactionManager.maybeAddPartitionToTransaction(new 
TopicPartition("foo", 0));
+assertThrows(IllegalStateException.class, () -> 
transactionManager.failIfNotReadyForSend());
 }
 
-@Test(expected = IllegalStateException.class)
+@Test
 public void testMaybeAddPartitionToTransactionBeforeBeginTransaction() {
 doInitTransactions();
-transactionManager.failIfNotReadyForSend();
-transactionManager.maybeAddPartitionToTransaction(new 
TopicPartition("foo", 0));
+assertThrows(IllegalStateException.class, () -> 
transactionManager.failIfNotReadyForSend());

Review comment:
   Same as above.

##
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
##
@@ -444,34 +444,30 @@ public void 
testAddPartitionToTransactionRetainsRetryBackoffWhenPartitionsAlread
 assertEquals(DEFAULT_RETRY_BACKOFF_MS, handler.retryBackoffMs());
 }
 
-@Test(expected = IllegalStateException.class)
+@Test
 public void testMaybeAddPartitionToTransactionBeforeInitTransactions() {
-transactionManager.failIfNotReadyForSend();
-transactionManager.maybeAddPartitionToTransaction(new 
TopicPartition("foo", 0));
+assertThrows(IllegalStateException.class, () -> 
transactionManager.failIfNotReadyForSend());
 }
 
-@Test(expected = IllegalStateException.class)
+@Test
 public void testMaybeAddPartitionToTransactionBeforeBeginTransaction() {
 doInitTransactions();
-transactionManager.failIfNotReadyForSend();
-transactionManager.maybeAddPartitionToTransaction(new 
TopicPartition("foo", 0));
+assertThrows(IllegalStateException.class, () -> 
transactionManager.failIfNotReadyForSend());
 }
 
-@Test(expected = KafkaException.class)
+@Test
 public void testMaybeAddPartitionToTransactionAfterAbortableError() {
 doInitTransactions();
 transactionManager.beginTransaction();
 transactionManager.transitionToAbortableError(new KafkaException());
-transactionManager.failIfNotReadyForSend();
-transactionManager.maybeAddPartitionToTransaction(new 
TopicPartition("foo", 0));
+assertThrows(KafkaException.class, () -> 
transactionManager.failIfNotReadyForSend());

Review comment:
   Same as above.

##
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
##
@@ -663,12 +623,6 @@ public void 
shouldPreserveOffsetsFromCommitByGroupMetadataOnAbortIfTransactionsA
 producer.beginTransaction();
 
 String group2 = "g2";
-Map groupCommit2 = new 
HashMap() {
-{
-put(new TopicPartition(topic, 2), new OffsetAndMetadata(53L, 
null));
-put(new TopicPartition(topic, 3), new OffsetAndMetadata(84L, 
null));
-}
-};
 producer.sendOffsetsToTransaction(groupCommit, new 
ConsumerGroupMetadata(group2));

Review comment:
   I think this was intended to be `groupCommit2`, right?

##
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
##
@@ -444,34 +444,30 @@ public void 
testAddPartitionToTransactionRetainsRetryBackoffWhenPartitionsAlread
 assertEquals(DEFAULT_RETRY_BACKOFF_MS, handler.retryBackoffMs());
 }
 
-@Test(expected = IllegalStateException.class)
+@Test
 public void testMaybeAddPartitionToTransactionBeforeInitTransactions() {
-transactionManager.failIfNotReadyForSend();
-transactionManager.maybeAddPartitionToTransaction(new 
TopicPartition("foo", 0));
+

[GitHub] [kafka] bertber commented on a change in pull request #9761: KAFKA-10768 Add a test for ByteBufferInputStream to ByteBufferLogInputStreamTest

2020-12-28 Thread GitBox


bertber commented on a change in pull request #9761:
URL: https://github.com/apache/kafka/pull/9761#discussion_r549409531



##
File path: 
clients/src/test/java/org/apache/kafka/common/utils/ByteBufferInputStreamTest.java
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+public class ByteBufferInputStreamTest {
+
+@Test
+public void testReadUnsignedIntFromInputStream() throws Exception {

Review comment:
   It have been removed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bertber commented on a change in pull request #9761: KAFKA-10768 Add a test for ByteBufferInputStream to ByteBufferLogInputStreamTest

2020-12-28 Thread GitBox


bertber commented on a change in pull request #9761:
URL: https://github.com/apache/kafka/pull/9761#discussion_r549409289



##
File path: 
clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java
##
@@ -119,5 +119,4 @@ public void iteratorRaisesOnTooLargeRecords() {
 assertNotNull(logInputStream.nextBatch());
 logInputStream.nextBatch();
 }
-

Review comment:
   This change have been reverted.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wenbingshen commented on pull request #9789: Force the validation control request,isolate other data requests.Prot…

2020-12-28 Thread GitBox


wenbingshen commented on pull request #9789:
URL: https://github.com/apache/kafka/pull/9789#issuecomment-751748730


   @showuon This is my first time to submit pr to the community.Could you give 
me some suggestions?Thank you very much!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10889) The log cleaner is not working for topic partitions

2020-12-28 Thread Wenbing Shen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255618#comment-17255618
 ] 

Wenbing Shen commented on KAFKA-10889:
--

[~becket_qin]  Why not isolate the message creation time from the log append 
time,return the creation time for the client,and use the log append time 
between brokers,which is more friendly to log cleanup.

> The log cleaner is not working for topic partitions
> ---
>
> Key: KAFKA-10889
> URL: https://issues.apache.org/jira/browse/KAFKA-10889
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.0.0
>Reporter: Wenbing Shen
>Assignee: Wenbing Shen
>Priority: Blocker
> Attachments: 0880c08b0110fcdd9b0c.png, 0880c08b0110fcddfb0b.png, 
> image-2020-12-28-17-17-15-947.png
>
>
> * I have a topic that is reserved for the default of 7 days, but the log 
> exists from October 26th to December 25th today.The log cleaner doesn't seem 
> to be working on it.This seems to be an underlying problem in Kafka.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10889) The log cleaner is not working for topic partitions

2020-12-28 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255527#comment-17255527
 ] 

Jiangjie Qin commented on KAFKA-10889:
--

It looks that the broker is working as design. I am not sure what would be 
expected behavior here. If a message of timestamp T was appended to the log, 
and the retention is set to R, that message will only be deleted after T + R. 
Because Kafka maintains the contiguity of the messages, all the messages after 
that message won't be deleted before that message expires.

One potential improvement might be disallowing messages whose timestamp is 
greater than the current system time by too much. But that may also hurt some 
other use cases. And defining "too much" could also be tricky.

> The log cleaner is not working for topic partitions
> ---
>
> Key: KAFKA-10889
> URL: https://issues.apache.org/jira/browse/KAFKA-10889
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.0.0
>Reporter: Wenbing Shen
>Assignee: Wenbing Shen
>Priority: Blocker
> Attachments: 0880c08b0110fcdd9b0c.png, 0880c08b0110fcddfb0b.png, 
> image-2020-12-28-17-17-15-947.png
>
>
> * I have a topic that is reserved for the default of 7 days, but the log 
> exists from October 26th to December 25th today.The log cleaner doesn't seem 
> to be working on it.This seems to be an underlying problem in Kafka.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] itantiger commented on pull request #9790: Some parameters will be overwritten which was configured in consumer.config.

2020-12-28 Thread GitBox


itantiger commented on pull request #9790:
URL: https://github.com/apache/kafka/pull/9790#issuecomment-751673393


   > @itantiger Thanks for your patch. Could you revise the title?
   
   Finished.
   When will this patch merge to trunk?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] itantiger commented on pull request #9790: Some parameters will be overwritten which was configured in consumer.config.

2020-12-28 Thread GitBox


itantiger commented on pull request #9790:
URL: https://github.com/apache/kafka/pull/9790#issuecomment-751665759


   > @itantiger Thanks for your patch. Could you revise the title?
   
   Also thank you very much for your guidance :) .



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9790: Some parameters will be overwritten which was configured in consumer.config where running "ConsumerPerformance.scala". Linked https://issues

2020-12-28 Thread GitBox


chia7712 commented on pull request #9790:
URL: https://github.com/apache/kafka/pull/9790#issuecomment-751664539


   @itantiger Thanks for your patch. Could you revise the title?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] itantiger commented on pull request #9790: Some parameters will be overwritten which was configured in consumer.config where running "ConsumerPerformance.scala". Linked https://issue

2020-12-28 Thread GitBox


itantiger commented on pull request #9790:
URL: https://github.com/apache/kafka/pull/9790#issuecomment-751664125


   > > what should I do next :)
   > 
   > please take a look at the report and fix the error. 
https://github.com/apache/kafka/pull/9790/checks?check_run_id=1616222522
   
   I fixed the error, but there were many errors in the second build. It seemed 
that the errors had nothing to do with what I changed. Please help me to have a 
look :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-10889) The log cleaner is not working for topic partitions

2020-12-28 Thread Wenbing Shen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255491#comment-17255491
 ] 

Wenbing Shen edited comment on KAFKA-10889 at 12/28/20, 9:43 AM:
-

Hello,When the user is unfamiliar with the configuration, is there a way to 
solve the problem by design?

[~becket_qin]

[~junrao]


was (Author: wenbing.shen):
[~becket_qin]

[~junrao]

> The log cleaner is not working for topic partitions
> ---
>
> Key: KAFKA-10889
> URL: https://issues.apache.org/jira/browse/KAFKA-10889
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.0.0
>Reporter: Wenbing Shen
>Assignee: Wenbing Shen
>Priority: Blocker
> Attachments: 0880c08b0110fcdd9b0c.png, 0880c08b0110fcddfb0b.png, 
> image-2020-12-28-17-17-15-947.png
>
>
> * I have a topic that is reserved for the default of 7 days, but the log 
> exists from October 26th to December 25th today.The log cleaner doesn't seem 
> to be working on it.This seems to be an underlying problem in Kafka.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10889) The log cleaner is not working for topic partitions

2020-12-28 Thread Wenbing Shen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255491#comment-17255491
 ] 

Wenbing Shen edited comment on KAFKA-10889 at 12/28/20, 9:43 AM:
-

[~becket_qin]

[~junrao]

Hello,When the user is unfamiliar with the configuration, is there a way to 
solve the problem by design?


was (Author: wenbing.shen):
Hello,When the user is unfamiliar with the configuration, is there a way to 
solve the problem by design?

[~becket_qin]

[~junrao]

> The log cleaner is not working for topic partitions
> ---
>
> Key: KAFKA-10889
> URL: https://issues.apache.org/jira/browse/KAFKA-10889
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.0.0
>Reporter: Wenbing Shen
>Assignee: Wenbing Shen
>Priority: Blocker
> Attachments: 0880c08b0110fcdd9b0c.png, 0880c08b0110fcddfb0b.png, 
> image-2020-12-28-17-17-15-947.png
>
>
> * I have a topic that is reserved for the default of 7 days, but the log 
> exists from October 26th to December 25th today.The log cleaner doesn't seem 
> to be working on it.This seems to be an underlying problem in Kafka.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10889) The log cleaner is not working for topic partitions

2020-12-28 Thread Wenbing Shen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255491#comment-17255491
 ] 

Wenbing Shen commented on KAFKA-10889:
--

[~becket_qin]

[~junrao]

> The log cleaner is not working for topic partitions
> ---
>
> Key: KAFKA-10889
> URL: https://issues.apache.org/jira/browse/KAFKA-10889
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.0.0
>Reporter: Wenbing Shen
>Assignee: Wenbing Shen
>Priority: Blocker
> Attachments: 0880c08b0110fcdd9b0c.png, 0880c08b0110fcddfb0b.png, 
> image-2020-12-28-17-17-15-947.png
>
>
> * I have a topic that is reserved for the default of 7 days, but the log 
> exists from October 26th to December 25th today.The log cleaner doesn't seem 
> to be working on it.This seems to be an underlying problem in Kafka.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon commented on pull request #9792: KAFKA-10870: handle REBALANCE_IN_PROGRESS error in JoinGroup

2020-12-28 Thread GitBox


showuon commented on pull request #9792:
URL: https://github.com/apache/kafka/pull/9792#issuecomment-751647364


   @hachikuji , please help review this PR. Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon opened a new pull request #9792: KAFKA-10870: handle REBALANCE_IN_PROGRESS error in JoinGroup

2020-12-28 Thread GitBox


showuon opened a new pull request #9792:
URL: https://github.com/apache/kafka/pull/9792


   handle REBALANCE_IN_PROGRESS error in JoinGroup to log correct info and 
request a rejoin.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-10889) The log cleaner is not working for topic partitions

2020-12-28 Thread Wenbing Shen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255480#comment-17255480
 ] 

Wenbing Shen edited comment on KAFKA-10889 at 12/28/20, 9:24 AM:
-

The problem appears to be related to this kip:

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message]

Related to these two configurations:

message.timestamp.type=CreateTime

message.timestamp.difference.max.ms=Long.MAXVALUE

The client generated messages which timestamp is 2020-12-26 14:49:32,this 
results in a period before 2021-01-02 14:49:32,all log segments of this 
partition will not be deleted.

 

!image-2020-12-28-17-17-15-947.png!


was (Author: wenbing.shen):
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message]

message.timestamp.type
max.message.time.difference.ms

> The log cleaner is not working for topic partitions
> ---
>
> Key: KAFKA-10889
> URL: https://issues.apache.org/jira/browse/KAFKA-10889
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.0.0
>Reporter: Wenbing Shen
>Assignee: Wenbing Shen
>Priority: Blocker
> Attachments: 0880c08b0110fcdd9b0c.png, 0880c08b0110fcddfb0b.png, 
> image-2020-12-28-17-17-15-947.png
>
>
> * I have a topic that is reserved for the default of 7 days, but the log 
> exists from October 26th to December 25th today.The log cleaner doesn't seem 
> to be working on it.This seems to be an underlying problem in Kafka.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10870) Consumer should handle REBALANCE_IN_PROGRESS from JoinGroup

2020-12-28 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-10870:
-

Assignee: Luke Chen

> Consumer should handle REBALANCE_IN_PROGRESS from JoinGroup
> ---
>
> Key: KAFKA-10870
> URL: https://issues.apache.org/jira/browse/KAFKA-10870
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Luke Chen
>Priority: Major
>
> We hit a timeout when persisting group metadata to the __consumer_offsets 
> topic:
> {code}
> [2020-12-18 18:06:08,209] DEBUG [GroupMetadataManager brokerId=1] Metadata 
> from group test_group_id with generation 1 failed when appending to log due 
> to org.apache.kafka.common.errors.TimeoutException 
> (kafka.coordinator.group.GroupMetadataManager)
> [2020-12-18 18:06:08,210] WARN [GroupCoordinator 1]: Failed to persist 
> metadata for group test_group_id: The group is rebalancing, so a rejoin is 
> needed. (kafka.coordinator.group.GroupCoordinator)
> {code}
> This in turn resulted in a REBALANCE_IN_PROGRESS being returned from the 
> JoinGroup:
> {code}
> [2020-12-18 18:06:08,211] INFO Completed 
> request:RequestHeader(apiKey=JOIN_GROUP, apiVersion=7, 
> clientId=consumer-test_group_id-test_group_id-instance-1, correlationId=3) -- 
> {group_id=test_group_id,session_timeout_ms=6,rebalance_timeout_ms=30,member_id=,group_instance_id=test_group_id-instance-1,protocol_type=consumer,protocols=[{name=range,metadata=java.nio.HeapByteBuffer[pos=0
>  lim=26 
> cap=26],_tagged_fields={}}],_tagged_fields={}},response:{throttle_time_ms=0,error_code=27,generation_id=1,protocol_type=consumer,protocol_name=range,leader=test_group_id-instance-2-32e72316-2c3f-40d6-bc34-8ec23d633d34,member_id=,members=[],_tagged_fields={}}
>  from connection 
> 172.31.46.222:9092-172.31.44.169:41310-6;totalTime:5014.825,requestQueueTime:0.193,localTime:11.575,remoteTime:5002.195,throttleTime:0.66,responseQueueTime:0.105,sendTime:0.094,sendIoTime:0.038,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=apache-kafka-java,
>  softwareVersion=5.5.3-ce) (kafka.request.logger)  
> {code}
> The consumer has no logic to handle REBALANCE_IN_PROGRESS from JoinGroup.
> {code}
> [2020-12-18 18:06:08,210] ERROR [Consumer 
> instanceId=test_group_id-instance-1, 
> clientId=consumer-test_group_id-test_group_id-instance-1, 
> groupId=test_group_id] Attempt to join group failed due to unexpected error
> : The group is rebalancing, so a rejoin is needed. 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2020-12-18 18:06:08,211] INFO [Consumer instanceId=test_group_id-instance-1, 
> clientId=consumer-test_group_id-test_group_id-instance-1, 
> groupId=test_group_id] Join group failed with org.apache.kafka.common.KafkaE
> xception: Unexpected error in join group response: The group is rebalancing, 
> so a rejoin is needed. 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2020-12-18 18:06:08,211] ERROR Error during processing, terminating consumer 
> process:  (org.apache.kafka.tools.VerifiableConsumer)
> org.apache.kafka.common.KafkaException: Unexpected error in join group 
> response: The group is rebalancing, so a rejoin is needed.
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:653)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:574)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1096)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1076)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10889) The log cleaner is not working for topic partitions

2020-12-28 Thread Wenbing Shen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255480#comment-17255480
 ] 

Wenbing Shen commented on KAFKA-10889:
--

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message]

message.timestamp.type
max.message.time.difference.ms

> The log cleaner is not working for topic partitions
> ---
>
> Key: KAFKA-10889
> URL: https://issues.apache.org/jira/browse/KAFKA-10889
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.0.0
>Reporter: Wenbing Shen
>Assignee: Wenbing Shen
>Priority: Blocker
> Attachments: 0880c08b0110fcdd9b0c.png, 0880c08b0110fcddfb0b.png
>
>
> * I have a topic that is reserved for the default of 7 days, but the log 
> exists from October 26th to December 25th today.The log cleaner doesn't seem 
> to be working on it.This seems to be an underlying problem in Kafka.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10890) Broker just stated Ignoring LeaderAndIsr request from controller

2020-12-28 Thread GeoffreyStark (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

GeoffreyStark updated KAFKA-10890:
--
Description: 
In auto.leader.rebalance.enabled=true 

Taking a Broker 1013 (with many leader partitions) offline;

Then, after the leaders of these subdivisions have been elected from other ISRs,

In theory, 1013 should once again become the leader of those partitions

Indeed, if you look at the describe command and the information of the 
corresponding partition in ZK, you can see that the leader has changed back to 
1013.

But if you look at the Controller log and the 1013 log at this point, you'll 
see that there are some warning messages, and after a while,

the producer reports an error"

 The server is not the leader for that topic-partition..Going to request 
metadata update now

"

 

It looks like the controller failed to send a LeaderAndIsr request to a 1013 
node after it restarted, Then the 1013 node‘s log has been

 
{code:java}
"Ignoring LeaderAndIsr request from controller 1017 with the correlation id 33 
epoch 6 fro partition sp since its associated leader 5 is not who the current 
leader epoch 5 (state. Change. Logger). "
{code}
 

 

After a while, The producer reports an exception.
{code:java}
"The server is not The leader for that topically -- partition.. Going to 
request metadata update now"
{code}
 

  was:
In auto.leader.rebalance.enabled=true 

Taking a Broker 1013 (with many leader partitions) offline;

Then, after the leaders of these subdivisions have been elected from other ISRs,

In theory, 1013 should once again become the leader of those partitions

Indeed, if you look at the describe command and the information of the 
corresponding partition in ZK, you can see that the leader has changed back to 
1013.

But if you look at the Controller log and the 1013 log at this point, you'll 
see that there are some warning messages, and after a while,

the producer reports an error"

 The server is not the leader for that topic-partition..Going to request 
metadata update now

"


>  Broker just stated Ignoring LeaderAndIsr request from controller
> -
>
> Key: KAFKA-10890
> URL: https://issues.apache.org/jira/browse/KAFKA-10890
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.0.0
> Environment: kfk 2.0
> 74 brokers
> 3 replica-factors
>Reporter: GeoffreyStark
>Priority: Major
> Attachments: image-2020-12-28-16-59-03-492.png, 
> jstack-1013broker-1228-1312, kafka元数据混乱.docx
>
>
> In auto.leader.rebalance.enabled=true 
> Taking a Broker 1013 (with many leader partitions) offline;
> Then, after the leaders of these subdivisions have been elected from other 
> ISRs,
> In theory, 1013 should once again become the leader of those partitions
> Indeed, if you look at the describe command and the information of the 
> corresponding partition in ZK, you can see that the leader has changed back 
> to 1013.
> But if you look at the Controller log and the 1013 log at this point, you'll 
> see that there are some warning messages, and after a while,
> the producer reports an error"
>  The server is not the leader for that topic-partition..Going to request 
> metadata update now
> "
>  
> It looks like the controller failed to send a LeaderAndIsr request to a 1013 
> node after it restarted, Then the 1013 node‘s log has been
>  
> {code:java}
> "Ignoring LeaderAndIsr request from controller 1017 with the correlation id 
> 33 epoch 6 fro partition sp since its associated leader 5 is not who the 
> current leader epoch 5 (state. Change. Logger). "
> {code}
>  
>  
> After a while, The producer reports an exception.
> {code:java}
> "The server is not The leader for that topically -- partition.. Going to 
> request metadata update now"
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10890) Broker just stated Ignoring LeaderAndIsr request from controller

2020-12-28 Thread GeoffreyStark (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

GeoffreyStark updated KAFKA-10890:
--
Attachment: image-2020-12-28-16-59-03-492.png

>  Broker just stated Ignoring LeaderAndIsr request from controller
> -
>
> Key: KAFKA-10890
> URL: https://issues.apache.org/jira/browse/KAFKA-10890
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.0.0
> Environment: kfk 2.0
> 74 brokers
> 3 replica-factors
>Reporter: GeoffreyStark
>Priority: Major
> Attachments: image-2020-12-28-16-59-03-492.png, 
> jstack-1013broker-1228-1312, kafka元数据混乱.docx
>
>
> In auto.leader.rebalance.enabled=true 
> Taking a Broker 1013 (with many leader partitions) offline;
> Then, after the leaders of these subdivisions have been elected from other 
> ISRs,
> In theory, 1013 should once again become the leader of those partitions
> Indeed, if you look at the describe command and the information of the 
> corresponding partition in ZK, you can see that the leader has changed back 
> to 1013.
> But if you look at the Controller log and the 1013 log at this point, you'll 
> see that there are some warning messages, and after a while,
> the producer reports an error"
>  The server is not the leader for that topic-partition..Going to request 
> metadata update now
> "



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10890) Broker just stated Ignoring LeaderAndIsr request from controller

2020-12-28 Thread GeoffreyStark (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255473#comment-17255473
 ] 

GeoffreyStark commented on KAFKA-10890:
---

controller log has been follow text

 

!image-2020-12-28-16-59-03-492.png!

>  Broker just stated Ignoring LeaderAndIsr request from controller
> -
>
> Key: KAFKA-10890
> URL: https://issues.apache.org/jira/browse/KAFKA-10890
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.0.0
> Environment: kfk 2.0
> 74 brokers
> 3 replica-factors
>Reporter: GeoffreyStark
>Priority: Major
> Attachments: image-2020-12-28-16-59-03-492.png, 
> jstack-1013broker-1228-1312, kafka元数据混乱.docx
>
>
> In auto.leader.rebalance.enabled=true 
> Taking a Broker 1013 (with many leader partitions) offline;
> Then, after the leaders of these subdivisions have been elected from other 
> ISRs,
> In theory, 1013 should once again become the leader of those partitions
> Indeed, if you look at the describe command and the information of the 
> corresponding partition in ZK, you can see that the leader has changed back 
> to 1013.
> But if you look at the Controller log and the 1013 log at this point, you'll 
> see that there are some warning messages, and after a while,
> the producer reports an error"
>  The server is not the leader for that topic-partition..Going to request 
> metadata update now
> "



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10890) Broker just stated Ignoring LeaderAndIsr request from controller

2020-12-28 Thread GeoffreyStark (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255472#comment-17255472
 ] 

GeoffreyStark commented on KAFKA-10890:
---

It looks like the controller failed to send a LeaderAndIsr request to a 1013 
node after it restarted, Then the 1013 node‘s log has been

 
{code:java}
"Ignoring LeaderAndIsr request from controller 1017 with the correlation id 33 
epoch 6 fro partition sp since its associated leader 5 is not who the current 
leader epoch 5 (state. Change. Logger). "
{code}
 

 

After a while, The producer reports an exception.
{code:java}
"The server is not The leader for that topically -- partition.. Going to 
request metadata update now"
{code}
 

>  Broker just stated Ignoring LeaderAndIsr request from controller
> -
>
> Key: KAFKA-10890
> URL: https://issues.apache.org/jira/browse/KAFKA-10890
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.0.0
> Environment: kfk 2.0
> 74 brokers
> 3 replica-factors
>Reporter: GeoffreyStark
>Priority: Major
> Attachments: jstack-1013broker-1228-1312, kafka元数据混乱.docx
>
>
> In auto.leader.rebalance.enabled=true 
> Taking a Broker 1013 (with many leader partitions) offline;
> Then, after the leaders of these subdivisions have been elected from other 
> ISRs,
> In theory, 1013 should once again become the leader of those partitions
> Indeed, if you look at the describe command and the information of the 
> corresponding partition in ZK, you can see that the leader has changed back 
> to 1013.
> But if you look at the Controller log and the 1013 log at this point, you'll 
> see that there are some warning messages, and after a while,
> the producer reports an error"
>  The server is not the leader for that topic-partition..Going to request 
> metadata update now
> "



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10890) Broker just stated Ignoring LeaderAndIsr request from controller

2020-12-28 Thread GeoffreyStark (Jira)
GeoffreyStark created KAFKA-10890:
-

 Summary:  Broker just stated Ignoring LeaderAndIsr request from 
controller
 Key: KAFKA-10890
 URL: https://issues.apache.org/jira/browse/KAFKA-10890
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.0.0
 Environment: kfk 2.0
74 brokers
3 replica-factors
Reporter: GeoffreyStark
 Attachments: jstack-1013broker-1228-1312, kafka元数据混乱.docx

In auto.leader.rebalance.enabled=true 

Taking a Broker 1013 (with many leader partitions) offline;

Then, after the leaders of these subdivisions have been elected from other ISRs,

In theory, 1013 should once again become the leader of those partitions

Indeed, if you look at the describe command and the information of the 
corresponding partition in ZK, you can see that the leader has changed back to 
1013.

But if you look at the Controller log and the 1013 log at this point, you'll 
see that there are some warning messages, and after a while,

the producer reports an error"

 The server is not the leader for that topic-partition..Going to request 
metadata update now

"



--
This message was sent by Atlassian Jira
(v8.3.4#803005)