[GitHub] [kafka] chia7712 opened a new pull request #10171: MINOR: avoid duplicate array copying from BufferValue#serialize

2021-02-21 Thread GitBox


chia7712 opened a new pull request #10171:
URL: https://github.com/apache/kafka/pull/10171


   We can write ProcessorRecordContext to target `ByteBuffer` directly.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #10024: KAFKA-12273 InterBrokerSendThread#pollOnce throws FatalExitError even…

2021-02-21 Thread GitBox


chia7712 commented on pull request #10024:
URL: https://github.com/apache/kafka/pull/10024#issuecomment-783116760


   `Build / JDK 11 / 
org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testCreateInternalTopicsWithFewerReplicasThanBrokers`
   
   It is fixed by #10152 and #10158



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #10157: MINOR: Raft request thread should discover api versions

2021-02-21 Thread GitBox


chia7712 commented on a change in pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#discussion_r579977803



##
File path: core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
##
@@ -124,8 +124,18 @@ class KafkaNetworkChannel(
 }
 
 def onComplete(clientResponse: ClientResponse): Unit = {
-  val response = if (clientResponse.authenticationException != null) {
-errorResponse(request.data, Errors.CLUSTER_AUTHORIZATION_FAILED)
+  val response = if (clientResponse.versionMismatch != null) {
+error(s"Request $request failed due to unsupported version error",
+  clientResponse.authenticationException)

Review comment:
   It should pass `clientResponse.versionMismatch` rather than 
`clientResponse.authenticationException`  





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10157: MINOR: Raft request thread should discover api versions

2021-02-21 Thread GitBox


hachikuji commented on a change in pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#discussion_r579959527



##
File path: core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
##
@@ -124,8 +124,14 @@ class KafkaNetworkChannel(
 }
 
 def onComplete(clientResponse: ClientResponse): Unit = {
-  val response = if (clientResponse.authenticationException != null) {
-errorResponse(request.data, Errors.CLUSTER_AUTHORIZATION_FAILED)
+  val response = if (clientResponse.versionMismatch != null) {
+errorResponse(request.data, Errors.UNSUPPORTED_VERSION)
+  } else if (clientResponse.authenticationException != null) {
+// For now we treat authentication errors as retriable. We use the
+// `NETWORK_EXCEPTION` error code for lack of a good alternative.
+// Note that `BrokerToControllerChannelManager` will still log the
+// authentication errors so that users have a chance to fix the 
problem.
+errorResponse(request.data, Errors.NETWORK_EXCEPTION)

Review comment:
   Filed this: https://issues.apache.org/jira/browse/KAFKA-12355.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12355) Consider inter-broker authentication error handling

2021-02-21 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-12355:
---

 Summary: Consider inter-broker authentication error handling
 Key: KAFKA-12355
 URL: https://issues.apache.org/jira/browse/KAFKA-12355
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson


Currently authentication errors between brokers are generally considered 
retriable. The broker will log an error, but continue trying to reach the other 
broker. This could be improved. 

For example, authentication errors (specifically from the broker to controller 
in KIP-500) should probably be considered fatal during some window during 
initialization. This makes it easy for users to detect problems quickly. 

On the other hand, if a broker has been running for some time, we probably do 
not want to fail it on the first authentication failure. If a user had added a 
misconfigured controller to the cluster, it could end up taking down the whole 
cluster through authentication failures. 





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12354) PHP Kafka Client longyan/phpkafka

2021-02-21 Thread Zhang Runyu (Jira)
Zhang Runyu created KAFKA-12354:
---

 Summary: PHP Kafka Client longyan/phpkafka
 Key: KAFKA-12354
 URL: https://issues.apache.org/jira/browse/KAFKA-12354
 Project: Kafka
  Issue Type: Wish
  Components: documentation
Reporter: Zhang Runyu


Hello!
 
Our team is maintaining a php kafka client.
 
We hope we can add this item to this page: 
[https://cwiki.apache.org/confluence/display/KAFKA/Clients]
 
Introduction:
 
PHP Kafka client is used in PHP-FPM and Swoole.

The communication protocol is based on the JSON file in Java. PHP Kafka client 
supports 50 APIs, which might be one that supports the most message types ever.
Github: [https://github.com/longyan/phpkafka]
 
Some users are already using this package.
 
Thank you!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12353) Improve ClientResponse error handling

2021-02-21 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-12353:
---

 Summary: Improve ClientResponse error handling
 Key: KAFKA-12353
 URL: https://issues.apache.org/jira/browse/KAFKA-12353
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson


`NetworkClient` exposes a `ClientResponse` object as the result of a sent 
request. This is currently handling these cases:

- Response returned successfully
- Socket disconnected before response was read
- Authentication failed while connecting
- Unsupported version error due to client finding no compatible version with 
broker

The problem is that there is no type protection to ensure that each of these 
cases are handled. We missed some of them initially in 
`BrokerToControllerChannelManager` and had to fix them here: 
https://github.com/apache/kafka/pull/10157.

It would be useful to consider a refactor which makes it harder to overlook the 
exceptional cases. Perhaps an approach similar to 
`CompletableFuture.whenComplete` which takes a`BiConsumer` would be reasonable. Then we could consolidate the three error 
cases above by returning one of the following exceptions:

- DisconnectException
- AuthenticationFailedException
- UnsupportedVersionException





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #10157: MINOR: Raft request thread should discover api versions

2021-02-21 Thread GitBox


hachikuji commented on a change in pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#discussion_r579954768



##
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##
@@ -165,9 +165,19 @@ class DefaultAlterIsrManager(
   new ControllerRequestCompletionHandler {
 override def onComplete(response: ClientResponse): Unit = {
   debug(s"Received AlterIsr response $response")
-  val body = response.responseBody().asInstanceOf[AlterIsrResponse]
   val error = try {
-handleAlterIsrResponse(body, message.brokerEpoch, 
inflightAlterIsrItems)
+if (response.authenticationException != null) {
+  // For now we treat authentication errors as retriable. We use 
the
+  // `NETWORK_EXCEPTION` error code for lack of a good alternative.
+  // Note that `BrokerToControllerChannelManager` will still log 
the
+  // authentication errors so that users have a chance to fix the 
problem.
+  Errors.NETWORK_EXCEPTION
+} else if (response.versionMismatch != null) {

Review comment:
   Filed this: https://issues.apache.org/jira/browse/KAFKA-12353





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10157: MINOR: Raft request thread should discover api versions

2021-02-21 Thread GitBox


jsancio commented on a change in pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#discussion_r579892241



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2244,7 +2244,6 @@ private Long append(int epoch, List records, boolean 
isAtomic) {
 
 @Override
 public void close() {
-log.close();

Review comment:
   The remove was done here: https://github.com/apache/kafka/pull/10168
   
   Both the `RaftManager` and `KafkaRaftClient` where closing the 
`ReplicatedLog`. It was decided that the `RaftManager` owns the log and should 
close it instead of the `KafkaRaftClient`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10157: MINOR: Raft request thread should discover api versions

2021-02-21 Thread GitBox


ijuma commented on a change in pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#discussion_r579875328



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2244,7 +2244,6 @@ private Long append(int epoch, List records, boolean 
isAtomic) {
 
 @Override
 public void close() {
-log.close();

Review comment:
   This has since been done via a separate PR that was already merged.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10157: MINOR: Raft request thread should discover api versions

2021-02-21 Thread GitBox


ijuma commented on a change in pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#discussion_r579875306



##
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##
@@ -165,9 +165,19 @@ class DefaultAlterIsrManager(
   new ControllerRequestCompletionHandler {
 override def onComplete(response: ClientResponse): Unit = {
   debug(s"Received AlterIsr response $response")
-  val body = response.responseBody().asInstanceOf[AlterIsrResponse]
   val error = try {
-handleAlterIsrResponse(body, message.brokerEpoch, 
inflightAlterIsrItems)
+if (response.authenticationException != null) {
+  // For now we treat authentication errors as retriable. We use 
the
+  // `NETWORK_EXCEPTION` error code for lack of a good alternative.
+  // Note that `BrokerToControllerChannelManager` will still log 
the
+  // authentication errors so that users have a chance to fix the 
problem.
+  Errors.NETWORK_EXCEPTION
+} else if (response.versionMismatch != null) {

Review comment:
   I'm fine to improve this post 2.8, let's file a JIRA please.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10157: MINOR: Raft request thread should discover api versions

2021-02-21 Thread GitBox


ijuma commented on a change in pull request #10157:
URL: https://github.com/apache/kafka/pull/10157#discussion_r579875266



##
File path: core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
##
@@ -124,8 +124,14 @@ class KafkaNetworkChannel(
 }
 
 def onComplete(clientResponse: ClientResponse): Unit = {
-  val response = if (clientResponse.authenticationException != null) {
-errorResponse(request.data, Errors.CLUSTER_AUTHORIZATION_FAILED)
+  val response = if (clientResponse.versionMismatch != null) {
+errorResponse(request.data, Errors.UNSUPPORTED_VERSION)
+  } else if (clientResponse.authenticationException != null) {
+// For now we treat authentication errors as retriable. We use the
+// `NETWORK_EXCEPTION` error code for lack of a good alternative.
+// Note that `BrokerToControllerChannelManager` will still log the
+// authentication errors so that users have a chance to fix the 
problem.
+errorResponse(request.data, Errors.NETWORK_EXCEPTION)

Review comment:
   Makes sense. Let's file a JIRA to improve it as you suggested here.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10034) Clarify Usage of "batch.size" and "max.request.size" Producer Configs

2021-02-21 Thread Badai Aqrandista (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17288088#comment-17288088
 ] 

Badai Aqrandista commented on KAFKA-10034:
--

Yes, I am. Will do that this week.

> Clarify Usage of "batch.size" and "max.request.size" Producer Configs
> -
>
> Key: KAFKA-10034
> URL: https://issues.apache.org/jira/browse/KAFKA-10034
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs, producer 
>Reporter: Mark Cox
>Assignee: Badai Aqrandista
>Priority: Minor
>
> The documentation around the producer configurations "batch.size" and 
> "max.request.size", and how they relate to one another, can be confusing.
> In reality, the "max.request.size" is a hard limit on each individual record, 
> but the documentation makes it seem this is the maximum size of a request 
> sent to Kafka.  If there is a situation where "batch.size" is set greater 
> than "max.request.size" (and each individual record is smaller than 
> "max.request.size") you could end up with larger requests than expected sent 
> to Kafka.
> There are a few things that could be considered to make this clearer:
>  # Improve the documentation to clarify the two producer configurations and 
> how they relate to each other
>  # Provide a producer check, and possibly a warning, if "batch.size" is found 
> to be greater than "max.request.size"
>  # The producer could take the _minimum_ of "batch.size" or "max.request.size"
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] sknop commented on a change in pull request #9950: KAFKA-12170: Fix for Connect Cast SMT to correctly transform a Byte array into a string

2021-02-21 Thread GitBox


sknop commented on a change in pull request #9950:
URL: https://github.com/apache/kafka/pull/9950#discussion_r579839255



##
File path: 
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
##
@@ -364,11 +365,24 @@ private static String castToString(Object value) {
 if (value instanceof java.util.Date) {
 java.util.Date dateValue = (java.util.Date) value;
 return Values.dateFormatFor(dateValue).format(dateValue);
+} else if (value instanceof ByteBuffer) {
+ByteBuffer byteBuffer = (ByteBuffer) value;
+return castByteArrayToString(byteBuffer.array());
+} else if (value instanceof byte[]) {
+return castByteArrayToString((byte[]) value);
 } else {
 return value.toString();
 }
 }
 
+private static String castByteArrayToString(byte[] array) {
+StringBuilder sbuf = new StringBuilder();
+for (byte b : array) {
+sbuf.append(String.format("%02X", b));

Review comment:
   The reason I suggested Hex representation was that in the use case I 
encountered - database PK field encoded as BINARY retrieved via CDC Source 
Connector - the representation in the SQL CLI was Hex, so choosing the same 
format made it easy to compare source and target.
   
   I would suggest that base64 encoding would validate its own dedicated 
transformer, with an ability to choose padding, for example 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] sknop commented on a change in pull request #9950: KAFKA-12170: Fix for Connect Cast SMT to correctly transform a Byte array into a string

2021-02-21 Thread GitBox


sknop commented on a change in pull request #9950:
URL: https://github.com/apache/kafka/pull/9950#discussion_r579838511



##
File path: 
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
##
@@ -364,11 +365,24 @@ private static String castToString(Object value) {
 if (value instanceof java.util.Date) {
 java.util.Date dateValue = (java.util.Date) value;
 return Values.dateFormatFor(dateValue).format(dateValue);
+} else if (value instanceof ByteBuffer) {
+ByteBuffer byteBuffer = (ByteBuffer) value;
+return castByteArrayToString(byteBuffer.array());

Review comment:
   That is a good point, but what should happen in that case? 
   
   I assume we start with hasArray() and only process the content if an array 
is available, as I understand it? ByteBuffer.remaining() would contain the 
number of bytes available ... I'll go and write some test cases to understand 
this better.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-12337) provide full scala api for operators naming

2021-02-21 Thread GeordieMai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

GeordieMai resolved KAFKA-12337.

Resolution: Duplicate

> provide full scala api for operators naming 
> 
>
> Key: KAFKA-12337
> URL: https://issues.apache.org/jira/browse/KAFKA-12337
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Ramil Israfilov
>Priority: Major
>
> Kafka Streams Java DSL provides possibility to do custom naming for all 
> operators via Named, Grouped, Consumed objects (there is a separate dev guide 
> page about it 
> [https://kafka.apache.org/27/documentation/streams/developer-guide/dsl-topology-naming.html]
>  )
> But Scala api for Kafka Streams provide only partial support.
> For example following API's are missing custom naming:
> filter,selectKey, map, mapValues, flatMap... 
> Probably there is same issue for other scala objects. 
> As workaround I have to do quite ugly calls to inner KStream java class and 
> perform scala2java and back conversions. 
> Would be really handy if all custom naming API's will be also supported on 
> Scala Kafka Streams DSL



--
This message was sent by Atlassian Jira
(v8.3.4#803005)