[GitHub] [kafka] chia7712 commented on a change in pull request #9778: KAFKA-10874 Fix flaky ClientQuotasRequestTest.testAlterIpQuotasRequest
chia7712 commented on a change in pull request #9778: URL: https://github.com/apache/kafka/pull/9778#discussion_r549598781 ## File path: core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala ## @@ -212,7 +214,9 @@ class ClientQuotasRequestTest extends BaseRequestTest { InetAddress.getByName(unknownHost) else InetAddress.getByName(entityName) -assertEquals(expectedMatches(entity), servers.head.socketServer.connectionQuotas.connectionRateForIp(entityIp), 0.01) +TestUtils.waitUntilTrue( + () => expectedMatches(entity) - servers.head.socketServer.connectionQuotas.connectionRateForIp(entityIp) < 0.01 + ,"Broker didn't update prop from Zookeeper") Review comment: Could we rename "prop" to "quotas"? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
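The change above replaces a one-shot assertion with polling until the broker has picked up the new quota. A minimal sketch of that idea in plain Java follows; `waitUntilTrue` and `withinTolerance` here are hypothetical stand-ins written for illustration, not Kafka's `TestUtils`, and the timeout values are assumptions. Note that the diff compares a signed difference, so the sketch uses `Math.abs` to get the two-sided check that `assertEquals(expected, actual, 0.01)` performed; that is an assumption about the intent.

```java
import java.util.function.BooleanSupplier;

final class QuotaPollingSketch {

    // Hypothetical polling helper: re-checks the condition until it holds or the timeout elapses.
    static boolean waitUntilTrue(BooleanSupplier condition, long timeoutMs, long pauseMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    // Two-sided tolerance check, comparable to assertEquals(expected, actual, delta).
    static boolean withinTolerance(double expected, double actual, double delta) {
        return Math.abs(expected - actual) <= delta;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Simulated quota that only reaches the expected rate after about 50 ms.
        BooleanSupplier updated = () -> {
            double actual = System.currentTimeMillis() - start >= 50 ? 10.0 : 0.0;
            return withinTolerance(10.0, actual, 0.01);
        };
        System.out.println(waitUntilTrue(updated, 2_000, 10));
    }
}
```

With a one-sided comparison such as `expected - actual < 0.01`, any actual value larger than the expected one would also satisfy the condition, which is why the sketch checks the absolute difference.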
[GitHub] [kafka] itantiger commented on a change in pull request #9790: Some parameters will be overwritten which was configured in consumer.config.
itantiger commented on a change in pull request #9790: URL: https://github.com/apache/kafka/pull/9790#discussion_r549591650 ## File path: core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala ## @@ -96,6 +97,28 @@ class ConsumerPerformanceTest { assertEquals("test", config.topic) assertEquals(10, config.numMessages) } + + @Test + def testConfigWithRecognizedOptionOverride(): Unit = { +val propsFile = TestUtils.tempFile() +val propsStream = Files.newOutputStream(propsFile.toPath) +propsStream.write("group.id=test_group_id\n".getBytes()) +propsStream.write("client.id=test_client_id".getBytes()) +propsStream.close() Review comment: Okay, I finished the local test and submitted the code :)
[GitHub] [kafka] chia7712 commented on a change in pull request #9790: Some parameters will be overwritten which was configured in consumer.config.
chia7712 commented on a change in pull request #9790: URL: https://github.com/apache/kafka/pull/9790#discussion_r549589413 ## File path: core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala ## @@ -96,6 +97,28 @@ class ConsumerPerformanceTest { assertEquals("test", config.topic) assertEquals(10, config.numMessages) } + + @Test + def testConfigWithRecognizedOptionOverride(): Unit = { +val propsFile = TestUtils.tempFile() +val propsStream = Files.newOutputStream(propsFile.toPath) +propsStream.write("group.id=test_group_id\n".getBytes()) +propsStream.write("client.id=test_client_id".getBytes()) +propsStream.close() Review comment: Or could we add ```group.id``` to the command-line arguments to make sure the value from the file is NOT overridden by command-line args?
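The precedence being asked about can be sketched in isolation. The snippet below assumes the rule the reviewer describes (a `group.id` from the properties file is not overridden by a command-line value); whether the tool ultimately behaves this way is not stated in this thread. `ConfigPrecedenceSketch` and `merge` are hypothetical names, and only `java.util.Properties` from the JDK is used.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class ConfigPrecedenceSketch {

    // Load the file contents first, then apply the command-line value only if the file did not set it.
    static Properties merge(String fileContents, String cliGroupId) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(fileContents));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (cliGroupId != null) {
            // Assumed precedence: the file wins, the command line is a fallback.
            props.putIfAbsent("group.id", cliGroupId);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties merged = merge("group.id=test_group_id\nclient.id=test_client_id", "cli_group_id");
        System.out.println(merged.getProperty("group.id"));
    }
}
```

A test along these lines passes both sources and asserts on the merged value, so a regression in either direction shows up as a changed `group.id`.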
[GitHub] [kafka] chia7712 commented on a change in pull request #9781: MINOR: Use top-level error in `UpdateFeaturesRequest.getErrorResponse`
chia7712 commented on a change in pull request #9781: URL: https://github.com/apache/kafka/pull/9781#discussion_r549580987 ## File path: clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java ## @@ -45,16 +44,18 @@ public UpdateFeaturesResponse(UpdateFeaturesResponseData data) { this.data = data; } -public Map errors() { -return data.results().valuesSet().stream().collect( -Collectors.toMap( -result -> result.feature(), -result -> new ApiError(Errors.forCode(result.errorCode()), result.errorMessage(; +public ApiError topLevelError() { +return new ApiError(Errors.forCode(data.errorCode()), data.errorMessage()); } @Override public Map errorCounts() { -return apiErrorCounts(errors()); +Map errorCounts = new HashMap<>(); +updateErrorCounts(errorCounts, Errors.forCode(data.errorCode())); +for (UpdatableFeatureResult result : data.results().valuesSet()) { Review comment: Is ```valuesSet``` necessary?
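The new `errorCounts()` in the diff counts the top-level error once and then adds one count per feature result. A self-contained sketch of that counting scheme is shown below; the `Err` enum and the method signature are hypothetical simplifications, not Kafka's `Errors` or `UpdateFeaturesResponse` types.

```java
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

final class ErrorCountsSketch {

    // Hypothetical stand-in for an error-code enum.
    enum Err { NONE, INVALID_REQUEST, FEATURE_UPDATE_FAILED }

    // Count the top-level error once, then one occurrence per per-feature result.
    static Map<Err, Integer> errorCounts(Err topLevel, List<Err> perFeature) {
        Map<Err, Integer> counts = new EnumMap<>(Err.class);
        counts.merge(topLevel, 1, Integer::sum);
        for (Err e : perFeature) {
            counts.merge(e, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(errorCounts(Err.NONE, Arrays.asList(Err.NONE, Err.FEATURE_UPDATE_FAILED)));
    }
}
```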
[GitHub] [kafka] itantiger commented on a change in pull request #9790: Some parameters will be overwritten which was configured in consumer.config.
itantiger commented on a change in pull request #9790: URL: https://github.com/apache/kafka/pull/9790#discussion_r549583876 ## File path: core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala ## @@ -96,6 +97,28 @@ class ConsumerPerformanceTest { assertEquals("test", config.topic) assertEquals(10, config.numMessages) } + + @Test + def testConfigWithRecognizedOptionOverride(): Unit = { +val propsFile = TestUtils.tempFile() +val propsStream = Files.newOutputStream(propsFile.toPath) +propsStream.write("group.id=test_group_id\n".getBytes()) +propsStream.write("client.id=test_client_id".getBytes()) +propsStream.close() Review comment: So, if the config file has the param `topic=test`, it will not take effect.
[GitHub] [kafka] itantiger commented on a change in pull request #9790: Some parameters will be overwritten which was configured in consumer.config.
itantiger commented on a change in pull request #9790: URL: https://github.com/apache/kafka/pull/9790#discussion_r549583198 ## File path: core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala ## @@ -96,6 +97,28 @@ class ConsumerPerformanceTest { assertEquals("test", config.topic) assertEquals(10, config.numMessages) } + + @Test + def testConfigWithRecognizedOptionOverride(): Unit = { +val propsFile = TestUtils.tempFile() +val propsStream = Files.newOutputStream(propsFile.toPath) +propsStream.write("group.id=test_group_id\n".getBytes()) +propsStream.write("client.id=test_client_id".getBytes()) +propsStream.close() Review comment: `--topic` is a `REQUIRED` argument on the command line, so I think its value is determined by the command line.
[GitHub] [kafka] chia7712 commented on a change in pull request #9790: Some parameters will be overwritten which was configured in consumer.config.
chia7712 commented on a change in pull request #9790: URL: https://github.com/apache/kafka/pull/9790#discussion_r549578153 ## File path: core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala ## @@ -96,6 +97,28 @@ class ConsumerPerformanceTest { assertEquals("test", config.topic) assertEquals(10, config.numMessages) } + + @Test + def testConfigWithRecognizedOptionOverride(): Unit = { +val propsFile = TestUtils.tempFile() +val propsStream = Files.newOutputStream(propsFile.toPath) +propsStream.write("group.id=test_group_id\n".getBytes()) +propsStream.write("client.id=test_client_id".getBytes()) +propsStream.close() Review comment: Could you test ```--topic``` to make sure it is not overridden by command-line arguments?
[GitHub] [kafka] showuon commented on pull request #9789: Force the validation control request,isolate other data requests.Prot…
showuon commented on pull request #9789: URL: https://github.com/apache/kafka/pull/9789#issuecomment-751957476 > @showuon Thank you very much for your suggestions and comments. In order to meet the requirements for submitting a PR, do I need to submit a PR again? I'm going to create a new JIRA ID for this PR and add test cases. No, just update this PR directly.
[GitHub] [kafka] chia7712 commented on a change in pull request #9761: KAFKA-10768 Add a test for ByteBufferInputStream to ByteBufferLogInputStreamTest
chia7712 commented on a change in pull request #9761: URL: https://github.com/apache/kafka/pull/9761#discussion_r549576728 ## File path: clients/src/test/java/org/apache/kafka/common/utils/ByteBufferInputStreamTest.java ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.utils; + +import static org.junit.Assert.assertEquals; Review comment: We have a pending PR offering a consistent import order (see https://github.com/apache/kafka/pull/8404#discussion_r535791013). Could you please follow the rules (although it is not merged)? 1. kafka, org.apache.kafka 2. com, net, org 3. java, javax
[GitHub] [kafka] chia7712 commented on pull request #9520: MINOR: replace test "expected" parameter by assertThrows
chia7712 commented on pull request #9520: URL: https://github.com/apache/kafka/pull/9520#issuecomment-751954761 @ijuma Thanks for your great review. I have addressed most of the review comments in the latest commit. > It would be good to clearly mention the cases where the translation wasn't mechanism as they are the most likely sources of issues. Pardon me. I failed to catch your point.
[GitHub] [kafka] chia7712 commented on a change in pull request #9520: MINOR: replace test "expected" parameter by assertThrows
chia7712 commented on a change in pull request #9520: URL: https://github.com/apache/kafka/pull/9520#discussion_r549569836 ## File path: connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java ## @@ -293,16 +295,13 @@ public void testEmptyStruct() { new Struct(emptyStructSchema); } -@Test(expected = SchemaBuilderException.class) +@Test public void testDuplicateFields() { -final Schema schema = SchemaBuilder.struct() -.name("testing") -.field("id", SchemaBuilder.string().doc("").build()) -.field("id", SchemaBuilder.string().doc("").build()) -.build(); -final Struct struct = new Struct(schema) -.put("id", "testing"); -struct.validate(); +assertThrows(SchemaBuilderException.class, () -> SchemaBuilder.struct() +.name("testing") +.field("id", SchemaBuilder.string().doc("").build()) +.field("id", SchemaBuilder.string().doc("").build()) +.build()); Review comment: ```StructTest``` has test cases for ```validate``` method.
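The point of this translation is that an `expected`-style test passes if any statement in the method throws, while `assertThrows` pins the exception to one expression. The sketch below illustrates this with a hand-written `assertThrows` (a simplified stand-in, not JUnit's implementation) and a toy builder that rejects duplicate field names; both are hypothetical and only loosely modeled on the diff.

```java
import java.util.HashSet;
import java.util.Set;

final class AssertThrowsSketch {

    // Hypothetical functional interface for a block of code that may throw.
    interface ThrowingRunnable {
        void run() throws Exception;
    }

    // Simplified stand-in for assertThrows: only the given body may produce the expected exception.
    static <T extends Throwable> T assertThrows(Class<T> expected, ThrowingRunnable body) {
        try {
            body.run();
        } catch (Throwable t) {
            if (expected.isInstance(t)) {
                return expected.cast(t);
            }
            throw new AssertionError("unexpected exception type: " + t.getClass().getName(), t);
        }
        throw new AssertionError("expected " + expected.getName() + " but nothing was thrown");
    }

    // Toy builder that rejects duplicate field names.
    static final class ToySchemaBuilder {
        private final Set<String> fields = new HashSet<>();

        ToySchemaBuilder field(String name) {
            if (!fields.add(name)) {
                throw new IllegalStateException("duplicate field: " + name);
            }
            return this;
        }
    }

    public static void main(String[] args) {
        IllegalStateException e = assertThrows(IllegalStateException.class,
            () -> new ToySchemaBuilder().field("id").field("id"));
        System.out.println(e.getMessage());
    }
}
```

Because the lambda contains only the builder chain, later statements such as a `validate()` call cannot accidentally satisfy the assertion, which matches the reviewer's note that validation is covered elsewhere.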
[GitHub] [kafka] chia7712 commented on a change in pull request #9520: MINOR: replace test "expected" parameter by assertThrows
chia7712 commented on a change in pull request #9520: URL: https://github.com/apache/kafka/pull/9520#discussion_r549568729 ## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java ## @@ -504,20 +510,21 @@ public void writePastLimit() { } } -@Test(expected = IllegalArgumentException.class) +@Test public void testAppendAtInvalidOffset() { ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.position(bufferOffset); long logAppendTime = System.currentTimeMillis(); -MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType, +MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V2, compressionType, Review comment: magic 1 does not support ```ZStandard```
[GitHub] [kafka] chia7712 commented on a change in pull request #9520: MINOR: replace test "expected" parameter by assertThrows
chia7712 commented on a change in pull request #9520: URL: https://github.com/apache/kafka/pull/9520#discussion_r549567477 ## File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java ## @@ -444,34 +444,30 @@ public void testAddPartitionToTransactionRetainsRetryBackoffWhenPartitionsAlread assertEquals(DEFAULT_RETRY_BACKOFF_MS, handler.retryBackoffMs()); } -@Test(expected = IllegalStateException.class) +@Test public void testMaybeAddPartitionToTransactionBeforeInitTransactions() { -transactionManager.failIfNotReadyForSend(); -transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0)); +assertThrows(IllegalStateException.class, () -> transactionManager.failIfNotReadyForSend()); } -@Test(expected = IllegalStateException.class) +@Test public void testMaybeAddPartitionToTransactionBeforeBeginTransaction() { doInitTransactions(); -transactionManager.failIfNotReadyForSend(); -transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0)); +assertThrows(IllegalStateException.class, () -> transactionManager.failIfNotReadyForSend()); } -@Test(expected = KafkaException.class) +@Test public void testMaybeAddPartitionToTransactionAfterAbortableError() { doInitTransactions(); transactionManager.beginTransaction(); transactionManager.transitionToAbortableError(new KafkaException()); -transactionManager.failIfNotReadyForSend(); -transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0)); +assertThrows(KafkaException.class, () -> transactionManager.failIfNotReadyForSend()); } -@Test(expected = KafkaException.class) +@Test public void testMaybeAddPartitionToTransactionAfterFatalError() { doInitTransactions(); transactionManager.transitionToFatalError(new KafkaException()); -transactionManager.failIfNotReadyForSend(); -transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0)); +assertThrows(KafkaException.class, () -> 
transactionManager.failIfNotReadyForSend()); Review comment: https://github.com/apache/kafka/commit/717c55be971df862c55f55d245b9997f1d6f998c moved the state check out of ```maybeAddPartitionToTransaction``` and added ```failIfNotReadyForSend``` to all test cases to make them throw an exception. I will update the test case names to avoid confusion.
[GitHub] [kafka] wenbingshen commented on pull request #9789: Force the validation control request,isolate other data requests.Prot…
wenbingshen commented on pull request #9789: URL: https://github.com/apache/kafka/pull/9789#issuecomment-751941992 @showuon Thank you very much for your suggestions and comments. In order to meet the requirements for submitting a PR, do I need to submit a PR again? I'm going to create a new JIRA ID for this PR and add test cases.
[GitHub] [kafka] showuon commented on a change in pull request #9789: Force the validation control request,isolate other data requests.Prot…
showuon commented on a change in pull request #9789: URL: https://github.com/apache/kafka/pull/9789#discussion_r549561892 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -988,60 +990,79 @@ private[kafka] class Processor(val id: Int, } } + protected def isControlRequest(header: RequestHeader): Boolean = { +if (isControlPlane) { + header.apiKey() match { +case ApiKeys.LEADER_AND_ISR => true +case ApiKeys.STOP_REPLICA => true +case ApiKeys.UPDATE_METADATA => true +case ApiKeys.CONTROLLED_SHUTDOWN => true +case _ => false + } +}else { Review comment: Not sure if this is better: ```scala header.apiKey() match { case ApiKeys.LEADER_AND_ISR | ApiKeys.STOP_REPLICA | ... | ... => true case _ => false } ```
[GitHub] [kafka] showuon commented on a change in pull request #9789: Force the validation control request,isolate other data requests.Prot…
showuon commented on a change in pull request #9789: URL: https://github.com/apache/kafka/pull/9789#discussion_r549561892 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -988,60 +990,79 @@ private[kafka] class Processor(val id: Int, } } + protected def isControlRequest(header: RequestHeader): Boolean = { +if (isControlPlane) { + header.apiKey() match { +case ApiKeys.LEADER_AND_ISR => true +case ApiKeys.STOP_REPLICA => true +case ApiKeys.UPDATE_METADATA => true +case ApiKeys.CONTROLLED_SHUTDOWN => true +case _ => false + } +}else { Review comment: Not sure if this is better: ```scala header.apiKey() match { case ApiKeys.LEADER_AND_ISR | ApiKeys.STOP_REPLICA | ... | ... if (isControlPlane) => true case _ => false } ```
[GitHub] [kafka] showuon commented on a change in pull request #9789: Force the validation control request,isolate other data requests.Prot…
showuon commented on a change in pull request #9789: URL: https://github.com/apache/kafka/pull/9789#discussion_r549561892 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -988,60 +990,79 @@ private[kafka] class Processor(val id: Int, } } + protected def isControlRequest(header: RequestHeader): Boolean = { +if (isControlPlane) { + header.apiKey() match { +case ApiKeys.LEADER_AND_ISR => true +case ApiKeys.STOP_REPLICA => true +case ApiKeys.UPDATE_METADATA => true +case ApiKeys.CONTROLLED_SHUTDOWN => true +case _ => false + } +}else { Review comment: Not sure if this is better (so no `else` needed): ```scala protected def isControlRequest(header: RequestHeader): Boolean = { header.apiKey() match { case ApiKeys.LEADER_AND_ISR | ApiKeys.STOP_REPLICA | ... | ... if (isControlPlane) => true case _ => false } } ```
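One thing worth checking when folding the `if`/`else` into a guarded match: as written in the diff, the `else` branch returns `true` when the processor is not on the control plane, whereas a guarded case followed by `case _ => false` returns `false` in that situation. The sketch below models both variants as plain boolean functions; the `ApiKey` enum is a hypothetical subset (the four keys from the diff plus one data request), and the method names are illustrative only.

```java
import java.util.EnumSet;
import java.util.Set;

final class ControlRequestSketch {

    // Hypothetical subset of API keys: the four from the diff plus one data-plane request.
    enum ApiKey { LEADER_AND_ISR, STOP_REPLICA, UPDATE_METADATA, CONTROLLED_SHUTDOWN, PRODUCE }

    static final Set<ApiKey> CONTROL_KEYS = EnumSet.of(
        ApiKey.LEADER_AND_ISR, ApiKey.STOP_REPLICA, ApiKey.UPDATE_METADATA, ApiKey.CONTROLLED_SHUTDOWN);

    // Behavior of the if/else in the diff: off the control plane, every request is accepted.
    static boolean diffVariant(boolean isControlPlane, ApiKey key) {
        return !isControlPlane || CONTROL_KEYS.contains(key);
    }

    // Behavior of a guarded match with a catch-all false: off the control plane, nothing is accepted.
    static boolean guardedVariant(boolean isControlPlane, ApiKey key) {
        return CONTROL_KEYS.contains(key) && isControlPlane;
    }

    public static void main(String[] args) {
        for (boolean plane : new boolean[] {true, false}) {
            for (ApiKey key : ApiKey.values()) {
                System.out.println(plane + " " + key + " diff=" + diffVariant(plane, key)
                    + " guarded=" + guardedVariant(plane, key));
            }
        }
    }
}
```

The two variants agree on the control plane and disagree for every key when `isControlPlane` is false, so the guarded form would need a different fallback to preserve the diff's behavior.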
[GitHub] [kafka] showuon edited a comment on pull request #9789: Force the validation control request,isolate other data requests.Prot…
showuon edited a comment on pull request #9789: URL: https://github.com/apache/kafka/pull/9789#issuecomment-751936350 @wenbingshen , thanks for the PR. Some comments left, but most importantly, we don't know which JIRA ticket you're fixing. Please check the [contributing code change](https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes) page. Thank you. > The PR title should usually be of the form KAFKA-: Title, where KAFKA- is the relevant JIRA id and Title may be the JIRA's title or a more specific title describing the PR itself. For trivial cases where a JIRA is not required (see JIRA section for more details) MINOR: or HOTFIX: can be used as the PR title prefix. Also, please add tests. Thanks.
[GitHub] [kafka] showuon commented on a change in pull request #9789: Force the validation control request,isolate other data requests.Prot…
showuon commented on a change in pull request #9789: URL: https://github.com/apache/kafka/pull/9789#discussion_r549561892 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -988,60 +990,79 @@ private[kafka] class Processor(val id: Int, } } + protected def isControlRequest(header: RequestHeader): Boolean = { +if (isControlPlane) { + header.apiKey() match { +case ApiKeys.LEADER_AND_ISR => true +case ApiKeys.STOP_REPLICA => true +case ApiKeys.UPDATE_METADATA => true +case ApiKeys.CONTROLLED_SHUTDOWN => true +case _ => false + } +}else { Review comment: Not sure if this is better: ```scala header.apiKey() match { case ApiKeys.LEADER_AND_ISR | ApiKeys.STOP_REPLICA | ... | ... if (isControlPlane) => true case _ => false } ```
[GitHub] [kafka] showuon commented on a change in pull request #9789: Force the validation control request,isolate other data requests.Prot…
showuon commented on a change in pull request #9789: URL: https://github.com/apache/kafka/pull/9789#discussion_r549558749 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -988,60 +990,79 @@ private[kafka] class Processor(val id: Int, } } + protected def isControlRequest(header: RequestHeader): Boolean = { Review comment: Could we use `private` here?
[GitHub] [kafka] showuon commented on a change in pull request #9789: Force the validation control request,isolate other data requests.Prot…
showuon commented on a change in pull request #9789: URL: https://github.com/apache/kafka/pull/9789#discussion_r549558597 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -988,60 +990,79 @@ private[kafka] class Processor(val id: Int, } } + protected def isControlRequest(header: RequestHeader): Boolean = { +if (isControlPlane) { + header.apiKey() match { +case ApiKeys.LEADER_AND_ISR => true +case ApiKeys.STOP_REPLICA => true +case ApiKeys.UPDATE_METADATA => true +case ApiKeys.CONTROLLED_SHUTDOWN => true +case _ => false + } +}else { Review comment: Formatting is not correct; it should be `} else {`.
[GitHub] [kafka] showuon commented on a change in pull request #9789: Force the validation control request,isolate other data requests.Prot…
showuon commented on a change in pull request #9789: URL: https://github.com/apache/kafka/pull/9789#discussion_r549558489 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -988,60 +990,79 @@ private[kafka] class Processor(val id: Int, } } + protected def isControlRequest(header: RequestHeader): Boolean = { +if (isControlPlane) { + header.apiKey() match { +case ApiKeys.LEADER_AND_ISR => true +case ApiKeys.STOP_REPLICA => true +case ApiKeys.UPDATE_METADATA => true +case ApiKeys.CONTROLLED_SHUTDOWN => true +case _ => false + } +}else { + true +} + } + private def processCompletedReceives(): Unit = { selector.completedReceives.forEach { receive => try { openOrClosingChannel(receive.source) match { case Some(channel) => val header = parseRequestHeader(receive.payload) -if (header.apiKey == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive, - () => time.nanoseconds())) - trace(s"Begin re-authentication: $channel") -else { - val nowNanos = time.nanoseconds() - if (channel.serverAuthenticationSessionExpired(nowNanos)) { -// be sure to decrease connection count and drop any in-flight responses -debug(s"Disconnecting expired channel: $channel : $header") -close(channel.id) -expiredConnectionsKilledCount.record(null, 1, 0) - } else { -val connectionId = receive.source -val context = new RequestContext(header, connectionId, channel.socketAddress, - channel.principal, listenerName, securityProtocol, - channel.channelMetadataRegistry.clientInformation, isPrivilegedListener, channel.principalSerde) - -var req = new RequestChannel.Request(processor = id, context = context, - startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics, None) - -if (req.header.apiKey == ApiKeys.ENVELOPE) { - // Override the request context with the forwarded request context. 
- // The envelope's context will be preserved in the forwarded context - - req = parseForwardedPrincipal(req, channel.principalSerde.asScala) match { -case Some(forwardedPrincipal) => - buildForwardedRequestContext(req, forwardedPrincipal) - -case None => - val envelopeResponse = new EnvelopeResponse(Errors.PRINCIPAL_DESERIALIZATION_FAILURE) - sendEnvelopeResponse(req, envelopeResponse) - null +if (!isControlRequest(header)) { + info(s"Current plane is control-plan, disconnecting non controller channel: $channel : $header") + close(channel.id) +}else{ Review comment: Formatting is not correct; it should be `} else {`.
[GitHub] [kafka] showuon commented on a change in pull request #9789: Force the validation control request,isolate other data requests.Prot…
showuon commented on a change in pull request #9789: URL: https://github.com/apache/kafka/pull/9789#discussion_r549557939 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -988,60 +990,79 @@ private[kafka] class Processor(val id: Int, } } + protected def isControlRequest(header: RequestHeader): Boolean = { +if (isControlPlane) { + header.apiKey() match { +case ApiKeys.LEADER_AND_ISR => true +case ApiKeys.STOP_REPLICA => true +case ApiKeys.UPDATE_METADATA => true +case ApiKeys.CONTROLLED_SHUTDOWN => true +case _ => false + } +}else { + true +} + } + private def processCompletedReceives(): Unit = { selector.completedReceives.forEach { receive => try { openOrClosingChannel(receive.source) match { case Some(channel) => val header = parseRequestHeader(receive.payload) -if (header.apiKey == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive, - () => time.nanoseconds())) - trace(s"Begin re-authentication: $channel") -else { - val nowNanos = time.nanoseconds() - if (channel.serverAuthenticationSessionExpired(nowNanos)) { -// be sure to decrease connection count and drop any in-flight responses -debug(s"Disconnecting expired channel: $channel : $header") -close(channel.id) -expiredConnectionsKilledCount.record(null, 1, 0) - } else { -val connectionId = receive.source -val context = new RequestContext(header, connectionId, channel.socketAddress, - channel.principal, listenerName, securityProtocol, - channel.channelMetadataRegistry.clientInformation, isPrivilegedListener, channel.principalSerde) - -var req = new RequestChannel.Request(processor = id, context = context, - startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics, None) - -if (req.header.apiKey == ApiKeys.ENVELOPE) { - // Override the request context with the forwarded request context. 
- // The envelope's context will be preserved in the forwarded context - - req = parseForwardedPrincipal(req, channel.principalSerde.asScala) match { -case Some(forwardedPrincipal) => - buildForwardedRequestContext(req, forwardedPrincipal) - -case None => - val envelopeResponse = new EnvelopeResponse(Errors.PRINCIPAL_DESERIALIZATION_FAILURE) - sendEnvelopeResponse(req, envelopeResponse) - null +if (!isControlRequest(header)) { + info(s"Current plane is control-plan, disconnecting non controller channel: $channel : $header") Review comment: typo: `control-plane`
[GitHub] [kafka] showuon commented on pull request #9789: Force the validation control request,isolate other data requests.Prot…
showuon commented on pull request #9789: URL: https://github.com/apache/kafka/pull/9789#issuecomment-751936350 @wenbingshen , thanks for the PR. Some comments left, but most importantly, we don't know which JIRA ticket you're fixing. Please check the [contributing code change](https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes) page. Thank you. > The PR title should usually be of the form KAFKA-: Title, where KAFKA- is the relevant JIRA id and Title may be the JIRA's title or a more specific title describing the PR itself. For trivial cases where a JIRA is not required (see JIRA section for more details) MINOR: or HOTFIX: can be used as the PR title prefix.
[GitHub] [kafka] mjsax commented on a change in pull request #9107: KAFKA-5488: Add type-safe branch() operator
mjsax commented on a change in pull request #9107: URL: https://github.com/apache/kafka/pull/9107#discussion_r549555914 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/BranchedInternal.java ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.Branched; +import org.apache.kafka.streams.kstream.KStream; + +import java.util.function.Consumer; +import java.util.function.Function; + +class BranchedInternal extends Branched { +BranchedInternal(final Branched branched) { +super(branched); +} + +BranchedInternal() { +super(null, null, null); +} + +static BranchedInternal empty() { +return new BranchedInternal<>(); +} + +String getName() { Review comment: It's a naming convention in the whole Kafka code base to omit the `get` prefix for all getter methods, i.e., this should be `name()`. (Similar below for the other getters.)
[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app
mjsax commented on a change in pull request #9744: URL: https://github.com/apache/kafka/pull/9744#discussion_r549546234 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java ## @@ -289,4 +289,17 @@ Cancellable schedule(final Duration interval, */ Map appConfigsWithPrefix(final String prefix); +/** + * Returns current cached wall-clock system timestamp in milliseconds. Review comment: nit: `Return` without `s` -- we use imperative to write JavaDocs. I would remove `cached` and add a dedicated second sentence about it. Also `wall-clock time` and `system time` are synonymous and thus `wall-clock system time` sounds a little odd. What about: ``` Return the current system timestamp (also called wall-clock time) in milliseconds. Note: this method returns the internally cached system timestamp from the Kafka Streams runtime. Thus, it may return a different value compared to `System.currentTimeMillis()`. For a global processor, Kafka Streams does not cache system time and thus calling this method will return the same value as `System.currentTimeMillis()`. @return the current system timestamp in milliseconds ``` ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java ## @@ -166,4 +166,14 @@ public long timestamp() { public Map appConfigsWithPrefix(final String prefix) { return delegate.appConfigsWithPrefix(prefix); } + +@Override +public long currentSystemTimeMs() { +throw new UnsupportedOperationException("this method is not supported in ForwardingDisabledProcessor context"); Review comment: Why do we throw here? It seems to be safe to get the time from the `delegate` object? We only disable `forward` but nothing else.
(same below) ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java ## @@ -115,6 +116,16 @@ public void commit() { //no-op } +@Override +public long currentSystemTimeMs() { +return Time.SYSTEM.milliseconds(); +} + +@Override +public long currentStreamTimeMs() { +throw new UnsupportedOperationException("this method is not supported in global processor context."); Review comment: This makes sense, but we might want to document this somewhere else, ie, in the corresponding docs. To be fair, not sure atm if we have much content about it and/or where to add it. We should for sure document it in the JavaDocs, ie, the `ProcessorContext` interface. (cf my comment above) About the error message: ``` throw new UnsupportedOperationException("There is no concept of stream-time for a global processor."); ``` ## File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java ## @@ -289,4 +289,17 @@ Cancellable schedule(final Duration interval, */ Map appConfigsWithPrefix(final String prefix); +/** + * Returns current cached wall-clock system timestamp in milliseconds. + * + * @return the current cached wall-clock system timestamp in milliseconds + */ +long currentSystemTimeMs(); + +/** + * Returns the maximum timestamp of any record yet processed by the task. Review comment: What about: ``` Return the current stream-time in milliseconds. Stream-time is the maximum observed {@link TimestampExtractor record timestamp} so far (including the currently processed record), i.e., it can be considered a high-watermark. Stream-time is tracked on a per-task basis and is preserved across restarts and during task migration. Note: this method is not supported for global processors (cf. {@link Topology#addGlobalStore(...)} and {@link StreamsBuilder#addGlobalStore(...)}), because there is no concept of stream-time for this case.
Calling this method in a global processor will result in an {@link UnsupportedOperationException}. @return the current stream-time in milliseconds ``` ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreToProcessorContextAdapter.java ## @@ -160,4 +160,14 @@ public long timestamp() { public Map appConfigsWithPrefix(final String prefix) { return delegate.appConfigsWithPrefix(prefix); } + +@Override +public long currentSystemTimeMs() { +throw new UnsupportedOperationException("this method is not supported in StoreToProcessorContextAdapter"); Review comment: We should align the error message to the ones from above: `"StateStores can't access system time."` (same below). ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -,13 +,21 @@ RecordCollector recordCollector() {
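The semantics discussed in this review (a cached system time, stream-time as a per-task high-watermark, and no stream-time for global processors) can be sketched roughly as follows. All type names here (`TimeContext`, `TaskTimeContext`, `GlobalTimeContext`) are hypothetical and only mirror the two method names from the PR; this is an illustration under those assumptions, not the Kafka Streams implementation.

```java
// Hypothetical types for illustration only; not the Kafka Streams classes.
interface TimeContext {
    long currentSystemTimeMs();

    long currentStreamTimeMs();
}

// Sketch of a per-task context: system time is cached, stream-time is a high-watermark.
final class TaskTimeContext implements TimeContext {
    private static final long NO_TIMESTAMP = -1L;

    private long cachedSystemTimeMs;
    private long streamTimeMs = NO_TIMESTAMP;

    TaskTimeContext(final long initialSystemTimeMs) {
        this.cachedSystemTimeMs = initialSystemTimeMs;
    }

    // Assumed to be called by the runtime from time to time, not on every record.
    void refreshSystemTime(final long nowMs) {
        cachedSystemTimeMs = nowMs;
    }

    // Stream-time only ever advances: it is the maximum record timestamp seen so far.
    void observeRecordTimestamp(final long recordTimestampMs) {
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
    }

    @Override
    public long currentSystemTimeMs() {
        return cachedSystemTimeMs;
    }

    @Override
    public long currentStreamTimeMs() {
        return streamTimeMs;
    }
}

// Sketch of a global context: no caching, and no notion of stream-time.
final class GlobalTimeContext implements TimeContext {
    @Override
    public long currentSystemTimeMs() {
        return System.currentTimeMillis();
    }

    @Override
    public long currentStreamTimeMs() {
        throw new UnsupportedOperationException("There is no concept of stream-time for a global processor.");
    }
}
```

Because the task context returns a cached value, `currentSystemTimeMs()` may lag behind `System.currentTimeMillis()` until the next refresh, which is the difference the suggested JavaDoc calls out.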
[GitHub] [kafka] mjsax commented on pull request #9107: KAFKA-5488: Add type-safe branch() operator
mjsax commented on pull request #9107: URL: https://github.com/apache/kafka/pull/9107#issuecomment-751926120 @inponomarev -- Can you also update the docs for Kafka Streams and the 2.8 upgrade guide in this PR.
[GitHub] [kafka] hachikuji merged pull request #9553: KAFKA-10427: Fetch snapshot
hachikuji merged pull request #9553: URL: https://github.com/apache/kafka/pull/9553
[GitHub] [kafka] hachikuji commented on pull request #9553: KAFKA-10427: Fetch snapshot
hachikuji commented on pull request #9553: URL: https://github.com/apache/kafka/pull/9553#issuecomment-751924971 I manually reran tests since it has been a week since the last update. Here is a link to the results since they have not been automatically linked: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-9553/14/.
[GitHub] [kafka] g1geordie commented on a change in pull request #9778: KAFKA-10874 Fix flaky ClientQuotasRequestTest.testAlterIpQuotasRequest
g1geordie commented on a change in pull request #9778: URL: https://github.com/apache/kafka/pull/9778#discussion_r549545680 ## File path: core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala ## @@ -212,7 +214,10 @@ class ClientQuotasRequestTest extends BaseRequestTest { InetAddress.getByName(unknownHost) else InetAddress.getByName(entityName) -assertEquals(expectedMatches(entity), servers.head.socketServer.connectionQuotas.connectionRateForIp(entityIp), 0.01) +TestUtils.retry(1L) { Review comment: Thank you
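The fix in this PR replaces a one-shot `assertEquals` with a retrying check, since the broker applies the quota asynchronously. A rough Java sketch of that polling idea follows. `PollingAssert`, its parameters, and `withinTolerance` are hypothetical names for this example; it is not the `TestUtils` helper from the Kafka code base.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper for illustration; not kafka.utils.TestUtils.
final class PollingAssert {
    private PollingAssert() { }

    // Re-evaluate the condition until it holds or the timeout elapses.
    static void waitUntilTrue(final BooleanSupplier condition,
                              final String failureMessage,
                              final long timeoutMs,
                              final long pauseMs) {
        final long deadlineNanos = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new AssertionError(failureMessage);
            }
            try {
                Thread.sleep(pauseMs);
            } catch (final InterruptedException e) {
                // Preserve the interrupt flag and fail the wait.
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting: " + failureMessage);
            }
        }
    }

    // Two-sided tolerance check, comparable to the delta argument of assertEquals.
    static boolean withinTolerance(final double expected, final double actual, final double tolerance) {
        return Math.abs(expected - actual) < tolerance;
    }
}
```

A condition such as `() -> PollingAssert.withinTolerance(expectedRate, currentRate(), 0.01)` (with `expectedRate` and `currentRate()` standing in for the test's own values) compares the absolute difference, so a value that overshoots the expectation does not pass by accident.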
[GitHub] [kafka] mjsax commented on pull request #9107: KAFKA-5488: Add type-safe branch() operator
mjsax commented on pull request #9107: URL: https://github.com/apache/kafka/pull/9107#issuecomment-751922810 About the original comment: https://github.com/apache/kafka/pull/9107#issuecomment-666749809 I am fine with those changes. About https://github.com/apache/kafka/pull/9107#issuecomment-751261181 -- that is a good point. Thanks for explaining. I guess it's a "philosophical" question if we want to allow this pattern though, or if we want to require that either `defaultBranch()` or `noDefaultBranch()` is called? -- I did consider calling `branch()` like a builder pattern, and the final `[noD|d]efaultBranch` call is basically `build()`? Curious to hear what @vvcephei thinks about it.
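The builder reading of the API described above can be sketched on plain in-memory lists. `ListSplitter` is a hypothetical class for this example and works on `java.util.List` rather than `KStream`; it only shows `branch()` as a chained builder step and `defaultBranch()` / `noDefaultBranch()` as the terminal call that plays the role of `build()`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical, simplified splitter over in-memory lists; not the BranchedKStream API.
final class ListSplitter<T> {
    private final List<T> input;
    private final List<String> names = new ArrayList<>();
    private final List<Predicate<T>> predicates = new ArrayList<>();

    ListSplitter(final List<T> input) {
        this.input = input;
    }

    // Builder step: register a branch and return this for chaining.
    ListSplitter<T> branch(final String name, final Predicate<T> predicate) {
        names.add(name);
        predicates.add(predicate);
        return this;
    }

    // Terminal step: unmatched items go to the named default branch.
    Map<String, List<T>> defaultBranch(final String name) {
        return build(name);
    }

    // Terminal step: unmatched items are dropped.
    Map<String, List<T>> noDefaultBranch() {
        return build(null);
    }

    private Map<String, List<T>> build(final String defaultName) {
        final Map<String, List<T>> result = new LinkedHashMap<>();
        for (final String name : names) {
            result.put(name, new ArrayList<>());
        }
        if (defaultName != null) {
            result.put(defaultName, new ArrayList<>());
        }
        for (final T item : input) {
            // Route each item to the first branch whose predicate matches.
            String target = defaultName;
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(item)) {
                    target = names.get(i);
                    break;
                }
            }
            if (target != null) {
                result.get(target).add(item);
            }
        }
        return result;
    }
}
```

In this sketch nothing is evaluated until a terminal method is called, which is one way to read the "require a terminal call" option raised in the comment.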
[jira] [Updated] (KAFKA-10890) Broker just stated Ignoring LeaderAndIsr request from controller
[ https://issues.apache.org/jira/browse/KAFKA-10890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] GeoffreyStark updated KAFKA-10890: -- Description: In auto.leader.rebalance.enabled=true Taking a Broker 1013 (with many leader partitions) offline; Then, after the leaders of these subdivisions have been elected from other ISRs, In theory, 1013 should once again become the leader of those partitions Indeed, if you look at the describe command and the information of the corresponding partition in ZK, you can see that the leader has changed back to 1013. But if you look at the Controller log and the 1013 log at this point, you'll see that there are some warning messages, It looks like the controller failed to send a LeaderAndIsr request to a 1013 node after it restarted, Then the 1013 node‘s log has been {code:java} "Ignoring LeaderAndIsr request from controller 1017 with the correlation id 33 epoch 6 fro partition sp since its associated leader 5 is not who the current leader epoch 5 (state. Change. Logger). " {code} After a while, the producer reports an error. {code:java} "The server is not The leader for that topically -- partition.. Going to request metadata update now" {code} was: In auto.leader.rebalance.enabled=true Taking a Broker 1013 (with many leader partitions) offline; Then, after the leaders of these subdivisions have been elected from other ISRs, In theory, 1013 should once again become the leader of those partitions Indeed, if you look at the describe command and the information of the corresponding partition in ZK, you can see that the leader has changed back to 1013. 
But if you look at the Controller log and the 1013 log at this point, you'll see that there are some warning messages, and after a while, the producer reports an error" The server is not the leader for that topic-partition..Going to request metadata update now " It looks like the controller failed to send a LeaderAndIsr request to a 1013 node after it restarted, Then the 1013 node‘s log has been {code:java} "Ignoring LeaderAndIsr request from controller 1017 with the correlation id 33 epoch 6 fro partition sp since its associated leader 5 is not who the current leader epoch 5 (state. Change. Logger). " {code} After a while, The producer reports an exception. {code:java} "The server is not The leader for that topically -- partition.. Going to request metadata update now" {code} > Broker just stated Ignoring LeaderAndIsr request from controller > - > > Key: KAFKA-10890 > URL: https://issues.apache.org/jira/browse/KAFKA-10890 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.0.0 > Environment: kfk 2.0 > 74 brokers > 3 replica-factors >Reporter: GeoffreyStark >Priority: Major > Attachments: image-2020-12-28-16-59-03-492.png, > jstack-1013broker-1228-1312, kafka元数据混乱.docx > > > In auto.leader.rebalance.enabled=true > Taking a Broker 1013 (with many leader partitions) offline; > Then, after the leaders of these subdivisions have been elected from other > ISRs, > In theory, 1013 should once again become the leader of those partitions > Indeed, if you look at the describe command and the information of the > corresponding partition in ZK, you can see that the leader has changed back > to 1013. 
> But if you look at the Controller log and the 1013 log at this point, you'll > see that there are some warning messages, > > > It looks like the controller failed to send a LeaderAndIsr request to a 1013 > node after it restarted, Then the 1013 node‘s log has been > > {code:java} > "Ignoring LeaderAndIsr request from controller 1017 with the correlation id > 33 epoch 6 fro partition sp since its associated leader 5 is not who the > current leader epoch 5 (state. Change. Logger). " > {code} > > > After a while, > the producer reports an error. > {code:java} > "The server is not The leader for that topically -- partition.. Going to > request metadata update now" > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax commented on a change in pull request #9107: KAFKA-5488: Add type-safe branch() operator
mjsax commented on a change in pull request #9107: URL: https://github.com/apache/kafka/pull/9107#discussion_r549543943 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream; + +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * The {@code Branched} class is used to define the optional parameters when building branches with + * {@link BranchedKStream}. + * + * @param type of record key + * @param type of record value + */ +public class Branched implements NamedOperation> { + +protected final String name; +protected final Function, +? extends KStream> chainFunction; +protected final Consumer> chainConsumer; + +protected Branched(final String name, + final Function, ? extends KStream> chainFunction, + final Consumer> chainConsumer) { +this.name = name; +this.chainFunction = chainFunction; +this.chainConsumer = chainConsumer; +} + +/** + * Create an instance of {@link Branched} from an existing instance. 
+ * + * @param branched the instance of {@link Branched} to copy + */ +protected Branched(final Branched branched) { +this(branched.name, branched.chainFunction, branched.chainConsumer); +} + +/** + * Configure the instance of {@link Branched} with a branch name postfix. + * + * @param name the branch name postfix to be used. If {@code null} a default branch name postfix will be generated (see + * {@link BranchedKStream} description for details) + * @return {@code this} + */ +@Override +public Branched withName(final String name) { +return new Branched<>(name, chainFunction, chainConsumer); +} + +/** + * Create an instance of {@link Branched} with provided branch name postfix. + * + * @param name the branch name postfix to be used. If {@code null}, a default branch name postfix will be generated + * (see {@link BranchedKStream} description for details) + * @param key type + * @param value type + * @return a new instance of {@link Branched} + */ +public static Branched as(final String name) { +return new Branched<>(name, null, null); +} + +/** + * Create an instance of {@link Branched} with provided chain function. + * + * @param chain A function that will be applied to the branch. If {@code null}, the identity + * {@code kStream -> kStream} function will be supposed. If this function returns + * {@code null}, its result is ignored, otherwise it is added to the {@code Map} returned + * by {@link BranchedKStream#defaultBranch()} or {@link BranchedKStream#noDefaultBranch()} (see + * {@link BranchedKStream} description for details). + * @paramkey type + * @paramvalue type + * @return a new instance of {@link Branched} + */ +public static Branched withFunction( +final Function, +? extends KStream> chain) { +return new Branched<>(null, chain, null); +} + +/** + * Create an instance of {@link Branched} with provided chain consumer. + * + * @param chain A consumer to which the branch will be sent. 
If a non-null branch is provided here, + * the respective branch will not be added to the resulting {@code Map} returned + * by {@link BranchedKStream#defaultBranch()} or {@link BranchedKStream#noDefaultBranch()} (see + * {@link BranchedKStream} description for details). If {@code null}, a no-op consumer will be supposed Review comment: SGTM. In general, even if possible, I don't like to give `null` semantics :)
[GitHub] [kafka] mjsax commented on a change in pull request #9107: KAFKA-5488: Add type-safe branch() operator
mjsax commented on a change in pull request #9107: URL: https://github.com/apache/kafka/pull/9107#discussion_r549543247 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/BranchedKStreamImpl.java ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.Branched; +import org.apache.kafka.streams.kstream.BranchedKStream; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Predicate; +import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode; +import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class BranchedKStreamImpl implements BranchedKStream { + +private static final String BRANCH_NAME = "KSTREAM-BRANCH-"; + +private final KStreamImpl source; +private final boolean repartitionRequired; +private final String splitterName; +private final Map> result = new HashMap<>(); + +private final List> predicates = new ArrayList<>(); +private final List childNames = new ArrayList<>(); +private final ProcessorGraphNode splitterNode; + +BranchedKStreamImpl(final KStreamImpl source, final boolean repartitionRequired, final NamedInternal named) { +this.source = source; +this.repartitionRequired = repartitionRequired; +this.splitterName = named.orElseGenerateWithPrefix(source.builder, BRANCH_NAME); + +// predicates and childNames are passed by reference so when the user adds a branch they get added to Review comment: Just saw your other comment: https://github.com/apache/kafka/pull/9107#issuecomment-751261181 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9107: KAFKA-5488: Add type-safe branch() operator
mjsax commented on a change in pull request #9107: URL: https://github.com/apache/kafka/pull/9107#discussion_r549542995 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/BranchedKStreamImpl.java ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.Branched; +import org.apache.kafka.streams.kstream.BranchedKStream; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Predicate; +import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode; +import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class BranchedKStreamImpl implements BranchedKStream { + +private static final String BRANCH_NAME = "KSTREAM-BRANCH-"; + +private final KStreamImpl source; +private final boolean repartitionRequired; +private final String splitterName; +private final Map> result = new HashMap<>(); + +private final List> predicates = new ArrayList<>(); +private final List childNames = new ArrayList<>(); +private final ProcessorGraphNode splitterNode; + +BranchedKStreamImpl(final KStreamImpl source, final boolean repartitionRequired, final NamedInternal named) { +this.source = source; +this.repartitionRequired = repartitionRequired; +this.splitterName = named.orElseGenerateWithPrefix(source.builder, BRANCH_NAME); + +// predicates and childNames are passed by reference so when the user adds a branch they get added to Review comment: Would love to learn about it. -- In general, it's easier to follow the same pattern throughout the code base. It's easier to reason about the code that way, and also easier for people to learn the code base.
[GitHub] [kafka] mjsax commented on a change in pull request #9107: KAFKA-5488: Add type-safe branch() operator
mjsax commented on a change in pull request #9107: URL: https://github.com/apache/kafka/pull/9107#discussion_r549542855 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/BranchedKStreamImpl.java ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.Branched; +import org.apache.kafka.streams.kstream.BranchedKStream; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Predicate; +import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode; +import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class BranchedKStreamImpl implements BranchedKStream { + +private static final String BRANCH_NAME = "KSTREAM-BRANCH-"; + +private final KStreamImpl source; +private final boolean repartitionRequired; +private final String splitterName; +private final Map> result = new HashMap<>(); + +private final List> predicates = new ArrayList<>(); +private final List childNames = new ArrayList<>(); +private final ProcessorGraphNode splitterNode; + +BranchedKStreamImpl(final KStreamImpl source, final boolean repartitionRequired, final NamedInternal named) { +this.source = source; +this.repartitionRequired = repartitionRequired; +this.splitterName = named.orElseGenerateWithPrefix(source.builder, BRANCH_NAME); + +// predicates and childNames are passed by reference so when the user adds a branch they get added to +final ProcessorParameters processorParameters = +new ProcessorParameters<>(new KStreamBranch<>(predicates, childNames), splitterName); +splitterNode = new ProcessorGraphNode<>(splitterName, processorParameters); +source.builder.addGraphNode(source.streamsGraphNode, splitterNode); +} + +@Override +public BranchedKStream branch(final Predicate predicate) { +return branch(predicate, BranchedInternal.empty()); +} + +@Override +public BranchedKStream branch(final Predicate predicate, final Branched branched) { +predicates.add(predicate); +createBranch(branched, predicates.size()); +return this; +} + +@Override +public Map> defaultBranch() { 
+return defaultBranch(BranchedInternal.empty()); +} + +@Override +public Map> defaultBranch(final Branched branched) { +createBranch(branched, 0); Review comment: I guess it's fine both ways. -- The point about the index is a good one that I missed. But it would still be doable I guess. I don't think that there would be any measurable runtime difference if you use a "default predicate" (what we also do in the current implementation) -- the code is just a little "cleaner" as we don't need an extra "if" at the end -- but it's also not the end of the world as the `process` method is fairly simple anyway.
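The "default predicate" idea from this comment can be sketched as follows: the default branch is registered as a trailing predicate that always returns `true`, so the routing loop needs no special case after it. `BranchRouter` is a hypothetical class for this example, and placing the default last in the list is an assumption of the sketch (the PR itself passes index 0 for the default branch).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical router for illustration; not the KStreamBranch processor from the PR.
final class BranchRouter<T> {
    private final List<Predicate<T>> predicates = new ArrayList<>();

    void addBranch(final Predicate<T> predicate) {
        predicates.add(predicate);
    }

    // The default branch is just an always-true predicate appended at the end.
    void addDefaultBranch() {
        predicates.add(item -> true);
    }

    // Returns the index of the first matching branch, or -1 if the item is dropped.
    int route(final T item) {
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(item)) {
                return i;
            }
        }
        return -1;
    }
}
```

Without a default predicate, the same loop simply falls through and returns -1, which corresponds to the `noDefaultBranch()` case.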
[jira] [Assigned] (KAFKA-10779) Reassignment tool sets throttles incorrectly when overriding a reassignment
[ https://issues.apache.org/jira/browse/KAFKA-10779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dengziming reassigned KAFKA-10779: -- Assignee: dengziming > Reassignment tool sets throttles incorrectly when overriding a reassignment > --- > > Key: KAFKA-10779 > URL: https://issues.apache.org/jira/browse/KAFKA-10779 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: dengziming >Priority: Major > > The logic in `ReassignPartitionsCommand.calculateProposedMoveMap` assumes > that adding replicas are not included in the replica set returned from > `Metadata` or `ListPartitionReassignments`. This is evident in the test case > `ReassignPartitionsUnitTest.testMoveMap`. Because of this incorrect > assumption, the move map is computed incorrectly which can result in the > wrong throttles being applied. As far as I can tell, this is only an issue > when overriding an existing reassignment. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax commented on a change in pull request #9107: KAFKA-5488: Add type-safe branch() operator
mjsax commented on a change in pull request #9107: URL: https://github.com/apache/kafka/pull/9107#discussion_r547605439 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/BranchedKStreamImpl.java ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.Branched; +import org.apache.kafka.streams.kstream.BranchedKStream; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Predicate; +import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode; +import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class BranchedKStreamImpl implements BranchedKStream { + +private static final String BRANCH_NAME = "KSTREAM-BRANCH-"; + +private final KStreamImpl source; +private final boolean repartitionRequired; +private final String splitterName; +private final Map> result = new HashMap<>(); + +private final List> predicates = new ArrayList<>(); +private final List childNames = new ArrayList<>(); +private final ProcessorGraphNode splitterNode; + +BranchedKStreamImpl(final KStreamImpl source, final boolean repartitionRequired, final NamedInternal named) { +this.source = source; +this.repartitionRequired = repartitionRequired; +this.splitterName = named.orElseGenerateWithPrefix(source.builder, BRANCH_NAME); + +// predicates and childNames are passed by reference so when the user adds a branch they get added to +final ProcessorParameters processorParameters = +new ProcessorParameters<>(new KStreamBranch<>(predicates, childNames), splitterName); +splitterNode = new ProcessorGraphNode<>(splitterName, processorParameters); +source.builder.addGraphNode(source.streamsGraphNode, splitterNode); +} + +@Override +public BranchedKStream branch(final Predicate predicate) { +return branch(predicate, BranchedInternal.empty()); Review comment: Missing `null` check for `predicate` (similar below for other methods and parameters). This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
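The missing `null` check flagged in the review can be sketched with `java.util.Objects.requireNonNull`, which fails fast with a descriptive message. `NullCheckedBranches` and the message text are hypothetical choices for this example, not the code that was merged.

```java
import java.util.Objects;
import java.util.function.Predicate;

// Hypothetical class for illustration; it only counts branches instead of building a topology.
final class NullCheckedBranches<T> {
    private int branchCount = 0;

    NullCheckedBranches<T> branch(final Predicate<T> predicate) {
        // Reject null up front, before any internal state is modified.
        Objects.requireNonNull(predicate, "predicate cannot be null");
        branchCount++;
        return this;
    }

    int branchCount() {
        return branchCount;
    }
}
```

Checking before mutating state means a rejected call leaves the builder unchanged, so the caller can recover and continue chaining.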
[GitHub] [kafka] mjsax commented on a change in pull request #9107: KAFKA-5488: Add type-safe branch() operator
mjsax commented on a change in pull request #9107: URL: https://github.com/apache/kafka/pull/9107#discussion_r547602314 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/BranchedInternal.java ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.Branched; +import org.apache.kafka.streams.kstream.KStream; + +import java.util.Map; + +class BranchedInternal extends Branched { +BranchedInternal(final Branched branched) { +super(branched); +} + +BranchedInternal() { +super(null, null, null); +} + +static BranchedInternal empty() { +return new BranchedInternal<>(); +} + +String branchProcessorName(final String prefix, final int index) { +if (name == null) { +return prefix + index; +} else { +return prefix + name; +} +} + +public void process(final KStreamImpl newStream, final String branchChildName, final Map> result) { +final KStream transformedStream; +if (chainFunction == null) { +transformedStream = newStream; +} else { +transformedStream = chainFunction.apply(newStream); +} +if (transformedStream == null) { +return; +} +if (chainConsumer != null) { +chainConsumer.accept(transformedStream); +return; +} else { +result.put(branchChildName, transformedStream); +} +} Review comment: I think this method is hard to read. Proposal: ``` if (chainFunction != null) { final KStream transformedStream = chainFunction.apply(newStream); if (transformedStream != null) { result.put(branchChildName, transformedStream); } } else if (chainConsumer != null) { chainConsumer.accept(transformedStream); } else { result.put(branchChildName, newStream); } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
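In the proposal above, `transformedStream` is declared inside the first `if` block, so the `else if` branch would presumably have to pass `newStream` to the consumer. Below is a minimal sketch of that control flow under this assumption. `BranchProcessSketch` is a hypothetical class, `S` stands in for the stream type, and the sketch assumes that at most one of the chain function and the chain consumer is set.

```java
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for BranchedInternal; S plays the role of KStream<K, V>.
final class BranchProcessSketch<S> {
    private final Function<? super S, ? extends S> chainFunction; // may be null
    private final Consumer<? super S> chainConsumer;              // may be null

    BranchProcessSketch(final Function<? super S, ? extends S> chainFunction,
                        final Consumer<? super S> chainConsumer) {
        this.chainFunction = chainFunction;
        this.chainConsumer = chainConsumer;
    }

    void process(final S newStream, final String branchChildName, final Map<String, S> result) {
        if (chainFunction != null) {
            // A function was supplied: keep its result unless it returns null.
            final S transformed = chainFunction.apply(newStream);
            if (transformed != null) {
                result.put(branchChildName, transformed);
            }
        } else if (chainConsumer != null) {
            // A consumer was supplied: hand the branch over, no map entry.
            chainConsumer.accept(newStream);
        } else {
            // Neither supplied: the branch itself becomes the map value.
            result.put(branchChildName, newStream);
        }
    }
}
```

Note that this differs from the original method when both a function and a consumer are set: the original passes the function's result to the consumer, while the sketch ignores the consumer in that case.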
[GitHub] [kafka] mjsax commented on a change in pull request #9107: KAFKA-5488: Add type-safe branch() operator
mjsax commented on a change in pull request #9107: URL: https://github.com/apache/kafka/pull/9107#discussion_r547599107 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java ## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream; + +import java.util.Map; + +/** + * Branches the records in the original stream based on the predicates supplied for the branch definitions. + * + * Branches are defined with {@link BranchedKStream#branch(Predicate, Branched)} or + * {@link BranchedKStream#defaultBranch(Branched)} methods. Each record is evaluated against the predicates + * supplied via {@link Branched} parameters, and is routed to the first branch for which its respective predicate + * evaluates to {@code true}. + * + * Each branch (which is a {@link KStream} instance) then can be processed either by + * a {@link java.util.function.Function} or a {@link java.util.function.Consumer} provided via a {@link Branched} + * parameter. It also can be accessed from the {@link Map} returned by {@link BranchedKStream#defaultBranch(Branched)} or + * {@link BranchedKStream#noDefaultBranch()} (see usage examples). 
+ * + * The branching happens on first-match: A record in the original stream is assigned to the corresponding result + * stream for the first predicate that evaluates to true, and is assigned to this stream only. If you need + * to route a record to multiple streams, you can use {@link KStream#filter(Predicate)} for each predicate instead + * of branching. + * + * The process of routing the records to different branches is a stateless record-by-record operation. + * Rules of forming the resulting map + * The keys of the {@code Map>} entries returned by {@link BranchedKStream#defaultBranch(Branched)} or + * {@link BranchedKStream#noDefaultBranch()} are defined by the following rules: + * + * + * If {@link Named} parameter was provided for {@link KStream#split(Named)}, its value is used as + * a prefix for each key. By default, no prefix is used + * If a name is provided for the {@link BranchedKStream#branch(Predicate, Branched)} via + * {@link Branched} parameter, its value is appended to the prefix to form the {@code Map} key + * If a name is not provided for the branch, then the key defaults to {@code prefix + position} of the branch + * as a decimal number, starting from {@code "1"} + * If a name is not provided for the {@link BranchedKStream#defaultBranch()} call, then the key defaults + * to {@code prefix + "0"} + * + * + * The values of the respective {@code Map>} entries are formed as following: + * + * + * If no chain function or consumer is provided {@link BranchedKStream#branch(Predicate, Branched)} via + * {@link Branched} parameter, then the value is the branch itself (which is equivalent to {@code ks -> ks} + * identity chain function) + * If a chain function is provided and returns a non-null value for a given branch, then the value is + * the result returned by this function + * If a chain function returns {@code null} for a given branch, then the respective entry is not put to the map. 
+ * If a consumer is provided for a given branch, then the the respective entry is not put to the map + * + * + * For example: + * {@code + * Map> result = + * source.split(Named.as("foo-")) + * .branch(predicate1, Branched.as("bar"))// "foo-bar" + * .branch(predicate2, Branched.withConsumer(ks->ks.to("A")) // no entry: a Consumer is provided + * .branch(predicate3, Branched.withFunction(ks->null)) // no entry: chain function returns null + * .branch(predicate4)// "foo-4": name defaults to the branch position + * .defaultBranch() // "foo-0": "0" is the default name for the default branch + * } + * + * Usage examples + * + * Direct Branch Consuming + * In many cases we do not need to have a single scope for all the branches, each branch being processed completely + * independently from others. Then we can use 'consuming'
[GitHub] [kafka] mjsax commented on a change in pull request #9107: KAFKA-5488: Add type-safe branch() operator
mjsax commented on a change in pull request #9107: URL: https://github.com/apache/kafka/pull/9107#discussion_r549538598 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java ## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream; + +import java.util.Map; + +/** + * Branches the records in the original stream based on the predicates supplied for the branch definitions. + * + * Branches are defined with {@link BranchedKStream#branch(Predicate, Branched)} or + * {@link BranchedKStream#defaultBranch(Branched)} methods. Each record is evaluated against the predicates + * supplied via {@link Branched} parameters, and is routed to the first branch for which its respective predicate + * evaluates to {@code true}. + * + * Each branch (which is a {@link KStream} instance) then can be processed either by + * a {@link java.util.function.Function} or a {@link java.util.function.Consumer} provided via a {@link Branched} + * parameter. It also can be accessed from the {@link Map} returned by {@link BranchedKStream#defaultBranch(Branched)} or + * {@link BranchedKStream#noDefaultBranch()} (see usage examples). 
+ * + * The branching happens on first-match: A record in the original stream is assigned to the corresponding result + * stream for the first predicate that evaluates to true, and is assigned to this stream only. If you need + * to route a record to multiple streams, you can use {@link KStream#filter(Predicate)} for each predicate instead + * of branching. + * + * The process of routing the records to different branches is a stateless record-by-record operation. + * Rules of forming the resulting map + * The keys of the {@code Map<String, KStream<K, V>>} entries returned by {@link BranchedKStream#defaultBranch(Branched)} or + * {@link BranchedKStream#noDefaultBranch()} are defined by the following rules: + * + * + * If {@link Named} parameter was provided for {@link KStream#split(Named)}, its value is used as + * a prefix for each key. By default, no prefix is used + * If a name is provided for the {@link BranchedKStream#branch(Predicate, Branched)} via + * {@link Branched} parameter, its value is appended to the prefix to form the {@code Map} key Review comment: Also my second language... I think it is mostly a matter of style. Personally, I prefer to use `.` at the end, because a bullet point is still a sentence from my POV, and actually, a bullet point could be multiple sentences. You also capitalized the first word of each bullet point, so it seems to be a sentence. -- I guess some people don't capitalize the first word and also don't use a `.` at the end, i.e., they use a "no-sentence" bullet point style. But this only works for "single sentence" bullet points. It's really a nit. I just raised my personal (and obviously very subjective) preference.
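The key-forming rules quoted in the Javadoc above can be illustrated with a small sketch. `BranchKeySketch` and its `key` method are hypothetical names. The sketch only mirrors the rules as stated in the Javadoc (prefix from `Named`, then either the branch name or the branch position, with `0` for the default branch) and is not the PR's implementation.

```java
// Hypothetical helper mirroring the map-key rules from the quoted Javadoc.
final class BranchKeySketch {
    private BranchKeySketch() { }

    /**
     * @param prefix   value passed via Named to split(), or null if none was given
     * @param name     branch name passed via Branched, or null if none was given
     * @param position 1-based position of the branch; 0 for the default branch
     */
    static String key(final String prefix, final String name, final int position) {
        final String effectivePrefix = prefix == null ? "" : prefix;
        // A provided name is appended to the prefix; otherwise the position is used.
        return name != null ? effectivePrefix + name : effectivePrefix + position;
    }
}
```

With the Javadoc's own example, `key("foo-", "bar", 1)` gives `"foo-bar"`, `key("foo-", null, 4)` gives `"foo-4"`, and `key("foo-", null, 0)` gives `"foo-0"` for the unnamed default branch.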
[GitHub] [kafka] mjsax commented on a change in pull request #9107: KAFKA-5488: Add type-safe branch() operator
mjsax commented on a change in pull request #9107: URL: https://github.com/apache/kafka/pull/9107#discussion_r547591511 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java ## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream; + +import java.util.Map; + +/** + * Branches the records in the original stream based on the predicates supplied for the branch definitions. + * + * Branches are defined with {@link BranchedKStream#branch(Predicate, Branched)} or + * {@link BranchedKStream#defaultBranch(Branched)} methods. Each record is evaluated against the predicates + * supplied via {@link Branched} parameters, and is routed to the first branch for which its respective predicate + * evaluates to {@code true}. + * + * Each branch (which is a {@link KStream} instance) then can be processed either by + * a {@link java.util.function.Function} or a {@link java.util.function.Consumer} provided via a {@link Branched} + * parameter. It also can be accessed from the {@link Map} returned by {@link BranchedKStream#defaultBranch(Branched)} or + * {@link BranchedKStream#noDefaultBranch()} (see usage examples). 
+ * + * The branching happens on first-match: A record in the original stream is assigned to the corresponding result + * stream for the first predicate that evaluates to true, and is assigned to this stream only. If you need + * to route a record to multiple streams, you can use {@link KStream#filter(Predicate)} for each predicate instead Review comment: `you can apply [multiple] {#filter} [operators], one for each predicate, instead` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
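To make the difference between first-match branching and one `filter` per predicate concrete, here is a minimal sketch on plain `java.util.function.Predicate` values. `FirstMatchSketch` and both method names are hypothetical. The sketch models only the routing decision and does not use the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical illustration of routing semantics; not Kafka Streams code.
final class FirstMatchSketch {
    private FirstMatchSketch() { }

    // Branching: the record goes only to the first predicate that matches.
    // Returns the index of that predicate, or -1 if none matches.
    static <T> int firstMatch(final List<Predicate<T>> predicates, final T record) {
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(record)) {
                return i;
            }
        }
        return -1;
    }

    // One filter per predicate: the record goes to every predicate that matches.
    static <T> List<Integer> allMatches(final List<Predicate<T>> predicates, final T record) {
        final List<Integer> matches = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(record)) {
                matches.add(i);
            }
        }
        return matches;
    }
}
```

For predicates "is positive" and "is even", the record `4` is routed only to the first branch under first-match semantics, while applying one filter per predicate would deliver it to both result streams.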
[GitHub] [kafka] mjsax commented on a change in pull request #9107: KAFKA-5488: Add type-safe branch() operator
mjsax commented on a change in pull request #9107: URL: https://github.com/apache/kafka/pull/9107#discussion_r547597683 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream; + +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * The {@code Branched} class is used to define the optional parameters when building branches with + * {@link BranchedKStream}. + * + * @param type of record key + * @param type of record value + */ +public class Branched implements NamedOperation> { + +protected final String name; +protected final Function, +? extends KStream> chainFunction; +protected final Consumer> chainConsumer; + +protected Branched(final String name, + final Function, ? extends KStream> chainFunction, + final Consumer> chainConsumer) { +this.name = name; +this.chainFunction = chainFunction; +this.chainConsumer = chainConsumer; +} + +/** + * Create an instance of {@link Branched} from an existing instance. 
+ * + * @param branched the instance of {@link Branched} to copy + */ +protected Branched(final Branched branched) { +this(branched.name, branched.chainFunction, branched.chainConsumer); +} + +/** + * Configure the instance of {@link Branched} with a branch name postfix. + * + * @param name the branch name postfix to be used. If {@code null} a default branch name postfix will be generated (see + * {@link BranchedKStream} description for details) + * @return {@code this} + */ +@Override +public Branched withName(final String name) { +return new Branched<>(name, chainFunction, chainConsumer); +} + +/** + * Create an instance of {@link Branched} with provided branch name postfix. + * + * @param name the branch name postfix to be used. If {@code null}, a default branch name postfix will be generated + * (see {@link BranchedKStream} description for details) + * @param key type + * @param value type + * @return a new instance of {@link Branched} + */ +public static Branched as(final String name) { +return new Branched<>(name, null, null); +} + +/** + * Create an instance of {@link Branched} with provided chain function. + * + * @param chain A function that will be applied to the branch. If {@code null}, the identity + * {@code kStream -> kStream} function will be supposed. If this function returns + * {@code null}, its result is ignored, otherwise it is added to the {@code Map} returned + * by {@link BranchedKStream#defaultBranch()} or {@link BranchedKStream#noDefaultBranch()} (see + * {@link BranchedKStream} description for details). + * @paramkey type + * @paramvalue type + * @return a new instance of {@link Branched} + */ +public static Branched withFunction( +final Function, +? extends KStream> chain) { +return new Branched<>(null, chain, null); +} + +/** + * Create an instance of {@link Branched} with provided chain consumer. + * + * @param chain A consumer to which the branch will be sent. 
If a non-null branch is provided here, + * the respective branch will not be added to the resulting {@code Map} returned + * by {@link BranchedKStream#defaultBranch()} or {@link BranchedKStream#noDefaultBranch()} (see + * {@link BranchedKStream} description for details). If {@code null}, a no-op consumer will be supposed Review comment: I tried to read up on the KIP discussion thread, and I am wondering if we did agree to this behavior? My understanding was that if a consumer is used, there won't be any entry in the `Map` for this branch?
[GitHub] [kafka] hachikuji commented on a change in pull request #9781: MINOR: Use top-level error in `UpdateFeaturesRequest.getErrorResponse`
hachikuji commented on a change in pull request #9781: URL: https://github.com/apache/kafka/pull/9781#discussion_r549426896 ## File path: clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesRequest.java ## @@ -56,19 +54,11 @@ public UpdateFeaturesRequest(UpdateFeaturesRequestData data, short version) { @Override public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { -final ApiError apiError = ApiError.fromThrowable(e); -final UpdatableFeatureResultCollection results = new UpdatableFeatureResultCollection(); -for (FeatureUpdateKey update : this.data.featureUpdates().valuesSet()) { -final UpdatableFeatureResult result = new UpdatableFeatureResult() -.setFeature(update.feature()) -.setErrorCode(apiError.error().code()) -.setErrorMessage(apiError.message()); -results.add(result); -} -final UpdateFeaturesResponseData responseData = new UpdateFeaturesResponseData() -.setThrottleTimeMs(throttleTimeMs) -.setResults(results); -return new UpdateFeaturesResponse(responseData); +return UpdateFeaturesResponse.createWithErrors( +ApiError.fromThrowable(e), +Collections.emptyMap(), Review comment: @chia7712 Good catch. Pushed an update with some test cases. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #9520: MINOR: replace test "expected" parameter by assertThrows
ijuma commented on a change in pull request #9520: URL: https://github.com/apache/kafka/pull/9520#discussion_r549419585 ## File path: clients/src/test/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPoolTest.java ## @@ -23,68 +23,74 @@ import org.junit.Assert; import org.junit.Test; +import static org.junit.Assert.assertThrows; + public class GarbageCollectedMemoryPoolTest { -@Test(expected = IllegalArgumentException.class) -public void testZeroSize() throws Exception { -new GarbageCollectedMemoryPool(0, 7, true, null); +@Test +public void testZeroSize() { +assertThrows(IllegalArgumentException.class, +() -> new GarbageCollectedMemoryPool(0, 7, true, null)); } -@Test(expected = IllegalArgumentException.class) -public void testNegativeSize() throws Exception { -new GarbageCollectedMemoryPool(-1, 7, false, null); +@Test +public void testNegativeSize() { +assertThrows(IllegalArgumentException.class, +() -> new GarbageCollectedMemoryPool(-1, 7, false, null)); } -@Test(expected = IllegalArgumentException.class) -public void testZeroMaxAllocation() throws Exception { -new GarbageCollectedMemoryPool(100, 0, true, null); +@Test +public void testZeroMaxAllocation() { +assertThrows(IllegalArgumentException.class, +() -> new GarbageCollectedMemoryPool(100, 0, true, null)); } -@Test(expected = IllegalArgumentException.class) -public void testNegativeMaxAllocation() throws Exception { -new GarbageCollectedMemoryPool(100, -1, false, null); +@Test +public void testNegativeMaxAllocation() { +assertThrows(IllegalArgumentException.class, +() -> new GarbageCollectedMemoryPool(100, -1, false, null)); } -@Test(expected = IllegalArgumentException.class) -public void testMaxAllocationLargerThanSize() throws Exception { -new GarbageCollectedMemoryPool(100, 101, true, null); +@Test +public void testMaxAllocationLargerThanSize() { +assertThrows(IllegalArgumentException.class, +() -> new GarbageCollectedMemoryPool(100, 101, true, null)); } -@Test(expected = 
IllegalArgumentException.class) -public void testAllocationOverMaxAllocation() throws Exception { +@Test +public void testAllocationOverMaxAllocation() { GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, false, null); -pool.tryAllocate(11); +assertThrows(IllegalArgumentException.class, () -> pool.tryAllocate(11)); } -@Test(expected = IllegalArgumentException.class) -public void testAllocationZero() throws Exception { +@Test +public void testAllocationZero() { GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, true, null); -pool.tryAllocate(0); +assertThrows(IllegalArgumentException.class, () -> pool.tryAllocate(0)); } -@Test(expected = IllegalArgumentException.class) -public void testAllocationNegative() throws Exception { +@Test +public void testAllocationNegative() { GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, false, null); -pool.tryAllocate(-1); +assertThrows(IllegalArgumentException.class, () -> pool.tryAllocate(-1)); } -@Test(expected = IllegalArgumentException.class) -public void testReleaseNull() throws Exception { +@Test +public void testReleaseNull() { GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, true, null); -pool.release(null); +assertThrows(IllegalArgumentException.class, () -> pool.release(null)); } -@Test(expected = IllegalArgumentException.class) -public void testReleaseForeignBuffer() throws Exception { +@Test +public void testReleaseForeignBuffer() { GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, true, null); ByteBuffer fellOffATruck = ByteBuffer.allocate(1); -pool.release(fellOffATruck); -pool.close(); Review comment: Do we still need to call `close` to avoid a leak? 
## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java ## @@ -504,20 +510,21 @@ public void writePastLimit() { } } -@Test(expected = IllegalArgumentException.class) +@Test public void testAppendAtInvalidOffset() { ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.position(bufferOffset); long logAppendTime = System.currentTimeMillis(); -MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType, +MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V2, compressionType, Review comment: Why did
[GitHub] [kafka] ijuma commented on a change in pull request #9520: MINOR: replace test "expected" parameter by assertThrows
ijuma commented on a change in pull request #9520: URL: https://github.com/apache/kafka/pull/9520#discussion_r549417155 ## File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java ## @@ -192,11 +192,12 @@ public void testConstructorWithSerializers() { new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer()).close(); } -@Test(expected = ConfigException.class) +@Test public void testNoSerializerProvided() { Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); -new KafkaProducer(producerProps); + Review comment: Was this new line intentional? ## File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java ## @@ -444,34 +444,30 @@ public void testAddPartitionToTransactionRetainsRetryBackoffWhenPartitionsAlread assertEquals(DEFAULT_RETRY_BACKOFF_MS, handler.retryBackoffMs()); } -@Test(expected = IllegalStateException.class) +@Test public void testMaybeAddPartitionToTransactionBeforeInitTransactions() { -transactionManager.failIfNotReadyForSend(); -transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0)); +assertThrows(IllegalStateException.class, () -> transactionManager.failIfNotReadyForSend()); } -@Test(expected = IllegalStateException.class) +@Test public void testMaybeAddPartitionToTransactionBeforeBeginTransaction() { doInitTransactions(); -transactionManager.failIfNotReadyForSend(); -transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0)); +assertThrows(IllegalStateException.class, () -> transactionManager.failIfNotReadyForSend()); Review comment: Same as above. 
## File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java ## @@ -444,34 +444,30 @@ public void testAddPartitionToTransactionRetainsRetryBackoffWhenPartitionsAlread assertEquals(DEFAULT_RETRY_BACKOFF_MS, handler.retryBackoffMs()); } -@Test(expected = IllegalStateException.class) +@Test public void testMaybeAddPartitionToTransactionBeforeInitTransactions() { -transactionManager.failIfNotReadyForSend(); -transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0)); +assertThrows(IllegalStateException.class, () -> transactionManager.failIfNotReadyForSend()); } -@Test(expected = IllegalStateException.class) +@Test public void testMaybeAddPartitionToTransactionBeforeBeginTransaction() { doInitTransactions(); -transactionManager.failIfNotReadyForSend(); -transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0)); +assertThrows(IllegalStateException.class, () -> transactionManager.failIfNotReadyForSend()); } -@Test(expected = KafkaException.class) +@Test public void testMaybeAddPartitionToTransactionAfterAbortableError() { doInitTransactions(); transactionManager.beginTransaction(); transactionManager.transitionToAbortableError(new KafkaException()); -transactionManager.failIfNotReadyForSend(); -transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0)); +assertThrows(KafkaException.class, () -> transactionManager.failIfNotReadyForSend()); Review comment: Same as above. 
## File path: clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java ## @@ -663,12 +623,6 @@ public void shouldPreserveOffsetsFromCommitByGroupMetadataOnAbortIfTransactionsA producer.beginTransaction(); String group2 = "g2"; -Map groupCommit2 = new HashMap() { -{ -put(new TopicPartition(topic, 2), new OffsetAndMetadata(53L, null)); -put(new TopicPartition(topic, 3), new OffsetAndMetadata(84L, null)); -} -}; producer.sendOffsetsToTransaction(groupCommit, new ConsumerGroupMetadata(group2)); Review comment: I think this was intended to be `groupCommit2`, right? ## File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java ## @@ -444,34 +444,30 @@ public void testAddPartitionToTransactionRetainsRetryBackoffWhenPartitionsAlread assertEquals(DEFAULT_RETRY_BACKOFF_MS, handler.retryBackoffMs()); } -@Test(expected = IllegalStateException.class) +@Test public void testMaybeAddPartitionToTransactionBeforeInitTransactions() { -transactionManager.failIfNotReadyForSend(); -transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0)); +
[GitHub] [kafka] bertber commented on a change in pull request #9761: KAFKA-10768 Add a test for ByteBufferInputStream to ByteBufferLogInputStreamTest
bertber commented on a change in pull request #9761: URL: https://github.com/apache/kafka/pull/9761#discussion_r549409531 ## File path: clients/src/test/java/org/apache/kafka/common/utils/ByteBufferInputStreamTest.java ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.utils; + +import static org.junit.Assert.assertEquals; + +import java.nio.ByteBuffer; + +import org.junit.Test; + +public class ByteBufferInputStreamTest { + +@Test +public void testReadUnsignedIntFromInputStream() throws Exception { Review comment: It has been removed.
[GitHub] [kafka] bertber commented on a change in pull request #9761: KAFKA-10768 Add a test for ByteBufferInputStream to ByteBufferLogInputStreamTest
bertber commented on a change in pull request #9761: URL: https://github.com/apache/kafka/pull/9761#discussion_r549409289 ## File path: clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java ## @@ -119,5 +119,4 @@ public void iteratorRaisesOnTooLargeRecords() { assertNotNull(logInputStream.nextBatch()); logInputStream.nextBatch(); } - Review comment: This change has been reverted.
[GitHub] [kafka] wenbingshen commented on pull request #9789: Force the validation control request,isolate other data requests.Prot…
wenbingshen commented on pull request #9789: URL: https://github.com/apache/kafka/pull/9789#issuecomment-751748730 @showuon This is my first time submitting a PR to the community. Could you give me some suggestions? Thank you very much!
[jira] [Commented] (KAFKA-10889) The log cleaner is not working for topic partitions
[ https://issues.apache.org/jira/browse/KAFKA-10889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255618#comment-17255618 ] Wenbing Shen commented on KAFKA-10889: -- [~becket_qin] Why not separate the message creation time from the log append time: return the creation time to the client, and use the log append time between brokers, which is friendlier to log cleanup? > The log cleaner is not working for topic partitions > --- > > Key: KAFKA-10889 > URL: https://issues.apache.org/jira/browse/KAFKA-10889 > Project: Kafka > Issue Type: Bug > Components: log cleaner > Affects Versions: 2.0.0 > Reporter: Wenbing Shen > Assignee: Wenbing Shen > Priority: Blocker > Attachments: 0880c08b0110fcdd9b0c.png, 0880c08b0110fcddfb0b.png, > image-2020-12-28-17-17-15-947.png > > > * I have a topic that is retained for the default of 7 days, but the log > exists from October 26th to today, December 25th. The log cleaner doesn't seem > to be working on it. This seems to be an underlying problem in Kafka. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10889) The log cleaner is not working for topic partitions
[ https://issues.apache.org/jira/browse/KAFKA-10889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255527#comment-17255527 ] Jiangjie Qin commented on KAFKA-10889: -- It looks like the broker is working as designed. I am not sure what the expected behavior would be here. If a message with timestamp T was appended to the log, and the retention is set to R, that message will only be deleted after T + R. Because Kafka maintains the contiguity of the messages, all the messages after that message won't be deleted before that message expires. One potential improvement might be disallowing messages whose timestamp is greater than the current system time by too much. But that may also hurt some other use cases. And defining "too much" could also be tricky.
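The T + R rule and the contiguity constraint described in this comment can be sketched roughly as follows. This is only an illustration under stated assumptions, not Kafka's implementation: `RetentionSketch`, `Segment` and `deletableSegments` are hypothetical names, and the sketch assumes that a segment is judged by its largest record timestamp and that the scan runs from the oldest segment and stops at the first one that has not expired.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of time-based retention; not Kafka's actual code. */
final class RetentionSketch {

    /** A log segment reduced to the one field that matters here (hypothetical type). */
    static final class Segment {
        final String name;
        final long maxTimestampMs; // largest record timestamp in the segment (T)

        Segment(String name, long maxTimestampMs) {
            this.name = name;
            this.maxTimestampMs = maxTimestampMs;
        }
    }

    /**
     * Walks the segments from oldest to newest and collects those that have
     * expired, i.e. whose largest timestamp T satisfies now - T >= retention (R).
     * The scan stops at the first unexpired segment, so everything behind it stays.
     */
    static List<String> deletableSegments(List<Segment> oldestFirst, long retentionMs, long nowMs) {
        List<String> deletable = new ArrayList<>();
        for (Segment s : oldestFirst) {
            if (nowMs - s.maxTimestampMs < retentionMs) {
                break; // contiguity: nothing after an unexpired segment is deleted
            }
            deletable.add(s.name);
        }
        return deletable;
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        long now = 100 * day;
        List<Segment> log = Arrays.asList(
            new Segment("seg-0", now - 60 * day),  // old data, expired
            new Segment("seg-1", now + 1 * day),   // holds one future-dated record
            new Segment("seg-2", now - 30 * day)); // old, but sits behind seg-1
        System.out.println(deletableSegments(log, 7 * day, now)); // prints [seg-0]
    }
}
```

In this toy log, `seg-2` is well past the 7-day retention but is kept because `seg-1`, which carries a timestamp in the future, has not expired yet.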
[GitHub] [kafka] itantiger commented on pull request #9790: Some parameters will be overwritten which was configured in consumer.config.
itantiger commented on pull request #9790: URL: https://github.com/apache/kafka/pull/9790#issuecomment-751673393 > @itantiger Thanks for your patch. Could you revise the title? Finished. When will this patch be merged to trunk?
[GitHub] [kafka] itantiger commented on pull request #9790: Some parameters will be overwritten which was configured in consumer.config.
itantiger commented on pull request #9790: URL: https://github.com/apache/kafka/pull/9790#issuecomment-751665759 > @itantiger Thanks for your patch. Could you revise the title? Also, thank you very much for your guidance :)
[GitHub] [kafka] chia7712 commented on pull request #9790: Some parameters will be overwritten which was configured in consumer.config where running "ConsumerPerformance.scala". Linked https://issues
chia7712 commented on pull request #9790: URL: https://github.com/apache/kafka/pull/9790#issuecomment-751664539 @itantiger Thanks for your patch. Could you revise the title?
[GitHub] [kafka] itantiger commented on pull request #9790: Some parameters will be overwritten which was configured in consumer.config where running "ConsumerPerformance.scala". Linked https://issue
itantiger commented on pull request #9790: URL: https://github.com/apache/kafka/pull/9790#issuecomment-751664125 > > what should I do next :) > > please take a look at the report and fix the error. https://github.com/apache/kafka/pull/9790/checks?check_run_id=1616222522 I fixed the error, but the second build reported many errors that seem to have nothing to do with what I changed. Could you please take a look? :)
[jira] [Comment Edited] (KAFKA-10889) The log cleaner is not working for topic partitions
[ https://issues.apache.org/jira/browse/KAFKA-10889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255491#comment-17255491 ] Wenbing Shen edited comment on KAFKA-10889 at 12/28/20, 9:43 AM: - Hello, when the user is unfamiliar with the configuration, is there a way to solve the problem by design? [~becket_qin] [~junrao] was (Author: wenbing.shen): [~becket_qin] [~junrao]
[jira] [Comment Edited] (KAFKA-10889) The log cleaner is not working for topic partitions
[ https://issues.apache.org/jira/browse/KAFKA-10889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255491#comment-17255491 ] Wenbing Shen edited comment on KAFKA-10889 at 12/28/20, 9:43 AM: - [~becket_qin] [~junrao] Hello, when the user is unfamiliar with the configuration, is there a way to solve the problem by design? was (Author: wenbing.shen): Hello, when the user is unfamiliar with the configuration, is there a way to solve the problem by design? [~becket_qin] [~junrao]
[jira] [Commented] (KAFKA-10889) The log cleaner is not working for topic partitions
[ https://issues.apache.org/jira/browse/KAFKA-10889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255491#comment-17255491 ] Wenbing Shen commented on KAFKA-10889: -- [~becket_qin] [~junrao]
[GitHub] [kafka] showuon commented on pull request #9792: KAFKA-10870: handle REBALANCE_IN_PROGRESS error in JoinGroup
showuon commented on pull request #9792: URL: https://github.com/apache/kafka/pull/9792#issuecomment-751647364 @hachikuji , please help review this PR. Thanks.
[GitHub] [kafka] showuon opened a new pull request #9792: KAFKA-10870: handle REBALANCE_IN_PROGRESS error in JoinGroup
showuon opened a new pull request #9792: URL: https://github.com/apache/kafka/pull/9792 Handle the REBALANCE_IN_PROGRESS error in JoinGroup so that the consumer logs the correct information and requests a rejoin. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Comment Edited] (KAFKA-10889) The log cleaner is not working for topic partitions
[ https://issues.apache.org/jira/browse/KAFKA-10889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255480#comment-17255480 ] Wenbing Shen edited comment on KAFKA-10889 at 12/28/20, 9:24 AM: - The problem appears to be related to this KIP: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message] It is related to these two configurations: message.timestamp.type=CreateTime message.timestamp.difference.max.ms=Long.MAX_VALUE The client generated messages whose timestamp is 2020-12-26 14:49:32. As a result, none of the log segments of this partition will be deleted before 2021-01-02 14:49:32. !image-2020-12-28-17-17-15-947.png! was (Author: wenbing.shen): [https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message] message.timestamp.type max.message.time.difference.ms
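The date arithmetic in this comment, and the effect of leaving message.timestamp.difference.max.ms at Long.MAX_VALUE, can be illustrated with a small sketch. The assumptions are mine: the 7-day retention is taken from the issue description, the difference check is modelled as an absolute difference between the record timestamp and the broker clock, and `TimestampCheckSketch`, `earliestDeletion` and `withinMaxDifference` are hypothetical names rather than Kafka APIs.

```java
import java.time.Duration;
import java.time.LocalDateTime;

/** Hypothetical sketch; it does not reproduce the broker's real validation code. */
final class TimestampCheckSketch {

    /** Earliest time at which a record with timestamp T can expire: T + R. */
    static LocalDateTime earliestDeletion(LocalDateTime recordTimestamp, Duration retention) {
        return recordTimestamp.plus(retention);
    }

    /**
     * Models a CreateTime check: the record is accepted only if its timestamp
     * differs from the broker's clock by at most maxDifferenceMs.
     */
    static boolean withinMaxDifference(long recordTimestampMs, long brokerNowMs, long maxDifferenceMs) {
        return Math.abs(recordTimestampMs - brokerNowMs) <= maxDifferenceMs;
    }

    public static void main(String[] args) {
        LocalDateTime recordTs = LocalDateTime.of(2020, 12, 26, 14, 49, 32);
        // 7-day retention, as stated in the issue description.
        System.out.println(earliestDeletion(recordTs, Duration.ofDays(7))); // prints 2021-01-02T14:49:32

        long oneDayMs = Duration.ofDays(1).toMillis();
        long brokerNow = 1_000_000_000_000L;    // arbitrary broker clock value
        long futureRecord = brokerNow + oneDayMs; // record dated one day ahead
        // With the maximum allowed difference at Long.MAX_VALUE, nothing is rejected.
        System.out.println(withinMaxDifference(futureRecord, brokerNow, Long.MAX_VALUE)); // prints true
        // A one-hour bound would reject the same record.
        System.out.println(withinMaxDifference(futureRecord, brokerNow, Duration.ofHours(1).toMillis())); // prints false
    }
}
```

Tightening the bound trades one problem for another: producers with skewed clocks start seeing rejected records, which is the "too much" question raised elsewhere in this thread.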
[jira] [Assigned] (KAFKA-10870) Consumer should handle REBALANCE_IN_PROGRESS from JoinGroup
[ https://issues.apache.org/jira/browse/KAFKA-10870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen reassigned KAFKA-10870: - Assignee: Luke Chen > Consumer should handle REBALANCE_IN_PROGRESS from JoinGroup > --- > > Key: KAFKA-10870 > URL: https://issues.apache.org/jira/browse/KAFKA-10870 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Luke Chen >Priority: Major > > We hit a timeout when persisting group metadata to the __consumer_offsets > topic: > {code} > [2020-12-18 18:06:08,209] DEBUG [GroupMetadataManager brokerId=1] Metadata > from group test_group_id with generation 1 failed when appending to log due > to org.apache.kafka.common.errors.TimeoutException > (kafka.coordinator.group.GroupMetadataManager) > [2020-12-18 18:06:08,210] WARN [GroupCoordinator 1]: Failed to persist > metadata for group test_group_id: The group is rebalancing, so a rejoin is > needed. (kafka.coordinator.group.GroupCoordinator) > {code} > This in turn resulted in a REBALANCE_IN_PROGRESS being returned from the > JoinGroup: > {code} > [2020-12-18 18:06:08,211] INFO Completed > request:RequestHeader(apiKey=JOIN_GROUP, apiVersion=7, > clientId=consumer-test_group_id-test_group_id-instance-1, correlationId=3) -- > {group_id=test_group_id,session_timeout_ms=6,rebalance_timeout_ms=30,member_id=,group_instance_id=test_group_id-instance-1,protocol_type=consumer,protocols=[{name=range,metadata=java.nio.HeapByteBuffer[pos=0 > lim=26 > cap=26],_tagged_fields={}}],_tagged_fields={}},response:{throttle_time_ms=0,error_code=27,generation_id=1,protocol_type=consumer,protocol_name=range,leader=test_group_id-instance-2-32e72316-2c3f-40d6-bc34-8ec23d633d34,member_id=,members=[],_tagged_fields={}} > from connection > 
172.31.46.222:9092-172.31.44.169:41310-6;totalTime:5014.825,requestQueueTime:0.193,localTime:11.575,remoteTime:5002.195,throttleTime:0.66,responseQueueTime:0.105,sendTime:0.094,sendIoTime:0.038,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=apache-kafka-java, > softwareVersion=5.5.3-ce) (kafka.request.logger) > {code} > The consumer has no logic to handle REBALANCE_IN_PROGRESS from JoinGroup. > {code} > [2020-12-18 18:06:08,210] ERROR [Consumer > instanceId=test_group_id-instance-1, > clientId=consumer-test_group_id-test_group_id-instance-1, > groupId=test_group_id] Attempt to join group failed due to unexpected error > : The group is rebalancing, so a rejoin is needed. > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [2020-12-18 18:06:08,211] INFO [Consumer instanceId=test_group_id-instance-1, > clientId=consumer-test_group_id-test_group_id-instance-1, > groupId=test_group_id] Join group failed with org.apache.kafka.common.KafkaException: Unexpected error in join group response: The group is rebalancing, > so a rejoin is needed. > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [2020-12-18 18:06:08,211] ERROR Error during processing, terminating consumer > process: (org.apache.kafka.tools.VerifiableConsumer) > org.apache.kafka.common.KafkaException: Unexpected error in join group > response: The group is rebalancing, so a rejoin is needed. 
> at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:653) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:574) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1096) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1076) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
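The behavior this issue asks for could look roughly like the sketch below: a JoinGroup response carrying REBALANCE_IN_PROGRESS (the `error_code=27` visible in the request log above) is treated as a signal to rejoin instead of falling through to the "unexpected error" path that terminated the consumer. This is a deliberately simplified illustration and not the code in AbstractCoordinator; `JoinGroupSketch`, `Outcome` and `classify` are hypothetical names, and a real handler distinguishes many more error codes.

```java
/** Hypothetical sketch of JoinGroup error classification; not Kafka's handler. */
final class JoinGroupSketch {

    /** What the consumer should do after a JoinGroup response (hypothetical type). */
    enum Outcome { JOINED, REJOIN, FATAL }

    static final short NONE = 0;                   // no error
    static final short REBALANCE_IN_PROGRESS = 27; // code seen in the log above

    /** Maps a JoinGroup error code to the next action. */
    static Outcome classify(short errorCode) {
        if (errorCode == NONE) {
            return Outcome.JOINED;
        }
        if (errorCode == REBALANCE_IN_PROGRESS) {
            // The group is rebalancing: log it and send JoinGroup again
            // rather than raising a KafkaException.
            return Outcome.REJOIN;
        }
        // Simplification: every other code is treated as fatal in this sketch.
        return Outcome.FATAL;
    }

    public static void main(String[] args) {
        System.out.println(classify((short) 27)); // prints REJOIN
    }
}
```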
[jira] [Commented] (KAFKA-10889) The log cleaner is not working for topic partitions
[ https://issues.apache.org/jira/browse/KAFKA-10889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255480#comment-17255480 ] Wenbing Shen commented on KAFKA-10889: -- [https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message] message.timestamp.type max.message.time.difference.ms > The log cleaner is not working for topic partitions > --- > > Key: KAFKA-10889 > URL: https://issues.apache.org/jira/browse/KAFKA-10889 > Project: Kafka > Issue Type: Bug > Components: log cleaner >Affects Versions: 2.0.0 >Reporter: Wenbing Shen >Assignee: Wenbing Shen >Priority: Blocker > Attachments: 0880c08b0110fcdd9b0c.png, 0880c08b0110fcddfb0b.png > > > * I have a topic that is reserved for the default of 7 days, but the log > exists from October 26th to December 25th today.The log cleaner doesn't seem > to be working on it.This seems to be an underlying problem in Kafka. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10890) Broker just stated Ignoring LeaderAndIsr request from controller
[ https://issues.apache.org/jira/browse/KAFKA-10890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] GeoffreyStark updated KAFKA-10890: -- Description: With auto.leader.rebalance.enabled=true, take broker 1013 (which leads many partitions) offline. After new leaders for these partitions have been elected from the other ISR members, 1013 should, in theory, once again become the leader of those partitions. Indeed, the describe command and the corresponding partition information in ZK both show that the leader has changed back to 1013. However, the controller log and the 1013 log contain warning messages at this point, and after a while the producer reports the error "The server is not the leader for that topic-partition..Going to request metadata update now". It looks like the controller failed to send a LeaderAndIsr request to node 1013 after it restarted. The 1013 node's log then shows: {code:java} "Ignoring LeaderAndIsr request from controller 1017 with the correlation id 33 epoch 6 for partition sp since its associated leader epoch 5 is not higher than the current leader epoch 5 (state.change.logger). " {code} After a while, the producer reports an exception: {code:java} "The server is not the leader for that topic-partition..Going to request metadata update now" {code} was: With auto.leader.rebalance.enabled=true, take broker 1013 (which leads many partitions) offline. After new leaders for these partitions have been elected from the other ISR members, 1013 should, in theory, once again become the leader of those partitions. Indeed, the describe command and the corresponding partition information in ZK both show that the leader has changed back to 1013.
However, the controller log and the 1013 log contain warning messages at this point, and after a while the producer reports the error "The server is not the leader for that topic-partition..Going to request metadata update now". > Broker just stated Ignoring LeaderAndIsr request from controller > - > > Key: KAFKA-10890 > URL: https://issues.apache.org/jira/browse/KAFKA-10890 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.0.0 > Environment: kfk 2.0 > 74 brokers > 3 replica-factors >Reporter: GeoffreyStark >Priority: Major > Attachments: image-2020-12-28-16-59-03-492.png, > jstack-1013broker-1228-1312, kafka元数据混乱.docx
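The broker-side decision that the quoted log line points to can be sketched as a leader epoch comparison: a LeaderAndIsr request for a partition is applied only when its leader epoch is newer than the one the broker already holds. The strictness of the comparison is an assumption inferred from the log, where a request with epoch 5 is ignored while the current epoch is also 5. `LeaderEpochSketch` and `maybeApply` are hypothetical names, not Kafka classes.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of stale LeaderAndIsr detection; not Kafka's ReplicaManager. */
final class LeaderEpochSketch {

    // Leader epoch the broker currently holds for each partition.
    private final Map<String, Integer> currentLeaderEpoch = new HashMap<>();

    /**
     * Applies the request if its leader epoch is strictly higher than the
     * current one (or if the partition is unknown). Returns false when the
     * request is ignored as stale.
     */
    boolean maybeApply(String topicPartition, int requestLeaderEpoch) {
        Integer current = currentLeaderEpoch.get(topicPartition);
        if (current != null && requestLeaderEpoch <= current) {
            return false; // "leader epoch ... is not higher than the current leader epoch"
        }
        currentLeaderEpoch.put(topicPartition, requestLeaderEpoch);
        return true;
    }

    public static void main(String[] args) {
        LeaderEpochSketch broker = new LeaderEpochSketch();
        System.out.println(broker.maybeApply("sp-0", 5)); // prints true
        System.out.println(broker.maybeApply("sp-0", 5)); // prints false (same epoch is ignored)
        System.out.println(broker.maybeApply("sp-0", 6)); // prints true
    }
}
```

Under this model, a broker that already recorded epoch 5 keeps ignoring requests at epoch 5, whatever ZK says about the leader, until a request with a higher epoch arrives.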
[jira] [Updated] (KAFKA-10890) Broker just stated Ignoring LeaderAndIsr request from controller
[ https://issues.apache.org/jira/browse/KAFKA-10890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] GeoffreyStark updated KAFKA-10890: -- Attachment: image-2020-12-28-16-59-03-492.png
[jira] [Commented] (KAFKA-10890) Broker just stated Ignoring LeaderAndIsr request from controller
[ https://issues.apache.org/jira/browse/KAFKA-10890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255473#comment-17255473 ] GeoffreyStark commented on KAFKA-10890: --- The controller log shows the following: !image-2020-12-28-16-59-03-492.png!
[jira] [Commented] (KAFKA-10890) Broker just stated Ignoring LeaderAndIsr request from controller
[ https://issues.apache.org/jira/browse/KAFKA-10890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17255472#comment-17255472 ] GeoffreyStark commented on KAFKA-10890: --- It looks like the controller failed to send a LeaderAndIsr request to node 1013 after it restarted. The 1013 node's log then shows: {code:java} "Ignoring LeaderAndIsr request from controller 1017 with the correlation id 33 epoch 6 for partition sp since its associated leader epoch 5 is not higher than the current leader epoch 5 (state.change.logger). " {code} After a while, the producer reports an exception: {code:java} "The server is not the leader for that topic-partition..Going to request metadata update now" {code}
[jira] [Created] (KAFKA-10890) Broker just stated Ignoring LeaderAndIsr request from controller
GeoffreyStark created KAFKA-10890: - Summary: Broker just stated Ignoring LeaderAndIsr request from controller Key: KAFKA-10890 URL: https://issues.apache.org/jira/browse/KAFKA-10890 Project: Kafka Issue Type: Bug Components: core Affects Versions: 2.0.0 Environment: kfk 2.0 74 brokers 3 replica-factors Reporter: GeoffreyStark Attachments: jstack-1013broker-1228-1312, kafka元数据混乱.docx With auto.leader.rebalance.enabled=true, take broker 1013 (which leads many partitions) offline. After new leaders for these partitions have been elected from the other ISR members, 1013 should, in theory, once again become the leader of those partitions. Indeed, the describe command and the corresponding partition information in ZK both show that the leader has changed back to 1013. However, the controller log and the 1013 log contain warning messages at this point, and after a while the producer reports the error "The server is not the leader for that topic-partition..Going to request metadata update now".