[GitHub] [kafka] chia7712 opened a new pull request #10171: MINOR: avoid duplicate array copying from BufferValue#serialize
chia7712 opened a new pull request #10171:
URL: https://github.com/apache/kafka/pull/10171

We can write the ProcessorRecordContext to the target `ByteBuffer` directly.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
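A minimal sketch of the idea behind this PR, with hypothetical field names (the real `BufferValue`/`ProcessorRecordContext` serialization differs): serializing into a temporary array and then copying it into the destination buffer does the work twice, while writing the fields into the destination `ByteBuffer` directly avoids the intermediate copy.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration, not the actual Kafka Streams code.
public class DirectSerializeSketch {

    // Before: serialize to a temporary buffer, then copy its array into dest.
    static ByteBuffer serializeWithCopy(String topic, long offset, ByteBuffer dest) {
        byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
        ByteBuffer tmp = ByteBuffer.allocate(Long.BYTES + Integer.BYTES + topicBytes.length);
        tmp.putLong(offset);
        tmp.putInt(topicBytes.length);
        tmp.put(topicBytes);
        dest.put(tmp.array()); // extra array copy
        return dest;
    }

    // After: write the fields straight into the destination buffer.
    static ByteBuffer serializeDirect(String topic, long offset, ByteBuffer dest) {
        byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
        dest.putLong(offset);
        dest.putInt(topicBytes.length);
        dest.put(topicBytes);
        return dest;
    }

    public static void main(String[] args) {
        ByteBuffer a = serializeWithCopy("events", 42L, ByteBuffer.allocate(64));
        ByteBuffer b = serializeDirect("events", 42L, ByteBuffer.allocate(64));
        a.flip();
        b.flip();
        System.out.println(a.equals(b)); // same bytes either way, one copy fewer
    }
}
```

Both paths produce identical bytes; the direct path simply skips the temporary allocation and copy.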
[GitHub] [kafka] chia7712 commented on pull request #10024: KAFKA-12273 InterBrokerSendThread#pollOnce throws FatalExitError even…
chia7712 commented on pull request #10024:
URL: https://github.com/apache/kafka/pull/10024#issuecomment-783116760

`Build / JDK 11 / org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testCreateInternalTopicsWithFewerReplicasThanBrokers`

This test failure is fixed by #10152 and #10158.
[GitHub] [kafka] chia7712 commented on a change in pull request #10157: MINOR: Raft request thread should discover api versions
chia7712 commented on a change in pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#discussion_r579977803

## File path: core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala

```diff
@@ -124,8 +124,18 @@ class KafkaNetworkChannel(
   }

   def onComplete(clientResponse: ClientResponse): Unit = {
-    val response = if (clientResponse.authenticationException != null) {
-      errorResponse(request.data, Errors.CLUSTER_AUTHORIZATION_FAILED)
+    val response = if (clientResponse.versionMismatch != null) {
+      error(s"Request $request failed due to unsupported version error",
+        clientResponse.authenticationException)
```

Review comment: It should pass `clientResponse.versionMismatch` rather than `clientResponse.authenticationException`.
[GitHub] [kafka] hachikuji commented on a change in pull request #10157: MINOR: Raft request thread should discover api versions
hachikuji commented on a change in pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#discussion_r579959527

## File path: core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala

```diff
@@ -124,8 +124,14 @@ class KafkaNetworkChannel(
   }

   def onComplete(clientResponse: ClientResponse): Unit = {
-    val response = if (clientResponse.authenticationException != null) {
-      errorResponse(request.data, Errors.CLUSTER_AUTHORIZATION_FAILED)
+    val response = if (clientResponse.versionMismatch != null) {
+      errorResponse(request.data, Errors.UNSUPPORTED_VERSION)
+    } else if (clientResponse.authenticationException != null) {
+      // For now we treat authentication errors as retriable. We use the
+      // `NETWORK_EXCEPTION` error code for lack of a good alternative.
+      // Note that `BrokerToControllerChannelManager` will still log the
+      // authentication errors so that users have a chance to fix the problem.
+      errorResponse(request.data, Errors.NETWORK_EXCEPTION)
```

Review comment: Filed this: https://issues.apache.org/jira/browse/KAFKA-12355.
[jira] [Created] (KAFKA-12355) Consider inter-broker authentication error handling
Jason Gustafson created KAFKA-12355:
---------------------------------------

Summary: Consider inter-broker authentication error handling
Key: KAFKA-12355
URL: https://issues.apache.org/jira/browse/KAFKA-12355
Project: Kafka
Issue Type: Improvement
Reporter: Jason Gustafson

Currently, authentication errors between brokers are generally considered retriable: the broker will log an error but continue trying to reach the other broker. This could be improved. For example, authentication errors (specifically from the broker to the controller in KIP-500) should probably be considered fatal during an initialization window, which makes it easy for users to detect problems quickly. On the other hand, if a broker has been running for some time, we probably do not want to fail it on the first authentication failure. If a user added a misconfigured controller to the cluster, it could end up taking down the whole cluster through authentication failures.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
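The policy sketched in this issue can be captured in a few lines. The class name, window length, and clock handling below are all hypothetical; this is just one way the "fatal during initialization, retriable afterwards" rule could look:

```java
// Hypothetical sketch of the policy proposed in KAFKA-12355: treat an
// authentication error as fatal only while the broker is still inside a
// startup grace window, and as retriable once it has been running longer.
public class AuthErrorPolicy {
    private final long startMs;      // when the broker started
    private final long initWindowMs; // assumed length of the init window

    public AuthErrorPolicy(long startMs, long initWindowMs) {
        this.startMs = startMs;
        this.initWindowMs = initWindowMs;
    }

    // True if an authentication failure observed at nowMs should be fatal.
    public boolean isFatal(long nowMs) {
        return nowMs - startMs <= initWindowMs;
    }

    public static void main(String[] args) {
        AuthErrorPolicy policy = new AuthErrorPolicy(0L, 30_000L);
        System.out.println(policy.isFatal(10_000L)); // inside the window: fatal
        System.out.println(policy.isFatal(60_000L)); // after the window: retriable
    }
}
```

The trade-off the issue describes falls out directly: misconfiguration is caught quickly at startup, while a misconfigured controller added later cannot take down long-running brokers.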
[jira] [Created] (KAFKA-12354) PHP Kafka Client longyan/phpkafka
Zhang Runyu created KAFKA-12354:
---------------------------------------

Summary: PHP Kafka Client longyan/phpkafka
Key: KAFKA-12354
URL: https://issues.apache.org/jira/browse/KAFKA-12354
Project: Kafka
Issue Type: Wish
Components: documentation
Reporter: Zhang Runyu

Hello! Our team maintains a PHP Kafka client. We hope this item can be added to this page: https://cwiki.apache.org/confluence/display/KAFKA/Clients

Introduction: the PHP Kafka client can be used in PHP-FPM and Swoole. Its communication protocol is based on the JSON protocol definitions in the Java client. It supports 50 APIs, which may make it the client supporting the most message types to date.

GitHub: https://github.com/longyan/phpkafka

Some users are already using this package. Thank you!
[jira] [Created] (KAFKA-12353) Improve ClientResponse error handling
Jason Gustafson created KAFKA-12353:
---------------------------------------

Summary: Improve ClientResponse error handling
Key: KAFKA-12353
URL: https://issues.apache.org/jira/browse/KAFKA-12353
Project: Kafka
Issue Type: Improvement
Reporter: Jason Gustafson

`NetworkClient` exposes a `ClientResponse` object as the result of a sent request. This currently covers these cases:

- Response returned successfully
- Socket disconnected before the response was read
- Authentication failed while connecting
- Unsupported version error because the client found no version compatible with the broker

The problem is that there is no type protection to ensure that each of these cases is handled. We missed some of them initially in `BrokerToControllerChannelManager` and had to fix them here: https://github.com/apache/kafka/pull/10157. It would be useful to consider a refactor which makes it harder to overlook the exceptional cases. Perhaps an approach similar to `CompletableFuture.whenComplete`, which takes a `BiConsumer`, would be reasonable. Then we could consolidate the three error cases above by returning one of the following exceptions:

- DisconnectException
- AuthenticationFailedException
- UnsupportedVersionException
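A toy sketch of the `whenComplete`-style refactor proposed above, with stand-in types (the real `ClientResponse` and Kafka exception classes are different): funneling all three error cases into the single `Throwable` argument of a `BiConsumer` means a handler cannot silently skip one of them.

```java
import java.util.function.BiConsumer;

// Hypothetical illustration of the proposal in KAFKA-12353, not Kafka code.
public class ResponseHandlerSketch {

    // Stand-ins for the real response and exception types.
    static class Response { final String body; Response(String body) { this.body = body; } }
    static class DisconnectException extends RuntimeException {}
    static class AuthenticationFailedException extends RuntimeException {}
    static class UnsupportedVersionException extends RuntimeException {}

    // Every completion path goes through one callback: either a non-null
    // response or a non-null exception, never both, never neither.
    static void complete(Response response,
                         boolean disconnected,
                         boolean authFailed,
                         boolean versionMismatch,
                         BiConsumer<Response, Throwable> handler) {
        if (authFailed) {
            handler.accept(null, new AuthenticationFailedException());
        } else if (versionMismatch) {
            handler.accept(null, new UnsupportedVersionException());
        } else if (disconnected) {
            handler.accept(null, new DisconnectException());
        } else {
            handler.accept(response, null);
        }
    }

    public static void main(String[] args) {
        final String[] seen = new String[1];
        BiConsumer<Response, Throwable> handler = (resp, err) ->
            seen[0] = (err != null) ? err.getClass().getSimpleName() : resp.body;

        complete(new Response("ok"), false, false, false, handler);
        System.out.println(seen[0]); // ok
        complete(null, true, false, false, handler);
        System.out.println(seen[0]); // DisconnectException
    }
}
```

The design point is the same one `CompletableFuture.whenComplete` makes: a single completion callback with a `(result, error)` pair forces the caller to confront the error path.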
[GitHub] [kafka] hachikuji commented on a change in pull request #10157: MINOR: Raft request thread should discover api versions
hachikuji commented on a change in pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#discussion_r579954768

## File path: core/src/main/scala/kafka/server/AlterIsrManager.scala

```diff
@@ -165,9 +165,19 @@ class DefaultAlterIsrManager(
       new ControllerRequestCompletionHandler {
         override def onComplete(response: ClientResponse): Unit = {
           debug(s"Received AlterIsr response $response")
-          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
           val error = try {
-            handleAlterIsrResponse(body, message.brokerEpoch, inflightAlterIsrItems)
+            if (response.authenticationException != null) {
+              // For now we treat authentication errors as retriable. We use the
+              // `NETWORK_EXCEPTION` error code for lack of a good alternative.
+              // Note that `BrokerToControllerChannelManager` will still log the
+              // authentication errors so that users have a chance to fix the problem.
+              Errors.NETWORK_EXCEPTION
+            } else if (response.versionMismatch != null) {
```

Review comment: Filed this: https://issues.apache.org/jira/browse/KAFKA-12353
[GitHub] [kafka] jsancio commented on a change in pull request #10157: MINOR: Raft request thread should discover api versions
jsancio commented on a change in pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#discussion_r579892241

## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java

```diff
@@ -2244,7 +2244,6 @@ private Long append(int epoch, List records, boolean isAtomic) {
     @Override
     public void close() {
-        log.close();
```

Review comment: The removal was done here: https://github.com/apache/kafka/pull/10168

Both the `RaftManager` and `KafkaRaftClient` were closing the `ReplicatedLog`. It was decided that the `RaftManager` owns the log and should close it instead of the `KafkaRaftClient`.
[GitHub] [kafka] ijuma commented on a change in pull request #10157: MINOR: Raft request thread should discover api versions
ijuma commented on a change in pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#discussion_r579875328

## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java

```diff
@@ -2244,7 +2244,6 @@ private Long append(int epoch, List records, boolean isAtomic) {
     @Override
    public void close() {
-        log.close();
```

Review comment: This has since been done via a separate PR that was already merged.
[GitHub] [kafka] ijuma commented on a change in pull request #10157: MINOR: Raft request thread should discover api versions
ijuma commented on a change in pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#discussion_r579875306

## File path: core/src/main/scala/kafka/server/AlterIsrManager.scala

```diff
@@ -165,9 +165,19 @@ class DefaultAlterIsrManager(
       new ControllerRequestCompletionHandler {
         override def onComplete(response: ClientResponse): Unit = {
           debug(s"Received AlterIsr response $response")
-          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
           val error = try {
-            handleAlterIsrResponse(body, message.brokerEpoch, inflightAlterIsrItems)
+            if (response.authenticationException != null) {
+              // For now we treat authentication errors as retriable. We use the
+              // `NETWORK_EXCEPTION` error code for lack of a good alternative.
+              // Note that `BrokerToControllerChannelManager` will still log the
+              // authentication errors so that users have a chance to fix the problem.
+              Errors.NETWORK_EXCEPTION
+            } else if (response.versionMismatch != null) {
```

Review comment: I'm fine with improving this post-2.8; let's file a JIRA please.
[GitHub] [kafka] ijuma commented on a change in pull request #10157: MINOR: Raft request thread should discover api versions
ijuma commented on a change in pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#discussion_r579875266

## File path: core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala

```diff
@@ -124,8 +124,14 @@ class KafkaNetworkChannel(
   }

   def onComplete(clientResponse: ClientResponse): Unit = {
-    val response = if (clientResponse.authenticationException != null) {
-      errorResponse(request.data, Errors.CLUSTER_AUTHORIZATION_FAILED)
+    val response = if (clientResponse.versionMismatch != null) {
+      errorResponse(request.data, Errors.UNSUPPORTED_VERSION)
+    } else if (clientResponse.authenticationException != null) {
+      // For now we treat authentication errors as retriable. We use the
+      // `NETWORK_EXCEPTION` error code for lack of a good alternative.
+      // Note that `BrokerToControllerChannelManager` will still log the
+      // authentication errors so that users have a chance to fix the problem.
+      errorResponse(request.data, Errors.NETWORK_EXCEPTION)
```

Review comment: Makes sense. Let's file a JIRA to improve it as you suggested here.
[jira] [Commented] (KAFKA-10034) Clarify Usage of "batch.size" and "max.request.size" Producer Configs
[ https://issues.apache.org/jira/browse/KAFKA-10034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17288088#comment-17288088 ]

Badai Aqrandista commented on KAFKA-10034:
------------------------------------------

Yes, I am. Will do that this week.

> Clarify Usage of "batch.size" and "max.request.size" Producer Configs
> ---------------------------------------------------------------------
>
> Key: KAFKA-10034
> URL: https://issues.apache.org/jira/browse/KAFKA-10034
> Project: Kafka
> Issue Type: Improvement
> Components: docs, producer
> Reporter: Mark Cox
> Assignee: Badai Aqrandista
> Priority: Minor
>
> The documentation around the producer configurations "batch.size" and
> "max.request.size", and how they relate to one another, can be confusing.
> In reality, "max.request.size" is a hard limit on each individual record,
> but the documentation makes it seem this is the maximum size of a request
> sent to Kafka. If there is a situation where "batch.size" is set greater
> than "max.request.size" (and each individual record is smaller than
> "max.request.size") you could end up with larger requests than expected sent
> to Kafka.
> There are a few things that could be considered to make this clearer:
> # Improve the documentation to clarify the two producer configurations and
> how they relate to each other
> # Provide a producer check, and possibly a warning, if "batch.size" is found
> to be greater than "max.request.size"
> # The producer could take the _minimum_ of "batch.size" or "max.request.size"
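Option 2 from the quoted issue can be sketched in a few lines. The config keys are the real producer keys, but the validation helper itself is hypothetical, not an existing Kafka API:

```java
// Hypothetical sketch of the check proposed in KAFKA-10034: warn when
// batch.size exceeds max.request.size. Per the report above, max.request.size
// caps each individual record rather than the whole request, so a larger
// batch.size can produce requests bigger than users expect.
public class ProducerConfigCheck {

    // Returns a warning message, or null if the configs are consistent.
    static String check(int batchSize, int maxRequestSize) {
        if (batchSize > maxRequestSize) {
            return "batch.size (" + batchSize + ") is larger than max.request.size ("
                    + maxRequestSize + "); requests may be larger than expected";
        }
        return null;
    }

    public static void main(String[] args) {
        // Default-ish values: 16384-byte batches, 1 MiB request-size limit.
        System.out.println(check(16_384, 1_048_576));   // null, consistent
        System.out.println(check(2_000_000, 1_048_576)); // warning
    }
}
```

A real implementation would likely live in the producer's config validation and log the warning rather than return it, but the comparison is the whole idea.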
[GitHub] [kafka] sknop commented on a change in pull request #9950: KAFKA-12170: Fix for Connect Cast SMT to correctly transform a Byte array into a string
sknop commented on a change in pull request #9950:
URL: https://github.com/apache/kafka/pull/9950#discussion_r579839255

## File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java

```diff
@@ -364,11 +365,24 @@ private static String castToString(Object value) {
         if (value instanceof java.util.Date) {
             java.util.Date dateValue = (java.util.Date) value;
             return Values.dateFormatFor(dateValue).format(dateValue);
+        } else if (value instanceof ByteBuffer) {
+            ByteBuffer byteBuffer = (ByteBuffer) value;
+            return castByteArrayToString(byteBuffer.array());
+        } else if (value instanceof byte[]) {
+            return castByteArrayToString((byte[]) value);
         } else {
             return value.toString();
         }
     }
+
+    private static String castByteArrayToString(byte[] array) {
+        StringBuilder sbuf = new StringBuilder();
+        for (byte b : array) {
+            sbuf.append(String.format("%02X", b));
```

Review comment: The reason I suggested hex representation was that in the use case I encountered (a database PK field encoded as BINARY, retrieved via a CDC source connector), the representation in the SQL CLI was hex, so choosing the same format made it easy to compare source and target. I would suggest that base64 encoding would warrant its own dedicated transformer, with the ability to choose padding, for example.
[GitHub] [kafka] sknop commented on a change in pull request #9950: KAFKA-12170: Fix for Connect Cast SMT to correctly transform a Byte array into a string
sknop commented on a change in pull request #9950:
URL: https://github.com/apache/kafka/pull/9950#discussion_r579838511

## File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java

```diff
@@ -364,11 +365,24 @@ private static String castToString(Object value) {
         if (value instanceof java.util.Date) {
             java.util.Date dateValue = (java.util.Date) value;
             return Values.dateFormatFor(dateValue).format(dateValue);
+        } else if (value instanceof ByteBuffer) {
+            ByteBuffer byteBuffer = (ByteBuffer) value;
+            return castByteArrayToString(byteBuffer.array());
```

Review comment: That is a good point, but what should happen in that case? I assume we start with hasArray() and only process the content if an array is available, as I understand it? ByteBuffer.remaining() would contain the number of bytes available ... I'll go and write some test cases to understand this better.
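One way to address the hasArray() concern raised in this thread is to avoid `array()` entirely and copy the readable bytes out through a `duplicate()`, which also handles direct and read-only buffers. This is a sketch under that assumption, not the final upstream implementation; the helper names follow the PR but `castByteBufferToString` is hypothetical:

```java
import java.nio.ByteBuffer;

// Sketch of the cast discussed above. castByteArrayToString mirrors the PR's
// hex formatting; castByteBufferToString is a hypothetical safe variant that
// works even when the buffer has no accessible backing array.
public class ByteCastSketch {

    static String castByteArrayToString(byte[] array) {
        StringBuilder sbuf = new StringBuilder();
        for (byte b : array) {
            sbuf.append(String.format("%02X", b)); // two uppercase hex digits per byte
        }
        return sbuf.toString();
    }

    static String castByteBufferToString(ByteBuffer buffer) {
        // remaining() is the number of readable bytes; reading through a
        // duplicate leaves the caller's position untouched.
        byte[] bytes = new byte[buffer.remaining()];
        buffer.duplicate().get(bytes);
        return castByteArrayToString(bytes);
    }

    public static void main(String[] args) {
        byte[] data = new byte[] {0x0A, (byte) 0xFF, 0x00};
        System.out.println(castByteArrayToString(data)); // 0AFF00
        ByteBuffer direct = ByteBuffer.allocateDirect(3);
        direct.put(data).flip();
        System.out.println(castByteBufferToString(direct)); // 0AFF00, no array() needed
    }
}
```

`hasArray()` plus `array()` would also work for heap buffers, but the copy-via-duplicate approach covers all buffer kinds with one code path.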
[jira] [Resolved] (KAFKA-12337) provide full scala api for operators naming
[ https://issues.apache.org/jira/browse/KAFKA-12337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

GeordieMai resolved KAFKA-12337.
--------------------------------
Resolution: Duplicate

> provide full scala api for operators naming
> -------------------------------------------
>
> Key: KAFKA-12337
> URL: https://issues.apache.org/jira/browse/KAFKA-12337
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 2.7.0
> Reporter: Ramil Israfilov
> Priority: Major
>
> The Kafka Streams Java DSL provides the ability to apply custom naming to all
> operators via the Named, Grouped, and Consumed objects (there is a separate
> dev guide page about it:
> https://kafka.apache.org/27/documentation/streams/developer-guide/dsl-topology-naming.html).
> But the Scala API for Kafka Streams provides only partial support.
> For example, the following APIs are missing custom naming:
> filter, selectKey, map, mapValues, flatMap...
> Probably the same issue exists for other Scala objects.
> As a workaround I have to make quite ugly calls to the inner KStream Java
> class and perform scala2java conversions and back.
> It would be really handy if all custom naming APIs were also supported in
> the Scala Kafka Streams DSL.