[GitHub] [kafka] chia7712 commented on a change in pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission

2020-11-05 Thread GitBox


chia7712 commented on a change in pull request #9563:
URL: https://github.com/apache/kafka/pull/9563#discussion_r518564543



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/EnvelopeRequest.java
##
@@ -91,4 +91,14 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
 public static EnvelopeRequest parse(ByteBuffer buffer, short version) {
 return new EnvelopeRequest(ApiKeys.ENVELOPE.parseRequest(version, 
buffer), version);
 }
+
+public EnvelopeRequestData data() {
+return data;
+}
+
+@Override
+public Send toSend(String destination, RequestHeader header) {
+return SendBuilder.buildRequestSend(destination, header, this.data);

Review comment:
   not sure whether it is ok to ignore the version of EnvelopeRequest.
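
For reference, a minimal sketch of one way to make that assumption explicit, assuming `AbstractRequest#version()` and `RequestHeader#apiVersion()` are accessible here (neither is shown in this hunk) and keeping `SendBuilder.buildRequestSend` exactly as in the diff; this is not the PR's actual change:

```java
// Hypothetical variant of the toSend() shown above, with an explicit guard.
@Override
public Send toSend(String destination, RequestHeader header) {
    // The envelope is serialized at the header's version, so fail loudly if it
    // disagrees with the version this EnvelopeRequest was constructed with.
    if (header.apiVersion() != version()) {
        throw new IllegalArgumentException("Envelope request version " + version()
            + " does not match header version " + header.apiVersion());
    }
    return SendBuilder.buildRequestSend(destination, header, this.data);
}
```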









[jira] [Assigned] (KAFKA-10667) Add timeout for forwarding requests

2020-11-05 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reassigned KAFKA-10667:
---

Assignee: Boyang Chen

> Add timeout for forwarding requests
> ---
>
> Key: KAFKA-10667
> URL: https://issues.apache.org/jira/browse/KAFKA-10667
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> It makes sense to handle a timeout for forwarding requests coming from the 
> client instead of retrying indefinitely. We could either use the API timeout, 
> or a customized timeout hook which could be defined per request 
> type.
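
For illustration, a minimal sketch of such a per-request-type timeout hook; `ForwardingTimeoutPolicy` and `RequestType` are made-up names, not Kafka APIs:

```java
import java.time.Duration;
import java.util.EnumMap;
import java.util.Map;

// Sketch only: resolve the timeout to apply before giving up on a forwarded request,
// with an optional per-request-type override on top of a default (e.g. the API timeout).
public class ForwardingTimeoutPolicy {

    public enum RequestType { CREATE_TOPICS, ALTER_CONFIGS, OTHER }

    private final Duration defaultTimeout;
    private final Map<RequestType, Duration> overrides = new EnumMap<>(RequestType.class);

    public ForwardingTimeoutPolicy(Duration defaultTimeout) {
        this.defaultTimeout = defaultTimeout;
    }

    public ForwardingTimeoutPolicy withOverride(RequestType type, Duration timeout) {
        overrides.put(type, timeout);
        return this;
    }

    public Duration timeoutFor(RequestType type) {
        return overrides.getOrDefault(type, defaultTimeout);
    }

    public static void main(String[] args) {
        ForwardingTimeoutPolicy policy = new ForwardingTimeoutPolicy(Duration.ofSeconds(30))
            .withOverride(RequestType.CREATE_TOPICS, Duration.ofSeconds(60));
        System.out.println(policy.timeoutFor(RequestType.CREATE_TOPICS)); // PT1M
        System.out.println(policy.timeoutFor(RequestType.OTHER));         // PT30S
    }
}
```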





[jira] [Updated] (KAFKA-10674) Brokers should know the active controller ApiVersion after enabling KIP-590 forwarding

2020-11-05 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-10674:

Parent: KAFKA-9705
Issue Type: Sub-task  (was: Improvement)

> Brokers should know the active controller ApiVersion after enabling KIP-590 
> forwarding
> --
>
> Key: KAFKA-10674
> URL: https://issues.apache.org/jira/browse/KAFKA-10674
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core
>Reporter: Boyang Chen
>Priority: Major
>
> Admin clients send ApiVersions to the broker when the first connection is 
> established. The tricky thing after forwarding is enabled is that, for 
> forwardable APIs, the admin client needs to know a commonly agreed range of 
> ApiVersions among the handling broker, the active controller and itself.
> Right now the inter-broker APIs are guaranteed by IBP constraints, but the 
> forwardable APIs are not. A compromise solution would be to put all forwardable 
> APIs under the IBP, which is brittle and makes consistency hard to maintain.
> Instead, any broker connecting to the active controller should send an 
> ApiVersions request from the beginning, so it is easy to compute that information 
> and send it back to the admin clients upon an ApiVersions request from the admin. 
> Any rolling of the active controller will trigger a reconnection between broker 
> and controller, which guarantees refreshed ApiVersions between the two. This 
> approach avoids the tight bond with the IBP, and the broker could just close the 
> connection to the admin client to trigger the retry logic and a refresh of the 
> ApiVersions. Since this failure should be rare, the two round trips and timeout 
> delays are well compensated by the reduced engineering work.
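
For illustration, a minimal sketch of computing a commonly agreed range per API by intersecting the versions supported by the handling broker, the active controller and the client; `VersionRange` and the numbers are stand-ins, not the actual ApiVersions classes:

```java
import java.util.Optional;

// Sketch only: intersect per-API version ranges from the three sides.
public class CommonVersionRange {

    static final class VersionRange {
        final short min;
        final short max;
        VersionRange(short min, short max) { this.min = min; this.max = max; }
        @Override public String toString() { return "[" + min + ", " + max + "]"; }
    }

    // Empty result means there is no version of this API that all three sides support.
    static Optional<VersionRange> intersect(VersionRange broker, VersionRange controller, VersionRange client) {
        short min = (short) Math.max(broker.min, Math.max(controller.min, client.min));
        short max = (short) Math.min(broker.max, Math.min(controller.max, client.max));
        return min <= max ? Optional.of(new VersionRange(min, max)) : Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(intersect(new VersionRange((short) 0, (short) 7),
                                     new VersionRange((short) 2, (short) 5),
                                     new VersionRange((short) 1, (short) 9))); // Optional[[2, 5]]
    }
}
```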





[jira] [Closed] (KAFKA-10691) AlterIsr Respond with wrong Error Id

2020-11-05 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming closed KAFKA-10691.
--

> AlterIsr Respond with wrong Error Id
> 
>
> Key: KAFKA-10691
> URL: https://issues.apache.org/jira/browse/KAFKA-10691
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller
>Reporter: dengziming
>Assignee: dengziming
>Priority: Minor
>
> An AlterIsr request sent by an unknown broker gets a STALE_BROKER_EPOCH response, 
> but it should be UNKNOWN_MEMBER_ID.





[jira] [Commented] (KAFKA-10691) AlterIsr Respond with wrong Error Id

2020-11-05 Thread dengziming (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227199#comment-17227199
 ] 

dengziming commented on KAFKA-10691:


Sorry, I misunderstood `STALE_BROKER_EPOCH`. I just inspected KIP-497 
and there isn't an `UNKNOWN_MEMBER_ID`.

> AlterIsr Respond with wrong Error Id
> 
>
> Key: KAFKA-10691
> URL: https://issues.apache.org/jira/browse/KAFKA-10691
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller
>Reporter: dengziming
>Assignee: dengziming
>Priority: Minor
>
> An AlterIsr request sent by an unknown broker gets a STALE_BROKER_EPOCH response, 
> but it should be UNKNOWN_MEMBER_ID.





[jira] [Resolved] (KAFKA-10691) AlterIsr Respond with wrong Error Id

2020-11-05 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming resolved KAFKA-10691.

Resolution: Abandoned

> AlterIsr Respond with wrong Error Id
> 
>
> Key: KAFKA-10691
> URL: https://issues.apache.org/jira/browse/KAFKA-10691
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller
>Reporter: dengziming
>Assignee: dengziming
>Priority: Minor
>
> An AlterIsr request sent by an unknown broker gets a STALE_BROKER_EPOCH response, 
> but it should be UNKNOWN_MEMBER_ID.





[GitHub] [kafka] dengziming commented on a change in pull request #9571: KAFKA-10691: AlterIsr Respond with wrong Error Id

2020-11-05 Thread GitBox


dengziming commented on a change in pull request #9571:
URL: https://github.com/apache/kafka/pull/9571#discussion_r518555329



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -2261,7 +2261,7 @@ class KafkaController(val config: KafkaConfig,
 val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
 if (brokerEpochOpt.isEmpty) {
   info(s"Ignoring AlterIsr due to unknown broker $brokerId")
-  callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+  callback.apply(Right(Errors.UNKNOWN_MEMBER_ID))

Review comment:
You are right. I just inspected KIP-497 and there isn't an 
`UNKNOWN_MEMBER_ID`.









[jira] [Updated] (KAFKA-10674) Brokers should know the active controller ApiVersion after enabling KIP-590 forwarding

2020-11-05 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-10674:

Description: 
Admin clients send ApiVersions to the broker when the first connection is 
established. The tricky thing after forwarding is enabled is that, for 
forwardable APIs, the admin client needs to know a commonly agreed range of 
ApiVersions among the handling broker, the active controller and itself.

Right now the inter-broker APIs are guaranteed by IBP constraints, but the 
forwardable APIs are not. A compromise solution would be to put all forwardable 
APIs under the IBP, which is brittle and makes consistency hard to maintain.

Instead, any broker connecting to the active controller should send an 
ApiVersions request from the beginning, so it is easy to compute that information 
and send it back to the admin clients upon an ApiVersions request from the admin. 
Any rolling of the active controller will trigger a reconnection between broker 
and controller, which guarantees refreshed ApiVersions between the two. This 
approach avoids the tight bond with the IBP, and the broker could just close the 
connection to the admin client to trigger the retry logic and a refresh of the 
ApiVersions. Since this failure should be rare, the two round trips and timeout 
delays are well compensated by the reduced engineering work.

  was:
Admin clients send ApiVersions to the broker upon the first connection 
establishes. The tricky thing after forwarding is enabled is that for 
forwardable APIs, admin client needs to know a commonly-agreed rang of 
ApiVersions among handling broker, active controller and itself.

Right now the inter-broker APIs are guaranteed by IBP constraints, but not for 
forwardable APIs. A compromised solution would be to put all forwardable APIs 
under IBP, which is brittle and hard to maintain consistency.

Instead, any broker connecting to the active controller should send an 
ApiVersion request from beginning, so it is easy to compute that information 
and send back to the admin clients upon ApiVersion request from admin.  Any 
rolling of the active controller will trigger reconnection between broker and 
controller, which guarantees a refreshed ApiVersions between the two. This 
approach avoids the tight bond with IBP and broker could just close the 
connection between admin client to trigger retry logic and refreshing of the 
ApiVersions. Since this failure should be rare, two round-trips and timeout 
delays are well compensated by the less engineering work.


> Brokers should know the active controller ApiVersion after enabling KIP-590 
> forwarding
> --
>
> Key: KAFKA-10674
> URL: https://issues.apache.org/jira/browse/KAFKA-10674
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Reporter: Boyang Chen
>Priority: Major
>
> Admin clients send ApiVersions to the broker when the first connection is 
> established. The tricky thing after forwarding is enabled is that, for 
> forwardable APIs, the admin client needs to know a commonly agreed range of 
> ApiVersions among the handling broker, the active controller and itself.
> Right now the inter-broker APIs are guaranteed by IBP constraints, but the 
> forwardable APIs are not. A compromise solution would be to put all forwardable 
> APIs under the IBP, which is brittle and makes consistency hard to maintain.
> Instead, any broker connecting to the active controller should send an 
> ApiVersions request from the beginning, so it is easy to compute that information 
> and send it back to the admin clients upon an ApiVersions request from the admin. 
> Any rolling of the active controller will trigger a reconnection between broker 
> and controller, which guarantees refreshed ApiVersions between the two. This 
> approach avoids the tight bond with the IBP, and the broker could just close the 
> connection to the admin client to trigger the retry logic and a refresh of the 
> ApiVersions. Since this failure should be rare, the two round trips and timeout 
> delays are well compensated by the reduced engineering work.





[GitHub] [kafka] dengziming closed pull request #9571: KAFKA-10691: AlterIsr Respond with wrong Error Id

2020-11-05 Thread GitBox


dengziming closed pull request #9571:
URL: https://github.com/apache/kafka/pull/9571


   







[GitHub] [kafka] dengziming commented on a change in pull request #9571: KAFKA-10691: AlterIsr Respond with wrong Error Id

2020-11-05 Thread GitBox


dengziming commented on a change in pull request #9571:
URL: https://github.com/apache/kafka/pull/9571#discussion_r518554842



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -2261,7 +2261,7 @@ class KafkaController(val config: KafkaConfig,
 val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
 if (brokerEpochOpt.isEmpty) {
   info(s"Ignoring AlterIsr due to unknown broker $brokerId")
-  callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+  callback.apply(Right(Errors.UNKNOWN_MEMBER_ID))

Review comment:
   Thank you for your reply. Here we have two checks: the first is `if 
(brokerEpochOpt.isEmpty)` and the second is 
`if (!brokerEpochOpt.contains(brokerEpoch))`, and they both return 
`STALE_BROKER_EPOCH`. Maybe the first is meant to return `UNKNOWN_MEMBER_ID`.
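
For illustration, a small self-contained Java sketch of the two checks described above (the real logic is in KafkaController.scala); the map, the enum and the error names are stand-ins for the controller's types:

```java
import java.util.Map;
import java.util.Optional;

// Sketch of the two-step validation: unknown broker id vs. known broker with a stale epoch.
public class AlterIsrValidationSketch {

    enum Error { NONE, STALE_BROKER_EPOCH }

    // liveBrokerIdAndEpochs: brokerId -> epoch currently known by the controller.
    static Error validate(Map<Integer, Long> liveBrokerIdAndEpochs, int brokerId, long brokerEpoch) {
        Optional<Long> epochOpt = Optional.ofNullable(liveBrokerIdAndEpochs.get(brokerId));
        if (epochOpt.isEmpty()) {
            // First check: the broker id is not known at all. The discussion above is
            // about whether this case deserves a different error code.
            return Error.STALE_BROKER_EPOCH;
        } else if (epochOpt.get() != brokerEpoch) {
            // Second check: the broker is known but presented an outdated epoch.
            return Error.STALE_BROKER_EPOCH;
        }
        return Error.NONE;
    }

    public static void main(String[] args) {
        System.out.println(validate(Map.of(1, 5L), 2, 5L)); // unknown broker id
        System.out.println(validate(Map.of(1, 5L), 1, 4L)); // stale epoch
        System.out.println(validate(Map.of(1, 5L), 1, 5L)); // NONE
    }
}
```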









[GitHub] [kafka] chia7712 commented on a change in pull request #9571: KAFKA-10691: AlterIsr Respond with wrong Error Id

2020-11-05 Thread GitBox


chia7712 commented on a change in pull request #9571:
URL: https://github.com/apache/kafka/pull/9571#discussion_r518550582



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -2261,7 +2261,7 @@ class KafkaController(val config: KafkaConfig,
 val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
 if (brokerEpochOpt.isEmpty) {
   info(s"Ignoring AlterIsr due to unknown broker $brokerId")
-  callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+  callback.apply(Right(Errors.UNKNOWN_MEMBER_ID))

Review comment:
   Not sure whether `UNKNOWN_MEMBER_ID` is more suitable than 
`STALE_BROKER_EPOCH`. The concept of a member id is not the same as the broker 
id here.









[GitHub] [kafka] chia7712 commented on a change in pull request #9571: KAFKA-10691: AlterIsr Respond with wrong Error Id

2020-11-05 Thread GitBox


chia7712 commented on a change in pull request #9571:
URL: https://github.com/apache/kafka/pull/9571#discussion_r518550582



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -2261,7 +2261,7 @@ class KafkaController(val config: KafkaConfig,
 val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
 if (brokerEpochOpt.isEmpty) {
   info(s"Ignoring AlterIsr due to unknown broker $brokerId")
-  callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+  callback.apply(Right(Errors.UNKNOWN_MEMBER_ID))

Review comment:
   Not sure whether `UNKNOWN_MEMBER_ID` is more suitable than 
`STALE_BROKER_EPOCH`.









[GitHub] [kafka] hachikuji commented on a change in pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission

2020-11-05 Thread GitBox


hachikuji commented on a change in pull request #9563:
URL: https://github.com/apache/kafka/pull/9563#discussion_r518549809



##
File path: clients/src/main/java/org/apache/kafka/common/protocol/Writable.java
##
@@ -33,6 +35,23 @@
 void writeVarint(int i);
 void writeVarlong(long i);
 
+default void writeApiMessage(

Review comment:
   Fair enough.









[GitHub] [kafka] dengziming commented on pull request #9571: KAFKA-10691: AlterIsr Respond with wrong Error Id

2020-11-05 Thread GitBox


dengziming commented on pull request #9571:
URL: https://github.com/apache/kafka/pull/9571#issuecomment-722889382


   @hachikuji  hi, PTAL.







[GitHub] [kafka] dengziming opened a new pull request #9571: KAFKA-10691: AlterIsr Respond with wrong Error Id

2020-11-05 Thread GitBox


dengziming opened a new pull request #9571:
URL: https://github.com/apache/kafka/pull/9571


   An AlterIsr request sent by an unknown broker gets a STALE_BROKER_EPOCH response, 
but it should be UNKNOWN_MEMBER_ID.
   







[jira] [Updated] (KAFKA-10691) AlterIsr Respond with wrong Error Id

2020-11-05 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming updated KAFKA-10691:
---
Summary: AlterIsr Respond with wrong Error Id  (was: AlterIsrR Respond with 
wrong Error Id)

> AlterIsr Respond with wrong Error Id
> 
>
> Key: KAFKA-10691
> URL: https://issues.apache.org/jira/browse/KAFKA-10691
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller
>Reporter: dengziming
>Assignee: dengziming
>Priority: Minor
>
> An AlterIsr request sent by an unknown broker gets a STALE_BROKER_EPOCH response, 
> but it should be UNKNOWN_MEMBER_ID.





[jira] [Created] (KAFKA-10691) AlterIsrR Respond with wrong Error Id

2020-11-05 Thread dengziming (Jira)
dengziming created KAFKA-10691:
--

 Summary: AlterIsrR Respond with wrong Error Id
 Key: KAFKA-10691
 URL: https://issues.apache.org/jira/browse/KAFKA-10691
 Project: Kafka
  Issue Type: Sub-task
  Components: controller
Reporter: dengziming
Assignee: dengziming


An AlterIsr request sent by an unknown broker gets a STALE_BROKER_EPOCH response, 
but it should be UNKNOWN_MEMBER_ID.





[jira] [Assigned] (KAFKA-9837) New RPC for notifying controller of failed replica

2020-11-05 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming reassigned KAFKA-9837:
-

Assignee: dengziming

> New RPC for notifying controller of failed replica
> --
>
> Key: KAFKA-9837
> URL: https://issues.apache.org/jira/browse/KAFKA-9837
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller, core
>Reporter: David Arthur
>Assignee: dengziming
>Priority: Major
> Fix For: 2.8.0
>
>
> This is the tracking ticket for 
> [KIP-589|https://cwiki.apache.org/confluence/display/KAFKA/KIP-589+Add+API+to+update+Replica+state+in+Controller].
>  For the bridge release, brokers should no longer use ZooKeeper to notify the 
> controller that a log dir has failed. It should instead use an RPC mechanism.





[jira] [Commented] (KAFKA-9837) New RPC for notifying controller of failed replica

2020-11-05 Thread dengziming (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227182#comment-17227182
 ] 

dengziming commented on KAFKA-9837:
---

I will do this.

> New RPC for notifying controller of failed replica
> --
>
> Key: KAFKA-9837
> URL: https://issues.apache.org/jira/browse/KAFKA-9837
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller, core
>Reporter: David Arthur
>Priority: Major
> Fix For: 2.8.0
>
>
> This is the tracking ticket for 
> [KIP-589|https://cwiki.apache.org/confluence/display/KAFKA/KIP-589+Add+API+to+update+Replica+state+in+Controller].
>  For the bridge release, brokers should no longer use ZooKeeper to notify the 
> controller that a log dir has failed. It should instead use an RPC mechanism.





[GitHub] [kafka] mjsax commented on a change in pull request #9570: KAFKA-9274: Handle TimeoutException on commit

2020-11-05 Thread GitBox


mjsax commented on a change in pull request #9570:
URL: https://github.com/apache/kafka/pull/9570#discussion_r518540480



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -997,7 +1016,7 @@ void shutdown(final boolean clean) {
 
 // For testing only.
 int commitAll() {
-return commit(tasks.values());
+return commit(new HashSet<>(tasks.values()));

Review comment:
   Need to make a copy (as all production calls do, too).
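
A small standalone demo of why the copy matters, using plain JDK collections as a stand-in for the task map:

```java
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

// Map#values() is a live view, so iterating it while the commit path mutates the
// underlying map fails, whereas iterating a copied HashSet is safe.
public class DefensiveCopyDemo {
    public static void main(String[] args) {
        Map<String, String> tasks = new HashMap<>();
        tasks.put("0_0", "taskA");
        tasks.put("0_1", "taskB");

        // Safe: iterate a snapshot while mutating the original map.
        for (String task : new HashSet<>(tasks.values())) {
            tasks.remove("0_0");
        }

        tasks.put("0_0", "taskA");
        try {
            // Unsafe: iterate the live view while mutating the map behind it.
            Collection<String> liveView = tasks.values();
            for (String task : liveView) {
                tasks.remove("0_1");
            }
        } catch (ConcurrentModificationException e) {
            System.out.println("live view iteration failed: " + e);
        }
    }
}
```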









[GitHub] [kafka] mjsax commented on a change in pull request #9570: KAFKA-9274: Handle TimeoutException on commit

2020-11-05 Thread GitBox


mjsax commented on a change in pull request #9570:
URL: https://github.com/apache/kafka/pull/9570#discussion_r518540292



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -903,6 +913,15 @@ void shutdown(final boolean clean) {
 tasksToCloseClean.remove(task);
 }
 }
+} catch (final TaskTimeoutExceptions taskTimeoutExceptions) {

Review comment:
   During shutdown, we don't need to trigger `task.timeout.ms` but can 
re-throw the `TimeoutException` to trigger a "dirty close" instead.









[GitHub] [kafka] mjsax commented on a change in pull request #9570: KAFKA-9274: Handle TimeoutException on commit

2020-11-05 Thread GitBox


mjsax commented on a change in pull request #9570:
URL: https://github.com/apache/kafka/pull/9570#discussion_r518540199



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -570,6 +575,11 @@ void handleRevocation(final Collection 
revokedPartitions) {
 // so we would capture any exception and throw
 try {
 commitOffsetsOrTransaction(consumedOffsetsPerTask);
+} catch (final TaskTimeoutExceptions taskTimeoutExceptions) {

Review comment:
   If a task is revoked, we don't need to trigger `task.timeout.ms` but can 
re-throw the `TimeoutException` to trigger a "dirty close" instead.









[GitHub] [kafka] mjsax commented on a change in pull request #9570: KAFKA-9274: Handle TimeoutException on commit

2020-11-05 Thread GitBox


mjsax commented on a change in pull request #9570:
URL: https://github.com/apache/kafka/pull/9570#discussion_r518539800



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##
@@ -202,6 +204,10 @@ private void recordSendError(final String topic, final 
Exception exception, fina
 "indicating the task may be migrated out";
 sendException.set(new TaskMigratedException(errorMessage, 
exception));
 } else {
+// TODO: KIP-572 handle `TimeoutException extends 
RetriableException`

Review comment:
   As above: This is an open question. Input is welcome. I would like to 
tackle it in a follow up PR.









[GitHub] [kafka] mjsax commented on a change in pull request #9570: KAFKA-9274: Handle TimeoutException on commit

2020-11-05 Thread GitBox


mjsax commented on a change in pull request #9570:
URL: https://github.com/apache/kafka/pull/9570#discussion_r518539735



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##
@@ -111,6 +111,8 @@ public void initialize() {
 try {
 partitions = streamsProducer.partitionsFor(topic);
 } catch (final KafkaException e) {
+// TODO: KIP-572 need to handle `TimeoutException`
+// -> should we throw a `TaskCorruptedException` for this case 
to reset the task and retry (including triggering `task.timeout.ms`) ?

Review comment:
   This is an open question. Input is welcome. I would like to tackle it in 
a follow up PR.









[GitHub] [kafka] mjsax commented on a change in pull request #9570: KAFKA-9274: Handle TimeoutException on commit

2020-11-05 Thread GitBox


mjsax commented on a change in pull request #9570:
URL: https://github.com/apache/kafka/pull/9570#discussion_r518539527



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##
@@ -164,9 +164,9 @@ public InternalTopicManager(final Time time,
 "Error message was: {}", topicName, 
cause.toString());
 throw new StreamsException(String.format("Could 
not create topic %s.", topicName), cause);
 }
-} catch (final TimeoutException retryableException) {
+} catch (final TimeoutException retriableException) {

Review comment:
   Just fixing some naming issue.









[GitHub] [kafka] mjsax commented on a change in pull request #9570: KAFKA-9274: Handle TimeoutException on commit

2020-11-05 Thread GitBox


mjsax commented on a change in pull request #9570:
URL: https://github.com/apache/kafka/pull/9570#discussion_r518539416



##
File path: 
streams/src/main/java/org/apache/kafka/streams/errors/TaskTimeoutExceptions.java
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.streams.processor.internals.Task;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+public class TaskTimeoutExceptions extends StreamsException {

Review comment:
   If we commit multiple tasks individually (ie, eos-alpha), we use this 
class as an "exception container" to track the TimeoutException for each failed 
task individually.
   
   To simplify the caller code, we also wrap a single `TimeoutException` if we 
commit all tasks at once (at-least-once, eos-beta)
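
A self-contained sketch of such an exception container; the class name, fields and accessors below are illustrative stand-ins, not the actual `TaskTimeoutExceptions` API from this PR:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of an "exception container": either wraps one timeout for a commit of all
// tasks at once, or records one timeout per failed task for individual commits.
public class TimeoutContainerSketch extends RuntimeException {

    static final class TimeoutLike extends RuntimeException {
        TimeoutLike(String message) { super(message); }
    }

    private final TimeoutLike single;                                    // all-at-once commit
    private final Map<String, TimeoutLike> perTask = new LinkedHashMap<>(); // per-task commits

    TimeoutContainerSketch(TimeoutLike single) { this.single = single; }
    TimeoutContainerSketch() { this.single = null; }

    void recordFailure(String taskId, TimeoutLike cause) { perTask.put(taskId, cause); }

    public static void main(String[] args) {
        TimeoutContainerSketch container = new TimeoutContainerSketch();
        container.recordFailure("0_0", new TimeoutLike("commit timed out for 0_0"));
        container.recordFailure("1_0", new TimeoutLike("commit timed out for 1_0"));
        container.perTask.forEach((task, e) -> System.out.println(task + " -> " + e.getMessage()));
    }
}
```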









[GitHub] [kafka] mjsax opened a new pull request #9570: KAFKA-9274: Handle TimeoutException on commit

2020-11-05 Thread GitBox


mjsax opened a new pull request #9570:
URL: https://github.com/apache/kafka/pull/9570


- part of KIP-572
- when KafkaStreams commit a task, a TimeoutException should not kill
  the thread but `task.timeout.ms` should be triggered and the commit
  should be retried in the next loop
   
   Call for review @vvcephei 
   







[GitHub] [kafka] hachikuji commented on a change in pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission

2020-11-05 Thread GitBox


hachikuji commented on a change in pull request #9563:
URL: https://github.com/apache/kafka/pull/9563#discussion_r518532228



##
File path: clients/src/main/java/org/apache/kafka/common/protocol/Writable.java
##
@@ -33,6 +35,23 @@
 void writeVarint(int i);
 void writeVarlong(long i);
 
+default void writeApiMessage(
+ApiMessage message,
+ObjectSerializationCache serializationCache,
+short version
+) {
+message.write(this, serializationCache, version);
+}
+
+default void writeRecords(BaseRecords records) {

Review comment:
   I was concerned about this also, but the generated code adds its own 
null check.









[GitHub] [kafka] hachikuji commented on a change in pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission

2020-11-05 Thread GitBox


hachikuji commented on a change in pull request #9563:
URL: https://github.com/apache/kafka/pull/9563#discussion_r518523253



##
File path: 
generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
##
@@ -1581,56 +1570,56 @@ private void generateVariableLengthFieldSize(FieldSpec 
field,
 
headerGenerator.addImport(MessageGenerator.BYTE_UTILS_CLASS);
 buffer.printf("_cache.setArraySizeInBytes(%s, 
_arraySize);%n",
 field.camelCaseName());
-buffer.printf("_size += _arraySize + 
ByteUtils.sizeOfUnsignedVarint(_arraySize);%n");
+
buffer.printf("_size.addBytes(ByteUtils.sizeOfUnsignedVarint(_arraySize.totalSize()));%n");
+buffer.printf("_size.add(_arraySize);%n");
 } else {
-buffer.printf("_size += _arraySize;%n");
+buffer.printf("_size.add(_arraySize);%n");
 }
 } else if (field.type().isBytes()) {
+buffer.printf("MessageSize _bytesSize = new 
MessageSize();%n");
 if (field.zeroCopy()) {
-buffer.printf("int _bytesSize = %s.remaining();%n", 
field.camelCaseName());
+
buffer.printf("_bytesSize.addZeroCopyBytes(%s.remaining());%n", 
field.camelCaseName());
 } else {
-buffer.printf("int _bytesSize = %s.length;%n", 
field.camelCaseName());
+buffer.printf("_bytesSize.addBytes(%s.length);%n", 
field.camelCaseName());
 }
 
VersionConditional.forVersions(fieldFlexibleVersions(field), possibleVersions).
 ifMember(__ -> {
 
headerGenerator.addImport(MessageGenerator.BYTE_UTILS_CLASS);
 if (field.zeroCopy()) {
-buffer.printf("_bytesSize += " +
-
"ByteUtils.sizeOfUnsignedVarint(%s.remaining() + 1);%n", field.camelCaseName());
+buffer.printf("_bytesSize.addBytes(" +
+
"ByteUtils.sizeOfUnsignedVarint(%s.remaining() + 1));%n", 
field.camelCaseName());
 } else {
-buffer.printf("_bytesSize += 
ByteUtils.sizeOfUnsignedVarint(%s.length + 1);%n",
+
buffer.printf("_bytesSize.addBytes(ByteUtils.sizeOfUnsignedVarint(%s.length + 
1));%n",
 field.camelCaseName());
 }
 }).
 ifNotMember(__ -> {
-buffer.printf("_bytesSize += 4;%n");
+buffer.printf("_bytesSize.addBytes(4);%n");
 }).
 generate(buffer);
 if (tagged) {
 
headerGenerator.addImport(MessageGenerator.BYTE_UTILS_CLASS);
-buffer.printf("_size += _bytesSize + 
ByteUtils.sizeOfUnsignedVarint(_bytesSize);%n");
-} else {
-buffer.printf("_size += _bytesSize;%n");
+
buffer.printf("_size.addBytes(ByteUtils.sizeOfUnsignedVarint(_bytesSize.totalSize()));%n");
 }
+buffer.printf("_size.add(_bytesSize);%n");
 } else if (field.type().isRecords()) {
-buffer.printf("int _recordsSize = %s.sizeInBytes();%n", 
field.camelCaseName());
+buffer.printf("_size.addBytes(%s.sizeInBytes());%n", 
field.camelCaseName());

Review comment:
   Yeah, my bad. Probably messed this up after renaming.









[jira] [Commented] (KAFKA-10604) The StreamsConfig.STATE_DIR_CONFIG's default value does not reflect the JVM parameter or OS-specific settings

2020-11-05 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227164#comment-17227164
 ] 

Matthias J. Sax commented on KAFKA-10604:
-

Thanks. Good to see that we are in agreement. Will review the PR soon. Thanks 
for the ticket and PR [~dongjin]!

> The StreamsConfig.STATE_DIR_CONFIG's default value does not reflect the JVM 
> parameter or OS-specific settings
> -
>
> Key: KAFKA-10604
> URL: https://issues.apache.org/jira/browse/KAFKA-10604
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Dongjin Lee
>Assignee: Dongjin Lee
>Priority: Minor
>
> I found this problem working for 
> [KAFKA-10585|https://issues.apache.org/jira/browse/KAFKA-10585].
> The JVM's temporary directory location is different per OS, and the JVM allows 
> changing it with the `java.io.tmpdir` system property. On Linux, it defaults to 
> `/tmp`.
> The problem is that the default value of StreamsConfig.STATE_DIR_CONFIG 
> (`state.dir`) is fixed to `/tmp/kafka-streams`. For this reason, it does not 
> change if the application runs on an OS other than Linux or the user specifies 
> the `java.io.tmpdir` system property.
> It should be `{temp-directory}/kafka-streams`, not `/tmp/kafka-streams`.
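
A minimal sketch of the direction the ticket describes, deriving the default from `java.io.tmpdir` instead of hard-coding `/tmp`:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Compare the hard-coded default with a portable one derived from java.io.tmpdir.
public class DefaultStateDir {
    public static void main(String[] args) {
        String hardCoded = "/tmp/kafka-streams";                        // current default
        Path portable = Paths.get(System.getProperty("java.io.tmpdir"), "kafka-streams");
        System.out.println("hard-coded: " + hardCoded);
        System.out.println("portable:   " + portable); // honors -Djava.io.tmpdir and the OS
    }
}
```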





[GitHub] [kafka] chia7712 commented on a change in pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission

2020-11-05 Thread GitBox


chia7712 commented on a change in pull request #9563:
URL: https://github.com/apache/kafka/pull/9563#discussion_r518513351



##
File path: clients/src/main/java/org/apache/kafka/common/protocol/Writable.java
##
@@ -33,6 +35,23 @@
 void writeVarint(int i);
 void writeVarlong(long i);
 
+default void writeApiMessage(

Review comment:
   It is only used by `SendBuilder`. How about moving it to 
`SendBuilder`?

##
File path: clients/src/main/java/org/apache/kafka/common/protocol/Writable.java
##
@@ -33,6 +35,23 @@
 void writeVarint(int i);
 void writeVarlong(long i);
 
+default void writeApiMessage(
+ApiMessage message,
+ObjectSerializationCache serializationCache,
+short version
+) {
+message.write(this, serializationCache, version);
+}
+
+default void writeRecords(BaseRecords records) {

Review comment:
   Does it need a null check (maybe as a no-op)?

##
File path: 
generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
##
@@ -1581,56 +1570,56 @@ private void generateVariableLengthFieldSize(FieldSpec 
field,
 
headerGenerator.addImport(MessageGenerator.BYTE_UTILS_CLASS);
 buffer.printf("_cache.setArraySizeInBytes(%s, 
_arraySize);%n",
 field.camelCaseName());
-buffer.printf("_size += _arraySize + 
ByteUtils.sizeOfUnsignedVarint(_arraySize);%n");
+
buffer.printf("_size.addBytes(ByteUtils.sizeOfUnsignedVarint(_arraySize.totalSize()));%n");
+buffer.printf("_size.add(_arraySize);%n");
 } else {
-buffer.printf("_size += _arraySize;%n");
+buffer.printf("_size.add(_arraySize);%n");
 }
 } else if (field.type().isBytes()) {
+buffer.printf("MessageSize _bytesSize = new 
MessageSize();%n");
 if (field.zeroCopy()) {
-buffer.printf("int _bytesSize = %s.remaining();%n", 
field.camelCaseName());
+
buffer.printf("_bytesSize.addZeroCopyBytes(%s.remaining());%n", 
field.camelCaseName());
 } else {
-buffer.printf("int _bytesSize = %s.length;%n", 
field.camelCaseName());
+buffer.printf("_bytesSize.addBytes(%s.length);%n", 
field.camelCaseName());
 }
 
VersionConditional.forVersions(fieldFlexibleVersions(field), possibleVersions).
 ifMember(__ -> {
 
headerGenerator.addImport(MessageGenerator.BYTE_UTILS_CLASS);
 if (field.zeroCopy()) {
-buffer.printf("_bytesSize += " +
-
"ByteUtils.sizeOfUnsignedVarint(%s.remaining() + 1);%n", field.camelCaseName());
+buffer.printf("_bytesSize.addBytes(" +
+
"ByteUtils.sizeOfUnsignedVarint(%s.remaining() + 1));%n", 
field.camelCaseName());
 } else {
-buffer.printf("_bytesSize += 
ByteUtils.sizeOfUnsignedVarint(%s.length + 1);%n",
+
buffer.printf("_bytesSize.addBytes(ByteUtils.sizeOfUnsignedVarint(%s.length + 
1));%n",
 field.camelCaseName());
 }
 }).
 ifNotMember(__ -> {
-buffer.printf("_bytesSize += 4;%n");
+buffer.printf("_bytesSize.addBytes(4);%n");
 }).
 generate(buffer);
 if (tagged) {
 
headerGenerator.addImport(MessageGenerator.BYTE_UTILS_CLASS);
-buffer.printf("_size += _bytesSize + 
ByteUtils.sizeOfUnsignedVarint(_bytesSize);%n");
-} else {
-buffer.printf("_size += _bytesSize;%n");
+
buffer.printf("_size.addBytes(ByteUtils.sizeOfUnsignedVarint(_bytesSize.totalSize()));%n");
 }
+buffer.printf("_size.add(_bytesSize);%n");
 } else if (field.type().isRecords()) {
-buffer.printf("int _recordsSize = %s.sizeInBytes();%n", 
field.camelCaseName());
+buffer.printf("_size.addBytes(%s.sizeInBytes());%n", 
field.camelCaseName());

Review comment:
   Why is it not `addZeroCopyBytes`?






[GitHub] [kafka] vamossagar12 commented on a change in pull request #9539: KAFKA-10634: Adding LeaderId to Voters list in LeaderChangeMessage

2020-11-05 Thread GitBox


vamossagar12 commented on a change in pull request #9539:
URL: https://github.com/apache/kafka/pull/9539#discussion_r518513246



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -317,6 +317,9 @@ private void appendLeaderChangeMessage(LeaderState state, 
long currentTimeMs) {
 .map(follower -> new Voter().setVoterId(follower))
 .collect(Collectors.toList());
 
+// Adding the leader to the voters as the protocol ensures that leader 
always votes for itself.
+voters.add(new Voter().setVoterId(state.election().leaderId()));

Review comment:
   @hachikuji i have made the changes you had suggested. 
   cc @jsancio 









[GitHub] [kafka] abbccdda commented on pull request #9569: KAFKA-10687: make ProduceRespone only returns INVALID_PRODUCER_EPOCH

2020-11-05 Thread GitBox


abbccdda commented on pull request #9569:
URL: https://github.com/apache/kafka/pull/9569#issuecomment-722795850


   @edenhill FYI







[GitHub] [kafka] abbccdda opened a new pull request #9569: KAFKA-10687: make ProduceRespone only returns INVALID_PRODUCER_EPOCH

2020-11-05 Thread GitBox


abbccdda opened a new pull request #9569:
URL: https://github.com/apache/kafka/pull/9569


   In KIP-588, we added the new `PRODUCER_FENCED` error code 90 to represent the 
existing ProducerFencedException. That code would be an unknown error for a producer 
client which sends a ProduceRequest and doesn't know how to handle it. The fix is 
to revise the error code back to the known `INVALID_PRODUCER_EPOCH` in the 
ProduceResponse.
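
For illustration, a minimal sketch of the remapping idea with a stand-in enum rather than the broker's actual error handling:

```java
// Before building the ProduceResponse, translate the newer PRODUCER_FENCED code
// into the older INVALID_PRODUCER_EPOCH code that pre-KIP-588 producers understand.
public class ProduceErrorRemap {

    enum ErrorCode { NONE, INVALID_PRODUCER_EPOCH, PRODUCER_FENCED }

    static ErrorCode forProduceResponse(ErrorCode internal) {
        // Producers that predate KIP-588 don't know error code 90 (PRODUCER_FENCED).
        return internal == ErrorCode.PRODUCER_FENCED ? ErrorCode.INVALID_PRODUCER_EPOCH : internal;
    }

    public static void main(String[] args) {
        System.out.println(forProduceResponse(ErrorCode.PRODUCER_FENCED)); // INVALID_PRODUCER_EPOCH
        System.out.println(forProduceResponse(ErrorCode.NONE));            // NONE
    }
}
```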
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Commented] (KAFKA-10604) The StreamsConfig.STATE_DIR_CONFIG's default value does not reflect the JVM parameter or OS-specific settings

2020-11-05 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227150#comment-17227150
 ] 

John Roesler commented on KAFKA-10604:
--

I agree; I do not think this needs a KIP. It seems clear that the intent was to 
use "the temporary directory", and it was simply a bug/oversight to hard-code 
"/tmp" instead of using the platform-independent "java.io.tmpdir".

In fact, I've just reclassified this as a bug.

> The StreamsConfig.STATE_DIR_CONFIG's default value does not reflect the JVM 
> parameter or OS-specific settings
> -
>
> Key: KAFKA-10604
> URL: https://issues.apache.org/jira/browse/KAFKA-10604
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Dongjin Lee
>Assignee: Dongjin Lee
>Priority: Minor
>
> I found this problem working for 
> [KAFKA-10585|https://issues.apache.org/jira/browse/KAFKA-10585].
> The JVM's temporary directory location is different per OS, and the JVM allows 
> changing it with the `java.io.tmpdir` system property. On Linux, it defaults to 
> `/tmp`.
> The problem is that the default value of StreamsConfig.STATE_DIR_CONFIG 
> (`state.dir`) is fixed to `/tmp/kafka-streams`. For this reason, it does not 
> change if the application runs on an OS other than Linux or the user specifies 
> the `java.io.tmpdir` system property.
> It should be `{temp-directory}/kafka-streams`, not `/tmp/kafka-streams`.





[jira] [Updated] (KAFKA-10604) The StreamsConfig.STATE_DIR_CONFIG's default value does not reflect the JVM parameter or OS-specific settings

2020-11-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10604:
-
Issue Type: Bug  (was: Improvement)

> The StreamsConfig.STATE_DIR_CONFIG's default value does not reflect the JVM 
> parameter or OS-specific settings
> -
>
> Key: KAFKA-10604
> URL: https://issues.apache.org/jira/browse/KAFKA-10604
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Dongjin Lee
>Assignee: Dongjin Lee
>Priority: Minor
>
> I found this problem working for 
> [KAFKA-10585|https://issues.apache.org/jira/browse/KAFKA-10585].
> The JVM's temporary directory location is different per OS, and the JVM allows 
> changing it with the `java.io.tmpdir` system property. On Linux, it defaults to 
> `/tmp`.
> The problem is that the default value of StreamsConfig.STATE_DIR_CONFIG 
> (`state.dir`) is fixed to `/tmp/kafka-streams`. For this reason, it does not 
> change if the application runs on an OS other than Linux or the user specifies 
> the `java.io.tmpdir` system property.
> It should be `{temp-directory}/kafka-streams`, not `/tmp/kafka-streams`.





[jira] [Updated] (KAFKA-10690) Produce-response delay caused by lagging replica fetch which affects in-sync one

2020-11-05 Thread Haruki Okada (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haruki Okada updated KAFKA-10690:
-
Summary: Produce-response delay caused by lagging replica fetch which 
affects in-sync one  (was: Produce-response delay caused by lagging replica 
fetch which blocks in-sync one)

> Produce-response delay caused by lagging replica fetch which affects in-sync 
> one
> 
>
> Key: KAFKA-10690
> URL: https://issues.apache.org/jira/browse/KAFKA-10690
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.4.1
>Reporter: Haruki Okada
>Priority: Major
> Attachments: image-2020-11-06-11-15-21-781.png, 
> image-2020-11-06-11-15-38-390.png, image-2020-11-06-11-17-09-910.png
>
>
> h2. Our environment
>  * Kafka version: 2.4.1
> h2. Phenomenon
>  * Produce response time 99th (remote scope) degrades to 500ms, which is 20 
> times worse than usual
>  ** Meanwhile, the cluster was running a replica reassignment to service in a new 
> machine, to recover replicas which were held by a broker machine that had failed 
> (hardware issue)
> !image-2020-11-06-11-15-21-781.png|width=292,height=166!
> h2. Analysis
> Let's say
>  * broker-X: the broker where we observed the produce latency degradation
>  * broker-Y: the broker being serviced in
> broker-Y was catching up replicas of partitions:
>  * partition-A: has relatively small log size
>  * partition-B: has large log size
> (actually, broker-Y was catching-up many other partitions. I noted only two 
> partitions here to make explanation simple)
> broker-X was the leader for both partition-A and partition-B.
> We found that both partition-A and partition-B are assigned to same 
> ReplicaFetcherThread of broker-Y, and produce latency started to degrade 
> right after broker-Y finished catching up partition-A.
> !image-2020-11-06-11-17-09-910.png|width=476,height=174!
> Besides, we observed disk reads on broker-X during service-in. (This is 
> natural since old segments are likely not in page cache)
> !image-2020-11-06-11-15-38-390.png|width=292,height=193!
> So we suspected that:
>  * The in-sync replica fetch (partition-A) was held up by the lagging replica 
> fetch (partition-B), which is expected to be slow because it causes actual disk reads
>  ** Since a ReplicaFetcherThread sends fetch requests in a blocking manner, the 
> next fetch request can't be sent until the previous one completes
>  ** => Causes the in-sync replica fetch for partitions assigned to the same 
> replica fetcher thread to be delayed
>  ** => Causes remote scope produce latency degradation
> h2. Possible fix
> We think this issue can be addressed by designating part of the 
> ReplicaFetcherThreads (or creating another thread pool) to lagging replica 
> catch-up, but we are not sure this is the appropriate way.
> Please give your opinions about this issue.
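
For illustration, a hypothetical sketch of the "possible fix" above: classify partitions by lag so that catch-up fetches can be serviced by a dedicated group and cannot block in-sync fetches. The threshold and names are made up for the example, not Kafka code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Split partitions into a "lagging" group (dedicated catch-up fetchers) and an
// "in-sync" group (regular replica fetchers) based on a lag threshold.
public class FetcherPartitioning {

    static Map<Boolean, Map<String, Long>> splitByLag(Map<String, Long> lagByPartition, long laggingThresholdBytes) {
        // key true  -> lagging partitions, key false -> in-sync partitions
        return lagByPartition.entrySet().stream()
            .collect(Collectors.partitioningBy(e -> e.getValue() > laggingThresholdBytes,
                     Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
    }

    public static void main(String[] args) {
        Map<String, Long> lag = new HashMap<>();
        lag.put("partition-A", 10_000L);          // almost caught up
        lag.put("partition-B", 50_000_000_000L);  // far behind, causes disk reads
        Map<Boolean, Map<String, Long>> groups = splitByLag(lag, 1_000_000_000L);
        System.out.println("catch-up fetchers: " + groups.get(true));
        System.out.println("in-sync fetchers:  " + groups.get(false));
    }
}
```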





[GitHub] [kafka] ableegoldman closed pull request #9489: MINOR: demote "Committing task offsets" log to DEBUG

2020-11-05 Thread GitBox


ableegoldman closed pull request #9489:
URL: https://github.com/apache/kafka/pull/9489


   







[GitHub] [kafka] ableegoldman removed a comment on pull request #9489: MINOR: demote "Committing task offsets" log to DEBUG

2020-11-05 Thread GitBox


ableegoldman removed a comment on pull request #9489:
URL: https://github.com/apache/kafka/pull/9489#issuecomment-715639072


   @guozhangwang @vvcephei  WDYT? I'm generally all for more logs but this is 
pretty extreme 🙂 







[GitHub] [kafka] chia7712 commented on pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…

2020-11-05 Thread GitBox


chia7712 commented on pull request #8826:
URL: https://github.com/apache/kafka/pull/8826#issuecomment-722781536


   > Did that fix the issue since I still saw the same WARN when running 
kafka-topics.sh.
   
   my bad :(







[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-05 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r518478117



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -567,10 +589,34 @@ void runLoop() {
 }
 } catch (final TaskMigratedException e) {
 handleTaskMigrated(e);
+} catch (final Exception e) {
+if (this.streamsUncaughtExceptionHandler.handle(e)) {

Review comment:
   I had a little trouble following the `Handler` class. Some trivial 
things -- eg the handler in the StreamThread is named 
`streamsUncaughtExceptionHandler` but it's actually _not_ a 
`StreamsUncaughtExceptionHandler`. Also the usage of the return value; IIUC 
it's supposed to indicate whether to use the new handler or fall back on the 
old one. To me it sounds like if `handle` returns `true` that means we should 
handle it, ie we should _not_ rethrow the exception, but this looks like the 
opposite of what we do now. Honestly either interpretation is ok with me, as 
long as it's documented somewhere
   
   Do we really need the `Handler` in the first place though? It's already 
pretty confusing that we have to deal with two types of handlers (old and new) 
so I'd prefer not to add a third unless it's really necessary. It seems like we 
can just inline the logic of whether to invoke the new handler or rethrow the 
exception, which would also clear up the confusion around the meaning of the 
return value. But I might be missing something here -- WDYT?
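
For illustration, a small self-contained sketch of what inlining that decision could look like; all names below are stand-ins, not the actual StreamThread code:

```java
// If a new-style handler is configured, let it consume the exception; otherwise
// rethrow so the old (thread-level) handler path still sees it. Inlining this
// avoids a boolean-returning wrapper whose meaning is easy to misread.
public class HandlerInliningSketch {

    interface StreamsUncaughtExceptionHandlerStandIn {
        void handle(Throwable exception);
    }

    private final StreamsUncaughtExceptionHandlerStandIn newHandler; // null -> fall back to old path

    HandlerInliningSketch(StreamsUncaughtExceptionHandlerStandIn newHandler) {
        this.newHandler = newHandler;
    }

    void runLoopBody(Runnable iteration) {
        try {
            iteration.run();
        } catch (final RuntimeException e) {
            if (newHandler != null) {
                newHandler.handle(e);   // new-style handling, exception is consumed here
            } else {
                throw e;                // old behavior: let the thread's handler see it
            }
        }
    }

    public static void main(String[] args) {
        new HandlerInliningSketch(t -> System.out.println("handled: " + t))
            .runLoopBody(() -> { throw new IllegalStateException("boom"); });
    }
}
```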

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -282,6 +283,17 @@ public boolean isRunning() {
 private final Admin adminClient;
 private final InternalTopologyBuilder builder;
 
+private Handler streamsUncaughtExceptionHandler;
+private ShutdownErrorHook shutdownErrorHook;
+private AtomicInteger assignmentErrorCode;
+public interface ShutdownErrorHook {
+void shutdown();
+}

Review comment:
   Seems like we can just pass in a Runnable with 
`KafkaStreams::closeToError` instead of adding a whole `ShutdownErrorHook` 
functional interface

##
File path: streams/src/main/resources/common/message/SubscriptionInfoData.json
##
@@ -57,6 +57,11 @@
   "name": "uniqueField",
   "versions": "8+",
   "type": "int8"
+},
+{
+  "name": "shutdownRequested",
+  "versions": "9+",
+  "type": "int8"

Review comment:
   I think we should mirror the `errorCode` in the AssignmentInfo here, 
both in terms of naming and type. If we're going to use the same AssignorError 
for both, then they should really be the same. And we may want to send other 
kinds of error codes in the subscription going forward: better to just encode a 
single `int` than a separate `byte` for every logical error code. I don't think 
we'll notice the extra three bytes since Subscriptions aren't sent that 
frequently

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ReferenceContainer.java
##
@@ -30,7 +30,7 @@
 public Admin adminClient;
 public TaskManager taskManager;
 public StreamsMetadataState streamsMetadataState;
-public final AtomicInteger assignmentErrorCode = new AtomicInteger();
+public AtomicInteger assignmentErrorCode = new AtomicInteger();

Review comment:
   This should probably stay `final` so we don't accidentally change it ever

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -996,6 +1082,62 @@ private boolean close(final long timeoutMs) {
 }
 }
 
+private void closeToError() {
+if (!setState(State.ERROR)) {
+log.info("Can not transition to error from state " + state());

Review comment:
   Should this be logged at error?

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -996,6 +1082,62 @@ private boolean close(final long timeoutMs) {
 }
 }
 
+private void closeToError() {
+if (!setState(State.ERROR)) {
+log.info("Can not transition to error from state " + state());
+} else {
+log.info("Transitioning to ERROR state");
+stateDirCleaner.shutdownNow();
+if (rocksDBMetricsRecordingService != null) {
+rocksDBMetricsRecordingService.shutdownNow();
+}
+
+// wait for all threads to join in a separate thread;
+// save the current thread so that if it is a stream thread
+// we don't attempt to join it and cause a deadlock
+final Thread shutdownThread = new Thread(() -> {
+// notify all the threads to stop; avoid deadlocks by stopping 
any
+// further state reports from the thread since we're shutting 

[jira] [Created] (KAFKA-10690) Produce-response delay caused by lagging replica fetch which blocks in-sync one

2020-11-05 Thread Haruki Okada (Jira)
Haruki Okada created KAFKA-10690:


 Summary: Produce-response delay caused by lagging replica fetch 
which blocks in-sync one
 Key: KAFKA-10690
 URL: https://issues.apache.org/jira/browse/KAFKA-10690
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 2.4.1
Reporter: Haruki Okada
 Attachments: image-2020-11-06-11-15-21-781.png, 
image-2020-11-06-11-15-38-390.png, image-2020-11-06-11-17-09-910.png

h2. Our environment
 * Kafka version: 2.4.1

h2. Phenomenon
 * Produce response time 99th (remote scope) degrades to 500ms, which is 20 
times worse than usual
 ** Meanwhile, the cluster was running a replica reassignment to service in a new 
machine, to recover replicas which were held by a broker machine that had failed (hardware issue)

!image-2020-11-06-11-15-21-781.png|width=292,height=166!
h2. Analysis

Let's say
 * broker-X: the broker where we observed the produce latency degradation
 * broker-Y: the broker being serviced in

broker-Y was catching up replicas of partitions:
 * partition-A: has relatively small log size
 * partition-B: has large log size

(actually, broker-Y was catching-up many other partitions. I noted only two 
partitions here to make explanation simple)

broker-X was the leader for both partition-A and partition-B.

We found that both partition-A and partition-B are assigned to same 
ReplicaFetcherThread of broker-Y, and produce latency started to degrade right 
after broker-Y finished catching up partition-A.

!image-2020-11-06-11-17-09-910.png|width=476,height=174!

Besides, we observed disk reads on broker-X during service-in. (This is natural 
since old segments are likely not in page cache)

!image-2020-11-06-11-15-38-390.png|width=292,height=193!

So we suspected that:
 * The in-sync replica fetch (partition-A) was held up by the lagging replica fetch 
(partition-B), which is expected to be slow because it causes actual disk reads
 ** Since a ReplicaFetcherThread sends fetch requests in a blocking manner, the next 
fetch request can't be sent until the previous one completes
 ** => Causes the in-sync replica fetch for partitions assigned to the same replica 
fetcher thread to be delayed
 ** => Causes remote scope produce latency degradation

h2. Possible fix

We think this issue can be addressed by designating part of the 
ReplicaFetcherThreads (or creating another thread pool) to lagging replica 
catch-up, but we are not sure this is the appropriate way.

Please give your opinions about this issue.





[GitHub] [kafka] ableegoldman opened a new pull request #9568: KAFKA-10689: fix windowed FKJ topology and put checks in assignor to avoid infinite loops

2020-11-05 Thread GitBox


ableegoldman opened a new pull request #9568:
URL: https://github.com/apache/kafka/pull/9568


   Two pieces:
   1) Fix a bug in StreamSinkNode#writeToTopology to make sure it always calls 
the `addSink` overload with the specific topic name, when it exists, so that 
this topic gets tracked in the InternalTopologyBuilder's `nodeToSinkTopic` map. 
The sink topics are used by the StreamsPartitionAssignor to resolve the 
upstream subtopology of a repartition source topic, for which that repartition 
topic is a sink. Without this information the SPA gets stuck permanently 
during a rebalance.
   2) Improve the SPA's `setRepartitionTopicMetadataNumberOfPartitions()` 
method to break out of the loop if we aren't making any progress, to avoid 
looping infinitely if we ever have another bug like KAFKA-10689. If the SPA 
hasn't updated the known partition numbers for any repartition topic in the 
current outer loop, then we know that it's stuck and should throw a 
TaskAssignmentException to shut down the application. (A minimal sketch of this 
progress guard follows below.)
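
   A minimal sketch of the progress guard described in (2); the class name and the 
lookup helper are illustrative, not the actual StreamsPartitionAssignor code:

   ```
   import java.util.HashMap;
   import java.util.Map;

   // Iterate until every repartition topic has a known partition count, but fail
   // fast if a full pass resolves nothing new instead of looping forever.
   public class ProgressGuardSketch {
       public static void main(String[] args) {
           Map<String, Integer> partitionCounts = new HashMap<>();
           partitionCounts.put("app-repartition-topic", null); // unknown so far

           boolean progressMade = true;
           while (partitionCounts.containsValue(null)) {
               if (!progressMade) {
                   // The real fix would throw a TaskAssignmentException here.
                   throw new IllegalStateException("No progress resolving repartition partition counts");
               }
               progressMade = false;
               for (Map.Entry<String, Integer> entry : partitionCounts.entrySet()) {
                   if (entry.getValue() == null) {
                       Integer resolved = lookUpUpstreamPartitionCount(entry.getKey()); // hypothetical helper
                       if (resolved != null) {
                           entry.setValue(resolved);
                           progressMade = true;
                       }
                   }
               }
           }
           System.out.println(partitionCounts);
       }

       // Stand-in for resolving the count from upstream topic metadata.
       private static Integer lookUpUpstreamPartitionCount(String topic) {
           return 4;
       }
   }
   ```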



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…

2020-11-05 Thread GitBox


junrao commented on pull request #8826:
URL: https://github.com/apache/kafka/pull/8826#issuecomment-722728533


   @chia7712  : Thanks for the updated PR. Did that fix the issue? I still see the 
same WARN when running kafka-topics.sh.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10624) [Easy] FeatureZNodeStatus should use sealed trait instead of Enumeration

2020-11-05 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-10624.
-
Fix Version/s: 2.8.0
   Resolution: Fixed

merged the PR to trunk.

> [Easy] FeatureZNodeStatus should use sealed trait instead of Enumeration
> 
>
> Key: KAFKA-10624
> URL: https://issues.apache.org/jira/browse/KAFKA-10624
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kowshik Prakasam
>Assignee: Kowshik Prakasam
>Priority: Minor
> Fix For: 2.8.0
>
>
> In Scala, we prefer sealed traits over Enumeration since the former gives you 
> exhaustiveness checking. With Scala Enumeration, you don't get a warning if 
> you add a new value that is not handled in a given pattern match.
> This Jira tracks refactoring 
> [FeatureZNodeStatus|https://github.com/apache/kafka/blob/fb4f297207ef62f71e4a6d2d0dac75752933043d/core/src/main/scala/kafka/zk/ZkData.scala#L801]
>  from an Enumeration to a sealed trait. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10604) The StreamsConfig.STATE_DIR_CONFIG's default value does not reflect the JVM parameter or OS-specific settings

2020-11-05 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227084#comment-17227084
 ] 

Guozhang Wang commented on KAFKA-10604:
---

I personally would prefer not to require a KIP for this straightforward fix.

> The StreamsConfig.STATE_DIR_CONFIG's default value does not reflect the JVM 
> parameter or OS-specific settings
> -
>
> Key: KAFKA-10604
> URL: https://issues.apache.org/jira/browse/KAFKA-10604
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Dongjin Lee
>Assignee: Dongjin Lee
>Priority: Minor
>
> I found this problem working for 
> [KAFKA-10585|https://issues.apache.org/jira/browse/KAFKA-10585].
> The JVM's temporary directory location differs per OS, and the JVM allows changing 
> it with the `java.io.tmpdir` system property. On Linux, it defaults to 
> `/tmp`.
> The problem is that the default value of StreamsConfig.STATE_DIR_CONFIG 
> (`state.dir`) is fixed to `/tmp/kafka-streams`. For this reason, it does not 
> change if the application runs on an OS other than Linux or if the user overrides 
> the `java.io.tmpdir` system property.
> It should be `\{temp-directory}/kafka-streams`, not `/tmp/kafka-streams`.
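
For reference, a minimal sketch of computing such a default (illustrative only, not 
the actual StreamsConfig change):

```
import java.nio.file.Paths;

public class StateDirDefaultSketch {
    public static void main(String[] args) {
        // Respects -Djava.io.tmpdir as well as the OS-specific temp location.
        String defaultStateDir =
            Paths.get(System.getProperty("java.io.tmpdir"), "kafka-streams").toString();
        System.out.println(defaultStateDir); // e.g. /tmp/kafka-streams on Linux
    }
}
```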



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao merged pull request #9561: KAFKA-10624: For FeatureZNodeStatus, use sealed trait instead of Enumeration

2020-11-05 Thread GitBox


junrao merged pull request #9561:
URL: https://github.com/apache/kafka/pull/9561


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-10688) Handle accidental truncation of repartition topics as exceptional failure

2020-11-05 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227080#comment-17227080
 ] 

Guozhang Wang edited comment on KAFKA-10688 at 11/5/20, 11:50 PM:
--

Without KAFKA-3370, we have to implement the desired behavior in the Streams 
layer itself. That is:

1) Upon task assignment, explicitly set the starting offset for the main 
consumer based on the per-topic / global reset policy. For repartition topics, 
the reset policy would be the global policy.

2) Upon task revival (for corrupted-task exception handling), do the same thing as 1).

3) During normal processing, if an InvalidOffsetException is thrown from the main 
consumer, we differentiate these cases: 3.a) for source topics, log a warning 
and reset accordingly; 3.b) for repartition topics, throw a fatal error.

We could potentially be more strict and require that all topics have committed 
offsets, failing if only some of them have committed positions. But for 
extensibility I'm going to hold off on doing that for now.


was (Author: guozhang):
Without KAFKA-3370, we have to implement the desired behavior in the Streams 
layer itself. That is:

1) Upon task assignment, explicitly set the starting offset for the main 
consumer based on the per-topic / global reset policy. For repartition topics, 
the reset policy would be `latest`.

2) Upon task revival (for corrupted-task exception handling), do the same thing as 1).

3) During normal processing, if an InvalidOffsetException is thrown from the main 
consumer, we differentiate these cases: 3.a) for source topics, log a warning 
and reset accordingly; 3.b) for repartition topics, throw a fatal error.

We could potentially be more strict and require that all topics have committed 
offsets, failing if only some of them have committed positions. But for 
extensibility I'm going to hold off on doing that for now.

> Handle accidental truncation of repartition topics as exceptional failure
> -
>
> Key: KAFKA-10688
> URL: https://issues.apache.org/jira/browse/KAFKA-10688
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>
> Today we always handle an InvalidOffsetException from the main consumer via the 
> reset policy, assuming it is for a source topic. But repartition topics are also 
> source topics, and they should never be truncated and hence should never cause an 
> InvalidOffsetException.
> We should differentiate these repartition topics from external source topics 
> and treat an InvalidOffsetException from repartition topics as fatal and 
> close the whole application.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10687) Produce request should be bumped for new error code PRODUCE_FENCED

2020-11-05 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-10687:

Issue Type: Bug  (was: Improvement)

> Produce request should be bumped for new error code PRODUCE_FENCED
> --
>
> Key: KAFKA-10687
> URL: https://issues.apache.org/jira/browse/KAFKA-10687
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Blocker
> Fix For: 2.7.0
>
>
> In https://issues.apache.org/jira/browse/KAFKA-9910, we missed a case where 
> the ProduceRequest needs to be bumped to return the new error code 
> PRODUCE_FENCED. This gap needs to be addressed as a blocker since it is 
> shipping in 2.7.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10688) Handle accidental truncation of repartition topics as exceptional failure

2020-11-05 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227080#comment-17227080
 ] 

Guozhang Wang commented on KAFKA-10688:
---

Without KAFKA-3370, we have to implement the desired behavior in the Streams 
layer itself. That is:

1) Upon task assignment, explicitly set the starting offset for the main 
consumer based on the per-topic / global reset policy. For repartition topics, 
the reset policy would be `latest`.

2) Upon task revival (for corrupted-task exception handling), do the same thing as 1).

3) During normal processing, if an InvalidOffsetException is thrown from the main 
consumer, we differentiate these cases: 3.a) for source topics, log a warning 
and reset accordingly; 3.b) for repartition topics, throw a fatal error.

We could potentially be more strict and require that all topics have committed 
offsets, failing if only some of them have committed positions. But for 
extensibility I'm going to hold off on doing that for now.
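
A rough sketch of the differentiation in point 3 (illustrative only; the class name 
and the set of repartition topics are assumptions, not the Streams internals):

```
import java.util.Set;

// A repartition topic hitting an invalid offset is treated as fatal, while an
// external source topic falls back to the configured reset policy.
public class ResetPolicySketch {
    static void onInvalidOffset(String topic, Set<String> repartitionTopics) {
        if (repartitionTopics.contains(topic)) {
            throw new IllegalStateException("Repartition topic " + topic + " was truncated unexpectedly");
        }
        System.out.println("Resetting " + topic + " per the configured reset policy");
    }

    public static void main(String[] args) {
        Set<String> repartitionTopics = Set.of("appId-KSTREAM-REPARTITION-0000000002-repartition");
        onInvalidOffset("external-source-topic", repartitionTopics); // warn and reset
        try {
            onInvalidOffset("appId-KSTREAM-REPARTITION-0000000002-repartition", repartitionTopics);
        } catch (IllegalStateException fatal) {
            System.out.println("Fatal: " + fatal.getMessage());
        }
    }
}
```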

> Handle accidental truncation of repartition topics as exceptional failure
> -
>
> Key: KAFKA-10688
> URL: https://issues.apache.org/jira/browse/KAFKA-10688
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>
> Today we always handle an InvalidOffsetException from the main consumer via the 
> reset policy, assuming it is for a source topic. But repartition topics are also 
> source topics, and they should never be truncated and hence should never cause an 
> InvalidOffsetException.
> We should differentiate these repartition topics from external source topics 
> and treat an InvalidOffsetException from repartition topics as fatal and 
> close the whole application.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji edited a comment on pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission

2020-11-05 Thread GitBox


hachikuji edited a comment on pull request #9563:
URL: https://github.com/apache/kafka/pull/9563#issuecomment-722690665


   I got inspired to try and extend this to apply to all message types. I've 
updated the patch to remove the custom response logic in `FetchResponse` in 
favor of a general pattern using `SendBuilder`. We had an existing benchmark 
`FetchResponseBenchmark.testSerializeFetchResponse`, so I tried it out.
   
   Here are the results before the patch (10 topics and 500 topics):
   ```
   Result 
"org.apache.kafka.jmh.common.FetchResponseBenchmark.testSerializeFetchResponse":
 6322.861 ±(99.9%) 310.785 ns/op [Average]
 (min, avg, max) = (6057.758, 6322.861, 7090.658), stdev = 290.708
 CI (99.9%): [6012.076, 6633.646] (assumes normal distribution)
   
   Result 
"org.apache.kafka.jmh.common.FetchResponseBenchmark.testSerializeFetchResponse":
 323310.283 ±(99.9%) 25947.515 ns/op [Average]
 (min, avg, max) = (301370.273, 323310.283, 383716.556), stdev = 24271.322
 CI (99.9%): [297362.768, 349257.799] (assumes normal distribution)
   ```
   Here are the new benchmark results (10 topics and 500 topics):
   ```
   Result 
"org.apache.kafka.jmh.common.FetchResponseBenchmark.testSerializeFetchResponse":
 5701.378 ±(99.9%) 100.848 ns/op [Average]
 (min, avg, max) = (5601.838, 5701.378, 5925.943), stdev = 94.333
 CI (99.9%): [5600.530, 5802.225] (assumes normal distribution)
   
   Result 
"org.apache.kafka.jmh.common.FetchResponseBenchmark.testSerializeFetchResponse":
 298221.825 ±(99.9%) 8173.945 ns/op [Average]
 (min, avg, max) = (287615.891, 298221.825, 321499.618), stdev = 7645.913
 CI (99.9%): [290047.880, 306395.770] (assumes normal distribution)
   ```
   So it looks like a modest overall improvement.
   
   Note I still need to polish up a few things in the PR.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission

2020-11-05 Thread GitBox


hachikuji commented on a change in pull request #9563:
URL: https://github.com/apache/kafka/pull/9563#discussion_r518433135



##
File path: clients/src/main/resources/common/message/EnvelopeRequest.json
##
@@ -23,7 +23,7 @@
   "fields": [
 { "name": "RequestData", "type": "bytes", "versions": "0+", "zeroCopy": 
true,
   "about": "The embedded request header and data."},
-{ "name": "RequestPrincipal", "type": "bytes", "versions": "0+", 
"zeroCopy": true, "nullableVersions": "0+",
+{ "name": "RequestPrincipal", "type": "bytes", "versions": "0+", 
"nullableVersions": "0+",

Review comment:
   I agree it should not be nullable. Will fix it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10689) Assignor can't determine number of partitions on FJK with upstream windowed repartition

2020-11-05 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227077#comment-17227077
 ] 

A. Sophie Blee-Goldman commented on KAFKA-10689:


It's a pretty obnoxious bug, since the application stays stuck in REBALANCING 
while StreamThreads slowly drop out of the group one-by-one as the current 
group leader gets stuck and a new rebalance has to be triggered. Meanwhile we 
don't log anything within this loop so it's impossible to know what happened 
based on the logs.

Ideally we would just limit the number of iterations and shut down the 
application if we can't seem to figure out the number of partitions for some 
reason. Unfortunately, given the random way that 
setRepartitionTopicMetadataNumberOfPartitions walks through the topology and 
the lack of a ceiling on topological cycles/complexity, it's not immediately 
obvious how (or if) we can pick a limit on the number of necessary iterations. 

Still, we can probably improve the current situation and do better than just 
silently looping forever. One simple option would be to just start logging a 
warning once we're past some large iteration number.

Another option is to keep track of the set of repartition topics whose 
partitions are still unknown, and if this set fails to change over one full 
iteration of the outer `topicGroups.values()` loop, then break out and shut 
down the application. This seems pretty airtight, although obviously a bit more 
complicated than just logging a warning at high iteration count. The logging is 
probably more than sufficient for a user to debug their application, but also a 
worse user experience.

> Assignor can't determine number of partitions on FJK with upstream windowed 
> repartition
> ---
>
> Key: KAFKA-10689
> URL: https://issues.apache.org/jira/browse/KAFKA-10689
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.8.0, 2.7.1
>
>
> Due to a minor logical gap in how windowed repartition sink nodes are written 
> to the topology, they are never added to the official map of sink topics 
> tracked by the InternalTopologyBuilder. This makes it impossible to determine 
> the number of partitions of downstream repartition topics in 
> StreamsPartitionAssignor#setRepartitionTopicMetadataNumberOfPartitions, 
> causing the assignor to loop infinitely in this method. 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10689) Assignor can't determine number of partitions on FJK with upstream windowed repartition

2020-11-05 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-10689:
--

Assignee: A. Sophie Blee-Goldman

> Assignor can't determine number of partitions on FJK with upstream windowed 
> repartition
> ---
>
> Key: KAFKA-10689
> URL: https://issues.apache.org/jira/browse/KAFKA-10689
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.8.0, 2.7.1
>
>
> Due to a minor logical gap in how windowed repartition sink nodes are written 
> to the topology, they are never added to the official map of sink topics 
> tracked by the InternalTopologyBuilder. This makes it impossible to determine 
> the number of partitions of downstream repartition topics in 
> StreamsPartitionAssignor#setRepartitionTopicMetadataNumberOfPartitions, 
> causing the assignor to loop infinitely in this method. 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10689) Assignor can't determine number of partitions on FJK with upstream windowed repartition

2020-11-05 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-10689:
---
Priority: Critical  (was: Major)

> Assignor can't determine number of partitions on FJK with upstream windowed 
> repartition
> ---
>
> Key: KAFKA-10689
> URL: https://issues.apache.org/jira/browse/KAFKA-10689
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Critical
> Fix For: 2.8.0, 2.7.1
>
>
> Due to a minor logical gap in how windowed repartition sink nodes are written 
> to the topology, they are never added to the official map of sink topics 
> tracked by the InternalTopologyBuilder. This makes it impossible to determine 
> the number of partitions of downstream repartition topics in 
> StreamsPartitionAssignor#setRepartitionTopicMetadataNumberOfPartitions, 
> causing the assignor to loop infinitely in this method. 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10689) Assignor can't determine number of partitions on FJK with upstream windowed repartition

2020-11-05 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-10689:
--

 Summary: Assignor can't determine number of partitions on FJK with 
upstream windowed repartition
 Key: KAFKA-10689
 URL: https://issues.apache.org/jira/browse/KAFKA-10689
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.5.0
Reporter: A. Sophie Blee-Goldman
 Fix For: 2.8.0, 2.7.1


Due to a minor logical gap in how windowed repartition sink nodes are written 
to the topology, they are never added to the official map of sink topics 
tracked by the InternalTopologyBuilder. This makes it impossible to determine 
the number of partitions of downstream repartition topics in 
StreamsPartitionAssignor#setRepartitionTopicMetadataNumberOfPartitions, causing 
the assignor to loop infinitely in this method. 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] bbejeck commented on a change in pull request #9565: MINOR: Move upgraded docs from site to Kafka docs

2020-11-05 Thread GitBox


bbejeck commented on a change in pull request #9565:
URL: https://github.com/apache/kafka/pull/9565#discussion_r518417041



##
File path: docs/js/templateData.js
##
@@ -19,6 +19,6 @@ limitations under the License.
 var context={
 "version": "26",
 "dotVersion": "2.6",
-"fullDotVersion": "2.6.1",
+"fullDotVersion": "2.6.0",

Review comment:
   this needs to be `2.6.1`

##
File path: docs/quickstart-zookeeper.html
##
@@ -0,0 +1,277 @@
+
+
+
+  
+
+
+

[jira] [Created] (KAFKA-10688) Handle accidental truncation of repartition topics as exceptional failure

2020-11-05 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-10688:
-

 Summary: Handle accidental truncation of repartition topics as 
exceptional failure
 Key: KAFKA-10688
 URL: https://issues.apache.org/jira/browse/KAFKA-10688
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang
Assignee: Guozhang Wang


Today we always handle an InvalidOffsetException from the main consumer via the 
reset policy, assuming it is for a source topic. But repartition topics are also 
source topics, and they should never be truncated and hence should never cause an 
InvalidOffsetException.

We should differentiate these repartition topics from external source topics 
and treat an InvalidOffsetException from repartition topics as fatal and close 
the whole application.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10678) Re-deploying Streams app causes rebalance and task migration

2020-11-05 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227067#comment-17227067
 ] 

A. Sophie Blee-Goldman commented on KAFKA-10678:


Thanks for opening a separate ticket for this. There seem to be two main 
problems/unanswered questions here:

1) Why was there a rebalance at all if static membership was enabled?
2) Why did the rebalance result in a large shuffling of tasks?

For 1) it's difficult to say with only the broker side logs, since they won't 
tell us _why_ the client triggered a new rebalance after it was bounced. Would 
it be possible to collect logs from the client covering the period immediately 
after it was bounced, when it apparently tried to trigger a rebalance?

I was discussing question 2) with [~cadonna] and it seems to be a combination 
of a few things: first, the "eventual" assignment is currently performed 
without regard to the previous placement of tasks. It just tries to distribute 
tasks as evenly as possible, using intermediate assignments and probing 
rebalances as needed. [~vvcephei] wrote up some thoughts on this in 
KAFKA-10121. We're aware of this limitation but haven't addressed it since the 
assignor is deterministic and therefore no-op group changes – such as an 
existing member being bounced – shouldn't result in a different eventual 
assignment than the stable one pre-bounce.

Unfortunately this assignment identifies clients based on the encoded 
processId, which is actually randomly generated during StreamThread startup. So 
the processId identifier would change after a bounce, meaning different initial 
conditions to the assignor function and therefore a different final result :/

I think if the shuffling of tasks weren't so bad, then even if you did still get a 
rebalance with static membership, it would hardly be noticeable (given that the 
application can continue active processing during a cooperative rebalance). We could 
probably improve a majority of cases just by fixing the processId issue, but I feel 
like we might as well skip that and just go ahead with implementing KAFKA-10121 at 
that point to improve it for all cases.

> Re-deploying Streams app causes rebalance and task migration
> 
>
> Key: KAFKA-10678
> URL: https://issues.apache.org/jira/browse/KAFKA-10678
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0, 2.6.1
>Reporter: Bradley Peterson
>Priority: Major
> Attachments: after, before, broker
>
>
> Re-deploying our Streams app causes a rebalance, even when using static group 
> membership. Worse, the rebalance creates standby tasks, even when the 
> previous task assignment was balanced and stable.
> Our app is currently using Streams 2.6.1-SNAPSHOT (due to [KAFKA-10633]) but 
> we saw the same behavior in 2.6.0. The app runs on 4 EC2 instances, each with 
> 4 stream threads, and data stored on persistent EBS volumes. During a 
> redeploy, all EC2 instances are stopped, new instances are launched, and the 
> EBS volumes are attached to the new instances. We do not use interactive 
> queries. {{session.timeout.ms}} is set to 30 minutes, and the deployment 
> finishes well under that. {{num.standby.replicas}} is 0.
> h2. Expected Behavior
> Given a stable and balanced task assignment prior to deploying, we expect to 
> see the same task assignment after deploying. Even if a rebalance is 
> triggered, we do not expect to see new standby tasks.
> h2. Observed Behavior
> Attached are the "Assigned tasks to clients" log lines from before and after 
> deploying. The "before" is from over 24 hours ago, the task assignment is 
> well balanced and "Finished stable assignment of tasks, no followup 
> rebalances required." is logged. The "after" log lines show the same 
> assignment of active tasks, but some additional standby tasks. There are 
> additional log lines about adding and removing active tasks, which I don't 
> quite understand.
> I've also included logs from the broker showing the rebalance was triggered 
> for "Updating metadata".



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission

2020-11-05 Thread GitBox


hachikuji commented on pull request #9563:
URL: https://github.com/apache/kafka/pull/9563#issuecomment-722690665


   I got inspired to try and extend this to apply to all request types. I've 
updated the patch to remove the custom response logic in `FetchResponse` in 
favor of a general pattern using `SendBuilder`. We had an existing benchmark 
`FetchResponseBenchmark.testSerializeFetchResponse`, so I tried it out.
   
   Here are the results before the patch (10 topics and 500 topics):
   ```
   Result 
"org.apache.kafka.jmh.common.FetchResponseBenchmark.testSerializeFetchResponse":
 6322.861 ±(99.9%) 310.785 ns/op [Average]
 (min, avg, max) = (6057.758, 6322.861, 7090.658), stdev = 290.708
 CI (99.9%): [6012.076, 6633.646] (assumes normal distribution)
   
   Result 
"org.apache.kafka.jmh.common.FetchResponseBenchmark.testSerializeFetchResponse":
 323310.283 ±(99.9%) 25947.515 ns/op [Average]
 (min, avg, max) = (301370.273, 323310.283, 383716.556), stdev = 24271.322
 CI (99.9%): [297362.768, 349257.799] (assumes normal distribution)
   ```
   Here are the new benchmark results (10 topics and 500 topics):
   ```
   Result 
"org.apache.kafka.jmh.common.FetchResponseBenchmark.testSerializeFetchResponse":
 5701.378 ±(99.9%) 100.848 ns/op [Average]
 (min, avg, max) = (5601.838, 5701.378, 5925.943), stdev = 94.333
 CI (99.9%): [5600.530, 5802.225] (assumes normal distribution)
   
   Result 
"org.apache.kafka.jmh.common.FetchResponseBenchmark.testSerializeFetchResponse":
 298221.825 ±(99.9%) 8173.945 ns/op [Average]
 (min, avg, max) = (287615.891, 298221.825, 321499.618), stdev = 7645.913
 CI (99.9%): [290047.880, 306395.770] (assumes normal distribution)
   ```
   So it looks like a modest overall improvement.
   
   Note I still need to polish up a few things in the PR.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram opened a new pull request #9567: MINOR: Always return partitions with diverging epochs in fetch response

2020-11-05 Thread GitBox


rajinisivaram opened a new pull request #9567:
URL: https://github.com/apache/kafka/pull/9567


   This is required to ensure that followers can truncate based on diverging 
epochs returned in fetch responses.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission

2020-11-05 Thread GitBox


hachikuji commented on a change in pull request #9563:
URL: https://github.com/apache/kafka/pull/9563#discussion_r518414400



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
##
@@ -89,10 +89,11 @@ public ResponseHeader toResponseHeader() {
 public static RequestHeader parse(ByteBuffer buffer) {
 short apiKey = -1;
 try {
+int position = buffer.position();

Review comment:
   This was a bug. This logic assumes that the buffer is at position 0. Let 
me add a test case.
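
   A small illustration of why the starting position must be captured when the buffer 
does not begin at offset 0 (plain ByteBuffer usage, not the Kafka parser itself):

   ```
   import java.nio.ByteBuffer;

   public class BufferPositionSketch {
       public static void main(String[] args) {
           ByteBuffer buffer = ByteBuffer.allocate(16);
           buffer.putInt(42);                    // earlier bytes already consumed (e.g. an envelope field)
           int headerStart = buffer.position();  // the header does NOT begin at offset 0 here
           buffer.putShort((short) 17);          // pretend this is the api key

           buffer.position(headerStart);         // resetting to 0 instead would re-read the wrong bytes
           System.out.println(buffer.getShort()); // prints 17
       }
   }
   ```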





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10687) Produce request should be bumped for new error code PRODUCE_FENCED

2020-11-05 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10687:
---

 Summary: Produce request should be bumped for new error code 
PRODUCE_FENCED
 Key: KAFKA-10687
 URL: https://issues.apache.org/jira/browse/KAFKA-10687
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen
Assignee: Boyang Chen
 Fix For: 2.7.0


In https://issues.apache.org/jira/browse/KAFKA-9910, we missed a case where the 
ProduceRequest needs to be bumped to return the new error code PRODUCE_FENCED. 
This gap needs to be addressed as a blocker since it is shipping in 2.7.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] abbccdda commented on a change in pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission

2020-11-05 Thread GitBox


abbccdda commented on a change in pull request #9563:
URL: https://github.com/apache/kafka/pull/9563#discussion_r518396407



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
##
@@ -89,10 +89,11 @@ public ResponseHeader toResponseHeader() {
 public static RequestHeader parse(ByteBuffer buffer) {
 short apiKey = -1;
 try {
+int position = buffer.position();

Review comment:
   Is this specifically for the Envelope case?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-11-05 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r518396177



##
File path: core/src/main/scala/kafka/snapshot/KafkaSnapshotReader.scala
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.snapshot
+
+import java.nio.ByteBuffer
+import java.nio.file.Path
+import java.util.{Iterator => JIterator}
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.record.FileRecords
+import org.apache.kafka.raft.OffsetAndEpoch
+import org.apache.kafka.snapshot.SnapshotReader
+
+final class KafkaSnapshotReader private (fileRecords: FileRecords, snapshotId: 
OffsetAndEpoch) extends SnapshotReader {

Review comment:
   Done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-11-05 Thread GitBox


dajac commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r518387030



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, 
IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.network.ClientInformation
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {
+request match {
+  case req: AddOffsetsToTxnRequest => 
AddOffsetsToTxnRequestDataJsonConverter.write(req.data, request.version)
+  case req: AddPartitionsToTxnRequest => 
AddPartitionsToTxnRequestDataJsonConverter.write(req.data, request.version)
+  case req: AlterClientQuotasRequest => 
AlterClientQuotasRequestDataJsonConverter.write(req.data, request.version)
+  case req: AlterConfigsRequest => 
AlterConfigsRequestDataJsonConverter.write(req.data, request.version)
+  case req: AlterIsrRequest => 
AlterIsrRequestDataJsonConverter.write(req.data, request.version)
+  case req: AlterPartitionReassignmentsRequest => 
AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data, 
request.version)
+  case req: AlterReplicaLogDirsRequest => 
AlterReplicaLogDirsRequestDataJsonConverter.write(req.data, request.version)
+  case res: AlterUserScramCredentialsRequest => 
AlterUserScramCredentialsRequestDataJsonConverter.write(res.data, 
request.version)
+  case req: ApiVersionsRequest => 
ApiVersionsRequestDataJsonConverter.write(req.data, request.version)
+  case req: BeginQuorumEpochRequest => 
BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version)
+  case req: ControlledShutdownRequest => 
ControlledShutdownRequestDataJsonConverter.write(req.data, request.version)
+  case req: CreateAclsRequest => 
CreateAclsRequestDataJsonConverter.write(req.data, request.version)
+  case req: CreateDelegationTokenRequest => 
CreateDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
+  case req: CreatePartitionsRequest => 
CreatePartitionsRequestDataJsonConverter.write(req.data, request.version)
+  case req: CreateTopicsRequest => 
CreateTopicsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DeleteAclsRequest => 
DeleteAclsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DeleteGroupsRequest => 
DeleteGroupsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DeleteRecordsRequest => 
DeleteRecordsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DeleteTopicsRequest => 
DeleteTopicsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeAclsRequest => 
DescribeAclsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeClientQuotasRequest => 
DescribeClientQuotasRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeConfigsRequest => 
DescribeConfigsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeDelegationTokenRequest => 
DescribeDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeGroupsRequest => 
DescribeGroupsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeLogDirsRequest => 
DescribeLogDirsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeQuorumRequest => 
DescribeQuorumRequestDataJsonConverter.write(req.data, request.version)
+  case res: DescribeUserScramCredentialsRequest => 
DescribeUserScramCredentialsRequestDat

[GitHub] [kafka] dajac commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-11-05 Thread GitBox


dajac commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r518384421



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, 
IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.network.ClientInformation
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {
+request match {
+  case req: AddOffsetsToTxnRequest => 
AddOffsetsToTxnRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case req: AddPartitionsToTxnRequest => 
AddPartitionsToTxnRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case req: AlterClientQuotasRequest => 
AlterClientQuotasRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case req: AlterConfigsRequest => 
AlterConfigsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: AlterIsrRequest => 
AlterIsrRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: AlterPartitionReassignmentsRequest => 
AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data, 
request.version, verbose)
+  case req: AlterReplicaLogDirsRequest => 
AlterReplicaLogDirsRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case res: AlterUserScramCredentialsRequest => 
AlterUserScramCredentialsRequestDataJsonConverter.write(res.data, 
request.version, verbose)
+  case req: ApiVersionsRequest => 
ApiVersionsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: BeginQuorumEpochRequest => 
BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case req: ControlledShutdownRequest => 
ControlledShutdownRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case req: CreateAclsRequest => 
CreateAclsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: CreateDelegationTokenRequest => 
CreateDelegationTokenRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case req: CreatePartitionsRequest => 
CreatePartitionsRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case req: CreateTopicsRequest => 
CreateTopicsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: DeleteAclsRequest => 
DeleteAclsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: DeleteGroupsRequest => 
DeleteGroupsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: DeleteRecordsRequest => 
DeleteRecordsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: DeleteTopicsRequest => 
DeleteTopicsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: DescribeAclsRequest => 
DescribeAclsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: DescribeClientQuotasRequest => 
DescribeClientQuotasRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case req: DescribeConfigsRequest => 
DescribeConfigsRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case req: DescribeDelegationTokenRequest => 
DescribeDelegationTokenRequestDataJsonConverter.write(req.data, 
request.version, verbose)
+  case req: DescribeGroupsRequest => 
DescribeGroupsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: DescribeLogDirsRequest => 
DescribeLogDirsRequestDataJsonConverter.write(req.data, 

[jira] [Assigned] (KAFKA-6943) Have option to shutdown KS cleanly if any threads crashes, or if all threads crash

2020-11-05 Thread Walker Carlson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walker Carlson reassigned KAFKA-6943:
-

Assignee: Walker Carlson

> Have option to shutdown KS cleanly if any threads crashes, or if all threads 
> crash
> --
>
> Key: KAFKA-6943
> URL: https://issues.apache.org/jira/browse/KAFKA-6943
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Antony Stubbs
>Assignee: Walker Carlson
>Priority: Major
>  Labels: user-experience
>
> At the moment users have to implement this themselves. It might be nice to have an 
> option to configure that if all threads crash, or if any thread crashes, a clean 
> shutdown is initiated.
> There is also a gotcha: at the moment, if you call KS#close without a timeout from 
> the uncaught exception handler, you deadlock.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9331) Add option to terminate application when StreamThread(s) die

2020-11-05 Thread Walker Carlson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walker Carlson reassigned KAFKA-9331:
-

Assignee: Walker Carlson

> Add option to terminate application when StreamThread(s) die
> 
>
> Key: KAFKA-9331
> URL: https://issues.apache.org/jira/browse/KAFKA-9331
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Michael Bingham
>Assignee: Walker Carlson
>Priority: Minor
>  Labels: needs-kip
>
> Currently, if a {{StreamThread}} dies due to an unexpected exception, the 
> Streams application continues running. Even if all {{StreamThread}}(s) die, 
> the application will continue running, but will be in an {{ERROR}} state. 
> Many users want or expect the application to terminate in the event of a 
> fatal exception that kills one or more {{StreamThread}}(s). Currently, this 
> requires extra work from the developer to register an uncaught exception 
> handler on the {{KafkaStreams}} object and trigger a shutdown as needed.
> It would be useful to provide a configurable option for the Streams 
> application to have it automatically terminate with an exception if one or 
> more {{StreamThread}}(s) die.
>  
>  
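
For context, a minimal sketch of the manual wiring the description refers to 
(standard 2.x KafkaStreams APIs; topic names and configs are placeholders, and the 
timeout plus separate thread guard against the close() deadlock noted in KAFKA-6943):

```
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class ManualShutdownSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "manual-shutdown-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Users currently register a plain Thread.UncaughtExceptionHandler themselves and
        // trigger the shutdown from a separate thread, with a timeout, to avoid deadlock.
        streams.setUncaughtExceptionHandler((thread, throwable) ->
            new Thread(() -> streams.close(Duration.ofMinutes(1))).start());
        streams.start();
    }
}
```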



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-05 Thread GitBox


wcarlson5 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r518371722



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StreamsHandlerIntegrationTest.java
##
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static 
org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
+import static 
org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.fail;
+
+@Category(IntegrationTest.class)
+public class StreamsHandlerIntegrationTest {
+@ClassRule
+public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
+
+@Rule
+public TestName testName = new TestName();
+
+String inputTopic;
+StreamsBuilder builder;
+Properties properties;
+List processorValueCollector;
+String idempotentTopic = "idempotentTopic";
+String appId = "";
+
+@Before
+public void setup() {
+final String testId = safeUniqueTestName(getClass(), testName);
+appId = "appId_" + testId;
+inputTopic = "input" + testId;
+cleanStateBeforeTest(CLUSTER, idempotentTopic, inputTopic);
+
+IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);

Review comment:
   good questions





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-05 Thread GitBox


wcarlson5 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r518371511



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
##
@@ -60,6 +60,9 @@ public void onPartitionsAssigned(final 
Collection partitions) {
 }  else if (assignmentErrorCode.get() == 
AssignorError.ASSIGNMENT_ERROR.code()) {
 log.error("Received error code {}", 
AssignorError.ASSIGNMENT_ERROR);
 throw new TaskAssignmentException("Hit an unexpected exception 
during task assignment phase of rebalance");
+} else if (assignmentErrorCode.get() == 
AssignorError.SHUTDOWN_REQUESTED.code()) {

Review comment:
   added unit test





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-11-05 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r518363490



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, 
IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.network.ClientInformation
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {
+request match {
+  case req: AddOffsetsToTxnRequest => 
AddOffsetsToTxnRequestDataJsonConverter.write(req.data, request.version)
+  case req: AddPartitionsToTxnRequest => 
AddPartitionsToTxnRequestDataJsonConverter.write(req.data, request.version)
+  case req: AlterClientQuotasRequest => 
AlterClientQuotasRequestDataJsonConverter.write(req.data, request.version)
+  case req: AlterConfigsRequest => 
AlterConfigsRequestDataJsonConverter.write(req.data, request.version)
+  case req: AlterIsrRequest => 
AlterIsrRequestDataJsonConverter.write(req.data, request.version)
+  case req: AlterPartitionReassignmentsRequest => 
AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data, 
request.version)
+  case req: AlterReplicaLogDirsRequest => 
AlterReplicaLogDirsRequestDataJsonConverter.write(req.data, request.version)
+  case res: AlterUserScramCredentialsRequest => 
AlterUserScramCredentialsRequestDataJsonConverter.write(res.data, 
request.version)
+  case req: ApiVersionsRequest => 
ApiVersionsRequestDataJsonConverter.write(req.data, request.version)
+  case req: BeginQuorumEpochRequest => 
BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version)
+  case req: ControlledShutdownRequest => 
ControlledShutdownRequestDataJsonConverter.write(req.data, request.version)
+  case req: CreateAclsRequest => 
CreateAclsRequestDataJsonConverter.write(req.data, request.version)
+  case req: CreateDelegationTokenRequest => 
CreateDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
+  case req: CreatePartitionsRequest => 
CreatePartitionsRequestDataJsonConverter.write(req.data, request.version)
+  case req: CreateTopicsRequest => 
CreateTopicsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DeleteAclsRequest => 
DeleteAclsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DeleteGroupsRequest => 
DeleteGroupsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DeleteRecordsRequest => 
DeleteRecordsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DeleteTopicsRequest => 
DeleteTopicsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeAclsRequest => 
DescribeAclsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeClientQuotasRequest => 
DescribeClientQuotasRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeConfigsRequest => 
DescribeConfigsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeDelegationTokenRequest => 
DescribeDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeGroupsRequest => 
DescribeGroupsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeLogDirsRequest => 
DescribeLogDirsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeQuorumRequest => 
DescribeQuorumRequestDataJsonConverter.write(req.data, request.version)
+  case res: DescribeUserScramCredentialsRequest => 
DescribeUserScramCredentialsReq

[GitHub] [kafka] kowshik commented on pull request #9561: KAFKA-10624: For FeatureZNodeStatus, use sealed trait instead of Enumeration

2020-11-05 Thread GitBox


kowshik commented on pull request #9561:
URL: https://github.com/apache/kafka/pull/9561#issuecomment-722644136


   I've rebased on top of #9559 now. So the build failure(s) should be gone.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10500) Add API to Start and Stop Stream Threads

2020-11-05 Thread Walker Carlson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walker Carlson reassigned KAFKA-10500:
--

Assignee: Walker Carlson

> Add API to Start and Stop Stream Threads
> 
>
> Key: KAFKA-10500
> URL: https://issues.apache.org/jira/browse/KAFKA-10500
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Walker Carlson
>Priority: Major
>  Labels: needs-kip
>
> Currently, there is no possibility in Kafka Streams to increase or decrease 
> the number of stream threads after the Kafka Streams client has been started. 
> Uncaught exceptions thrown in a stream thread kill the stream thread leaving 
> the Kafka Streams client with less stream threads for processing than when 
> the client was started. The only way to replace the killed stream thread is 
> to restart the whole Kafka Streams client. For transient errors, it might 
> make sense to replace a killed stream thread with a new one while users try 
> to find the root cause of the error. That could be accomplished by starting a 
> new stream thread in the uncaught exception handler of the killed stream 
> thread.
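
For illustration only, a minimal sketch of the handler-based replacement idea described 
above. It assumes a hypothetical `addStreamThread()` method on `KafkaStreams` (the kind 
of API this ticket asks for) plus an `isTransient()` helper, so it is a sketch of the 
intent rather than the actual implementation:

```java
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;

public class ReplaceDeadStreamThread {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");

        final Properties props = new Properties();
        props.put("application.id", "replace-thread-demo");
        props.put("bootstrap.servers", "localhost:9092");

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Replace a stream thread that died from a transient error by starting a new one.
        // addStreamThread() is the assumed new API; today only restarting the whole client works.
        streams.setUncaughtExceptionHandler((thread, throwable) -> {
            if (isTransient(throwable)) {
                streams.addStreamThread();
            }
        });

        streams.start();
    }

    private static boolean isTransient(final Throwable t) {
        // Placeholder: decide which errors are worth retrying with a fresh thread.
        return true;
    }
}
```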



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei merged pull request #9543: KAFKA-10500: Makes the Stream thread list resizable

2020-11-05 Thread GitBox


vvcephei merged pull request #9543:
URL: https://github.com/apache/kafka/pull/9543


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9543: KAFKA-10500: Makes the Stream thread list resizable

2020-11-05 Thread GitBox


vvcephei commented on pull request #9543:
URL: https://github.com/apache/kafka/pull/9543#issuecomment-722642034


   The test failures look unrelated:
   ```
   
   Build / JDK 8 / org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutoOffsetSync | 34 sec | 1
   Build / JDK 11 / kafka.api.PlaintextConsumerTest.testLowMaxFetchSizeForRequestAndPartition | 2 min 3 sec | 1
   Build / JDK 11 / kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota | 59 sec | 1
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on pull request #9566: KAFKA-10618: Update to Uuid class

2020-11-05 Thread GitBox


jolshan commented on pull request #9566:
URL: https://github.com/apache/kafka/pull/9566#issuecomment-722639934


   @cmccabe @ijuma 
   Let me know if this looks ok.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan opened a new pull request #9566: KAFKA-10618: Update to Uuid class

2020-11-05 Thread GitBox


jolshan opened a new pull request #9566:
URL: https://github.com/apache/kafka/pull/9566


   As decided in KIP-516, the UUID class has been named Uuid. This PR changes 
all instances of org.apache.kafka.common.UUID to org.apache.kafka.common.Uuid.
   
   It also modifies the Uuid class so that it no longer wraps a java.util.UUID 
object. Now it simply stores two longs. 
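
   For context while reading the archive, a rough sketch of what "stores two longs" 
   can look like. The class below is an illustrative assumption, not the actual 
   org.apache.kafka.common.Uuid source from this PR:

   ```java
   // Illustrative only: a UUID-like value type backed by two longs instead of java.util.UUID.
   public final class TwoLongUuid {
       private final long mostSignificantBits;
       private final long leastSignificantBits;

       public TwoLongUuid(final long mostSignificantBits, final long leastSignificantBits) {
           this.mostSignificantBits = mostSignificantBits;
           this.leastSignificantBits = leastSignificantBits;
       }

       public long getMostSignificantBits() {
           return mostSignificantBits;
       }

       public long getLeastSignificantBits() {
           return leastSignificantBits;
       }

       @Override
       public boolean equals(final Object o) {
           if (this == o) return true;
           if (!(o instanceof TwoLongUuid)) return false;
           final TwoLongUuid other = (TwoLongUuid) o;
           return mostSignificantBits == other.mostSignificantBits
               && leastSignificantBits == other.leastSignificantBits;
       }

       @Override
       public int hashCode() {
           return 31 * Long.hashCode(mostSignificantBits) + Long.hashCode(leastSignificantBits);
       }

       @Override
       public String toString() {
           // java.util.UUID is used here only for formatting; the stored state is just the two longs.
           return new java.util.UUID(mostSignificantBits, leastSignificantBits).toString();
       }
   }
   ```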
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-11-05 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r518359188



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, 
IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.network.ClientInformation
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {
+request match {
+  case req: AddOffsetsToTxnRequest => 
AddOffsetsToTxnRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case req: AddPartitionsToTxnRequest => 
AddPartitionsToTxnRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case req: AlterClientQuotasRequest => 
AlterClientQuotasRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case req: AlterConfigsRequest => 
AlterConfigsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: AlterIsrRequest => 
AlterIsrRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: AlterPartitionReassignmentsRequest => 
AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data, 
request.version, verbose)
+  case req: AlterReplicaLogDirsRequest => 
AlterReplicaLogDirsRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case res: AlterUserScramCredentialsRequest => 
AlterUserScramCredentialsRequestDataJsonConverter.write(res.data, 
request.version, verbose)
+  case req: ApiVersionsRequest => 
ApiVersionsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: BeginQuorumEpochRequest => 
BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case req: ControlledShutdownRequest => 
ControlledShutdownRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case req: CreateAclsRequest => 
CreateAclsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: CreateDelegationTokenRequest => 
CreateDelegationTokenRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case req: CreatePartitionsRequest => 
CreatePartitionsRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case req: CreateTopicsRequest => 
CreateTopicsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: DeleteAclsRequest => 
DeleteAclsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: DeleteGroupsRequest => 
DeleteGroupsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: DeleteRecordsRequest => 
DeleteRecordsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: DeleteTopicsRequest => 
DeleteTopicsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: DescribeAclsRequest => 
DescribeAclsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: DescribeClientQuotasRequest => 
DescribeClientQuotasRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case req: DescribeConfigsRequest => 
DescribeConfigsRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case req: DescribeDelegationTokenRequest => 
DescribeDelegationTokenRequestDataJsonConverter.write(req.data, 
request.version, verbose)
+  case req: DescribeGroupsRequest => 
DescribeGroupsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: DescribeLogDirsRequest => 
DescribeLogDirsRequestDataJsonConverter.write(req

[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-05 Thread GitBox


wcarlson5 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r518335631



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StreamsHandlerIntegrationTest.java
##
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static 
org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
+import static 
org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.fail;
+
+@Category(IntegrationTest.class)
+public class StreamsHandlerIntegrationTest {
+@ClassRule
+public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
+
+@Rule
+public TestName testName = new TestName();
+
+String inputTopic;
+StreamsBuilder builder;
+Properties properties;
+List processorValueCollector;
+String idempotentTopic = "idempotentTopic";

Review comment:
   it can be removed





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] apovzner commented on a change in pull request #9555: KAFKA-10673: Cache inter broker listener name used in connection quotas

2020-11-05 Thread GitBox


apovzner commented on a change in pull request #9555:
URL: https://github.com/apache/kafka/pull/9555#discussion_r518327928



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1189,6 +1189,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   @volatile private var defaultMaxConnectionsPerIp: Int = 
config.maxConnectionsPerIp
   @volatile private var maxConnectionsPerIpOverrides = 
config.maxConnectionsPerIpOverrides.map { case (host, count) => 
(InetAddress.getByName(host), count) }
   @volatile private var brokerMaxConnections = config.maxConnections
+  @volatile private var interBrokerListenerName = 
config.interBrokerListenerName

Review comment:
   You are right -- I saw that test and thought it was a dynamic config, 
but the test was verifying that it cannot be updated. I see now that KIP-226 
lists that as future work, cool.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response

2020-11-05 Thread GitBox


rajinisivaram commented on pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#issuecomment-722609155


   @hachikuji Thanks for the reviews, addressed the comments.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-05 Thread GitBox


wcarlson5 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r518326776



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StreamsHandlerIntegrationTest.java
##
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static 
org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
+import static 
org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.fail;
+
+@Category(IntegrationTest.class)
+public class StreamsHandlerIntegrationTest {
+@ClassRule
+public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
+
+@Rule
+public TestName testName = new TestName();
+
+String inputTopic;
+StreamsBuilder builder;
+Properties properties;
+List processorValueCollector;
+String idempotentTopic = "idempotentTopic";
+String appId = "";
+
+@Before
+public void setup() {
+final String testId = safeUniqueTestName(getClass(), testName);
+appId = "appId_" + testId;
+inputTopic = "input" + testId;
+cleanStateBeforeTest(CLUSTER, idempotentTopic, inputTopic);
+
+IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
+
+builder  = new StreamsBuilder();
+
+processorValueCollector = new ArrayList<>();
+
+final KStream stream = builder.stream(inputTopic);
+stream.process(() -> new ShutdownProcessor(processorValueCollector), 
Named.as("process"));
+
+properties  = mkObjectProperties(
+mkMap(
+mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers()),
+mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+mkEntry(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath()),
+mkEntry(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 
"5"),
+mkEntry(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 
"6"),
+mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"),
+
mk

[GitHub] [kafka] rajinisivaram commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response

2020-11-05 Thread GitBox


rajinisivaram commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r518326079



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -221,7 +223,15 @@ abstract class AbstractFetcherThread(name: String,
 
   val ResultWithPartitions(fetchOffsets, partitionsWithError) = 
maybeTruncateToEpochEndOffsets(epochEndOffsets, latestEpochsForPartitions)
   handlePartitionsWithErrors(partitionsWithError, 
"truncateToEpochEndOffsets")
-  updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
+  updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets, 
isTruncationOnFetchSupported)
+}
+  }
+
+  private def truncateOnFetchResponse(epochEndOffsets: Map[TopicPartition, 
EpochEndOffset]): Unit = {
+inLock(partitionMapLock) {
+  val ResultWithPartitions(fetchOffsets, partitionsWithError) = 
maybeTruncateToEpochEndOffsets(epochEndOffsets, Map.empty)
+  handlePartitionsWithErrors(partitionsWithError, 
"truncateOnFetchResponse")
+  updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets, 
maySkipTruncation = false)

Review comment:
   Fixed, removed that flag.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-1800) KafkaException was not recorded at the per-topic metrics

2020-11-05 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-1800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17226934#comment-17226934
 ] 

Guozhang Wang commented on KAFKA-1800:
--

This has been fixed since 1.0.0; closing now.

> KafkaException was not recorded at the per-topic metrics
> 
>
> Key: KAFKA-1800
> URL: https://issues.apache.org/jira/browse/KAFKA-1800
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Attachments: KAFKA-1800.patch
>
>
> When a KafkaException is thrown from the producer.send() call, it is not recorded 
> in the per-topic record-error-rate, but only in the global error-rate.
> Since users usually monitor the per-topic metrics, losing all counts of dropped 
> messages at this level that are caused by exceptions thrown by the Kafka producer, 
> such as BufferExhaustedException, could be very dangerous.
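
A rough sketch of the fix pattern the ticket asks for: record the failure on both the 
global sensor and a per-topic sensor. Sensor and metric names below are illustrative, 
not the producer's actual metric names:

```java
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;

public class PerTopicErrorRecording {
    private final Metrics metrics = new Metrics();
    private final Sensor globalErrors;

    public PerTopicErrorRecording() {
        globalErrors = metrics.sensor("errors");
        globalErrors.add(metrics.metricName("record-error-rate", "producer-metrics"), new Rate());
    }

    // The point of the ticket: a failed send() must be counted on both levels,
    // not only on the global error-rate.
    public void recordSendError(final String topic) {
        globalErrors.record();
        perTopicErrors(topic).record();
    }

    private Sensor perTopicErrors(final String topic) {
        final String sensorName = "topic." + topic + ".errors";
        Sensor sensor = metrics.getSensor(sensorName);
        if (sensor == null) {
            sensor = metrics.sensor(sensorName);
            sensor.add(metrics.metricName("record-error-rate", "producer-topic-metrics." + topic), new Rate());
        }
        return sensor;
    }
}
```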



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] splett2 commented on a change in pull request #9555: KAFKA-10673: Cache inter broker listener name used in connection quotas

2020-11-05 Thread GitBox


splett2 commented on a change in pull request #9555:
URL: https://github.com/apache/kafka/pull/9555#discussion_r518283600



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1189,6 +1189,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   @volatile private var defaultMaxConnectionsPerIp: Int = 
config.maxConnectionsPerIp
   @volatile private var maxConnectionsPerIpOverrides = 
config.maxConnectionsPerIpOverrides.map { case (host, count) => 
(InetAddress.getByName(host), count) }
   @volatile private var brokerMaxConnections = config.maxConnections
+  @volatile private var interBrokerListenerName = 
config.interBrokerListenerName

Review comment:
   `interBrokerListenerName` is apparently not a dynamic config.
   
   see `DynamicBrokerReconfigurationTest.testAdvertisedListenerUpdate`
   ...
   ```
   // Verify updating inter-broker listener
   val props = new Properties
   props.put(KafkaConfig.InterBrokerListenerNameProp, SecureExternal)
   try {
 reconfigureServers(props, perBrokerConfig = true, 
(KafkaConfig.InterBrokerListenerNameProp, SecureExternal))
 fail("Inter-broker listener cannot be dynamically updated")
  }
   ```
   I don't think we allow updating the inter-broker listener at all, so I think we 
can remove the test I added. I actually wasn't sure if we allowed it or not, 
but the code seems to suggest that we don't.

##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1189,6 +1189,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   @volatile private var defaultMaxConnectionsPerIp: Int = 
config.maxConnectionsPerIp
   @volatile private var maxConnectionsPerIpOverrides = 
config.maxConnectionsPerIpOverrides.map { case (host, count) => 
(InetAddress.getByName(host), count) }
   @volatile private var brokerMaxConnections = config.maxConnections
+  @volatile private var interBrokerListenerName = 
config.interBrokerListenerName

Review comment:
   `interBrokerListenerName` is apparently not a dynamic config.
   
   see `DynamicBrokerReconfigurationTest.testAdvertisedListenerUpdate`
   ...
   ```
   // Verify updating inter-broker listener
   val props = new Properties
   props.put(KafkaConfig.InterBrokerListenerNameProp, SecureExternal)
   try {
 reconfigureServers(props, perBrokerConfig = true, 
(KafkaConfig.InterBrokerListenerNameProp, SecureExternal))
 fail("Inter-broker listener cannot be dynamically updated")
  }
   ```
   I don't think we allow updating inter broker listener at all, so I think we 
can remove the test I added. I actually wasn't sure if we allowed it or not, 
but the code seems to suggest that it isn't.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] splett2 commented on a change in pull request #9555: KAFKA-10673: Cache inter broker listener name used in connection quotas

2020-11-05 Thread GitBox


splett2 commented on a change in pull request #9555:
URL: https://github.com/apache/kafka/pull/9555#discussion_r518283600



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1189,6 +1189,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   @volatile private var defaultMaxConnectionsPerIp: Int = 
config.maxConnectionsPerIp
   @volatile private var maxConnectionsPerIpOverrides = 
config.maxConnectionsPerIpOverrides.map { case (host, count) => 
(InetAddress.getByName(host), count) }
   @volatile private var brokerMaxConnections = config.maxConnections
+  @volatile private var interBrokerListenerName = 
config.interBrokerListenerName

Review comment:
   `interBrokerListenerName` is apparently not a dynamic config.
   
   `DynamicBrokerReconfigurationTest.testAdvertisedListenerUpdate`
   ...
   ```
   // Verify updating inter-broker listener
   val props = new Properties
   props.put(KafkaConfig.InterBrokerListenerNameProp, SecureExternal)
   try {
 reconfigureServers(props, perBrokerConfig = true, 
(KafkaConfig.InterBrokerListenerNameProp, SecureExternal))
 fail("Inter-broker listener cannot be dynamically updated")
  }
   ```
   It does seem like we allow updating the interbroker listener when 
adding/removing listeners, but that case is covered.
   I will try to verify whether we allow updating the inter-broker listener name 
through the inter-broker security protocol, since apparently that's another way 
to configure the listener.

##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1189,6 +1189,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   @volatile private var defaultMaxConnectionsPerIp: Int = 
config.maxConnectionsPerIp
   @volatile private var maxConnectionsPerIpOverrides = 
config.maxConnectionsPerIpOverrides.map { case (host, count) => 
(InetAddress.getByName(host), count) }
   @volatile private var brokerMaxConnections = config.maxConnections
+  @volatile private var interBrokerListenerName = 
config.interBrokerListenerName

Review comment:
   `interBrokerListenerName` is apparently not a dynamic config.
   
   see `DynamicBrokerReconfigurationTest.testAdvertisedListenerUpdate`
   ...
   ```
   // Verify updating inter-broker listener
   val props = new Properties
   props.put(KafkaConfig.InterBrokerListenerNameProp, SecureExternal)
   try {
 reconfigureServers(props, perBrokerConfig = true, 
(KafkaConfig.InterBrokerListenerNameProp, SecureExternal))
 fail("Inter-broker listener cannot be dynamically updated")
  }
   ```
   It does seem like we allow updating the interbroker listener when 
adding/removing listeners, but that case is covered.
   I will try to verify whether we allow updating the inter-broker listener name 
through the inter-broker security protocol, since apparently that's another way 
to configure the listener.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-05 Thread GitBox


cadonna commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r518283539



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -366,6 +374,63 @@ public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler eh
 }
 }
 
+/**
+ * Set the handler invoked when an {@link 
StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+ * throws an unexpected exception.
+ * These might be exceptions indicating rare bugs in Kafka Streams, or they
+ * might be exceptions thrown by your code, for example a 
NullPointerException thrown from your processor
+ * logic.
+ * 
+ * Note, this handler must be threadsafe, since it will be shared among 
all threads, and invoked from any
+ * thread that encounters such an exception.
+ *
+ * @param streamsUncaughtExceptionHandler the uncaught exception handler 
of type {@link StreamsUncaughtExceptionHandler} for all internal threads
+ * @throws IllegalStateException if this {@code KafkaStreams} instance is 
not in state {@link State#CREATED CREATED}.
+ * @throws NullPointerException if streamsUncaughtExceptionHandler is null.
+ */
+public void setUncaughtExceptionHandler(final 
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+final StreamsUncaughtExceptionHandler handler = exception -> 
handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler);
+synchronized (stateLock) {
+if (state == State.CREATED) {
+Objects.requireNonNull(streamsUncaughtExceptionHandler);
+for (final StreamThread thread : threads) {
+thread.setStreamsUncaughtExceptionHandler(handler);
+}
+if (globalStreamThread != null) {
+globalStreamThread.setUncaughtExceptionHandler(handler);
+}
+} else {
+throw new IllegalStateException("Can only set 
UncaughtExceptionHandler in CREATED state. " +
+"Current state is: " + state);
+}
+}
+}
+
+private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse 
handleStreamsUncaughtException(final Throwable e,
+   
  final StreamsUncaughtExceptionHandler 
streamsUncaughtExceptionHandler) {
+final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse 
action = streamsUncaughtExceptionHandler.handle(e);
+switch (action) {
+case SHUTDOWN_CLIENT:
+log.error("Encountered the following exception during 
processing " +
+"and the registered exception handler opted to \" + 
action + \"." +
+" The streams client is going to shut down now. ", e);
+close(Duration.ZERO);

Review comment:
   My last comment is not true! Sorry! Everything alright!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #9565: MINOR: Move upgraded docs from site to Kafka docs

2020-11-05 Thread GitBox


bbejeck commented on pull request #9565:
URL: https://github.com/apache/kafka/pull/9565#issuecomment-722569996


   @mimaison ack, I'll take a look



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-05 Thread GitBox


cadonna commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r518283539



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -366,6 +374,63 @@ public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler eh
 }
 }
 
+/**
+ * Set the handler invoked when an {@link 
StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+ * throws an unexpected exception.
+ * These might be exceptions indicating rare bugs in Kafka Streams, or they
+ * might be exceptions thrown by your code, for example a 
NullPointerException thrown from your processor
+ * logic.
+ * 
+ * Note, this handler must be threadsafe, since it will be shared among 
all threads, and invoked from any
+ * thread that encounters such an exception.
+ *
+ * @param streamsUncaughtExceptionHandler the uncaught exception handler 
of type {@link StreamsUncaughtExceptionHandler} for all internal threads
+ * @throws IllegalStateException if this {@code KafkaStreams} instance is 
not in state {@link State#CREATED CREATED}.
+ * @throws NullPointerException if streamsUncaughtExceptionHandler is null.
+ */
+public void setUncaughtExceptionHandler(final 
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+final StreamsUncaughtExceptionHandler handler = exception -> 
handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler);
+synchronized (stateLock) {
+if (state == State.CREATED) {
+Objects.requireNonNull(streamsUncaughtExceptionHandler);
+for (final StreamThread thread : threads) {
+thread.setStreamsUncaughtExceptionHandler(handler);
+}
+if (globalStreamThread != null) {
+globalStreamThread.setUncaughtExceptionHandler(handler);
+}
+} else {
+throw new IllegalStateException("Can only set 
UncaughtExceptionHandler in CREATED state. " +
+"Current state is: " + state);
+}
+}
+}
+
+private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse 
handleStreamsUncaughtException(final Throwable e,
+   
  final StreamsUncaughtExceptionHandler 
streamsUncaughtExceptionHandler) {
+final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse 
action = streamsUncaughtExceptionHandler.handle(e);
+switch (action) {
+case SHUTDOWN_CLIENT:
+log.error("Encountered the following exception during 
processing " +
+"and the registered exception handler opted to \" + 
action + \"." +
+" The streams client is going to shut down now. ", e);
+close(Duration.ZERO);

Review comment:
   My last comment is not true! Sorry!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-05 Thread GitBox


wcarlson5 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r518274359



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StreamsHandlerIntegrationTest.java
##
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static 
org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
+import static 
org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.fail;
+
+@Category(IntegrationTest.class)
+public class StreamsHandlerIntegrationTest {
+@ClassRule
+public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
+
+@Rule
+public TestName testName = new TestName();
+
+String inputTopic;
+StreamsBuilder builder;
+Properties properties;
+List processorValueCollector;
+String idempotentTopic = "idempotentTopic";
+String appId = "";
+
+@Before
+public void setup() {
+final String testId = safeUniqueTestName(getClass(), testName);
+appId = "appId_" + testId;
+inputTopic = "input" + testId;
+cleanStateBeforeTest(CLUSTER, idempotentTopic, inputTopic);
+
+IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
+
+builder  = new StreamsBuilder();
+
+processorValueCollector = new ArrayList<>();
+
+final KStream stream = builder.stream(inputTopic);
+stream.process(() -> new ShutdownProcessor(processorValueCollector), 
Named.as("process"));
+
+properties  = mkObjectProperties(
+mkMap(
+mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers()),
+mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+mkEntry(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath()),
+mkEntry(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 
"5"),
+mkEntry(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 
"6"),
+mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"),
+
mk

[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-05 Thread GitBox


wcarlson5 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r518271918



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StreamsHandlerIntegrationTest.java
##
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static 
org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
+import static 
org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.fail;
+
+@Category(IntegrationTest.class)
+public class StreamsHandlerIntegrationTest {
+@ClassRule
+public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
+
+@Rule
+public TestName testName = new TestName();
+
+String inputTopic;
+StreamsBuilder builder;
+Properties properties;
+List processorValueCollector;
+String idempotentTopic = "idempotentTopic";
+String appId = "";
+
+@Before
+public void setup() {
+final String testId = safeUniqueTestName(getClass(), testName);
+appId = "appId_" + testId;
+inputTopic = "input" + testId;
+cleanStateBeforeTest(CLUSTER, idempotentTopic, inputTopic);
+
+IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
+
+builder  = new StreamsBuilder();
+
+processorValueCollector = new ArrayList<>();
+
+final KStream stream = builder.stream(inputTopic);
+stream.process(() -> new ShutdownProcessor(processorValueCollector), 
Named.as("process"));
+
+properties  = mkObjectProperties(
+mkMap(
+mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers()),
+mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+mkEntry(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath()),
+mkEntry(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 
"5"),
+mkEntry(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 
"6"),
+mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"),
+
mk

[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-05 Thread GitBox


wcarlson5 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r518271215



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StreamsHandlerIntegrationTest.java
##
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static 
org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
+import static 
org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.fail;
+
+@Category(IntegrationTest.class)
+public class StreamsHandlerIntegrationTest {
+@ClassRule
+public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
+
+@Rule
+public TestName testName = new TestName();
+
+String inputTopic;
+StreamsBuilder builder;
+Properties properties;
+List processorValueCollector;
+String idempotentTopic = "idempotentTopic";
+String appId = "";
+
+@Before
+public void setup() {
+final String testId = safeUniqueTestName(getClass(), testName);
+appId = "appId_" + testId;
+inputTopic = "input" + testId;
+cleanStateBeforeTest(CLUSTER, idempotentTopic, inputTopic);
+
+IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
+
+builder  = new StreamsBuilder();
+
+processorValueCollector = new ArrayList<>();
+
+final KStream stream = builder.stream(inputTopic);
+stream.process(() -> new ShutdownProcessor(processorValueCollector), 
Named.as("process"));
+
+properties  = mkObjectProperties(
+mkMap(
+mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers()),
+mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+mkEntry(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath()),
+mkEntry(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 
"5"),
+mkEntry(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 
"6"),
+mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"),
+
mk

[GitHub] [kafka] cadonna commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-05 Thread GitBox


cadonna commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r518269493



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -366,6 +374,63 @@ public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler eh
 }
 }
 
+/**
+ * Set the handler invoked when an {@link 
StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+ * throws an unexpected exception.
+ * These might be exceptions indicating rare bugs in Kafka Streams, or they
+ * might be exceptions thrown by your code, for example a 
NullPointerException thrown from your processor
+ * logic.
+ * 
+ * Note, this handler must be threadsafe, since it will be shared among 
all threads, and invoked from any
+ * thread that encounters such an exception.
+ *
+ * @param streamsUncaughtExceptionHandler the uncaught exception handler 
of type {@link StreamsUncaughtExceptionHandler} for all internal threads
+ * @throws IllegalStateException if this {@code KafkaStreams} instance is 
not in state {@link State#CREATED CREATED}.
+ * @throws NullPointerException if streamsUncaughtExceptionHandler is null.
+ */
+public void setUncaughtExceptionHandler(final 
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+final StreamsUncaughtExceptionHandler handler = exception -> 
handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler);
+synchronized (stateLock) {
+if (state == State.CREATED) {
+Objects.requireNonNull(streamsUncaughtExceptionHandler);
+for (final StreamThread thread : threads) {
+thread.setStreamsUncaughtExceptionHandler(handler);
+}
+if (globalStreamThread != null) {
+globalStreamThread.setUncaughtExceptionHandler(handler);
+}
+} else {
+throw new IllegalStateException("Can only set 
UncaughtExceptionHandler in CREATED state. " +
+"Current state is: " + state);
+}
+}
+}
+
+private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse 
handleStreamsUncaughtException(final Throwable e,
+   
  final StreamsUncaughtExceptionHandler 
streamsUncaughtExceptionHandler) {
+final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse 
action = streamsUncaughtExceptionHandler.handle(e);
+switch (action) {
+case SHUTDOWN_CLIENT:
+log.error("Encountered the following exception during 
processing " +
+"and the registered exception handler opted to \" + 
action + \"." +
+" The streams client is going to shut down now. ", e);
+close(Duration.ZERO);

Review comment:
   Why not? It would be much cleaner. We would close everything, like the admin 
client and the metrics, remove the client metrics, and set the state to 
NOT_RUNNING, which is not necessarily done with timeout zero (probably not, 
because of the deadlock). Additionally, we would get a nice log message 
saying `Streams client stopped completely` instead of `Streams client cannot 
stop completely within the timeout`. ;-)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] apovzner commented on a change in pull request #9555: KAFKA-10673: Cache inter broker listener name used in connection quotas

2020-11-05 Thread GitBox


apovzner commented on a change in pull request #9555:
URL: https://github.com/apache/kafka/pull/9555#discussion_r518269240



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1189,6 +1189,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   @volatile private var defaultMaxConnectionsPerIp: Int = 
config.maxConnectionsPerIp
   @volatile private var maxConnectionsPerIpOverrides = 
config.maxConnectionsPerIpOverrides.map { case (host, count) => 
(InetAddress.getByName(host), count) }
   @volatile private var brokerMaxConnections = config.maxConnections
+  @volatile private var interBrokerListenerName = 
config.interBrokerListenerName

Review comment:
   Good point, and I definitely agree that `config.interBrokerListenerName` 
could be expensive, since we call it several times for each new connection we accept. 
   
   The issue here is that interBrokerListenerName is a dynamic config. So, you 
will need to update the cached value on changes to that config, similar to how we 
notify ConnectionQuotas about config changes from SocketServer.reconfigure(). 
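
   A small illustration of the caching-plus-refresh pattern described above, written in 
   Java for brevity even though the real code lives in the Scala SocketServer/ConnectionQuotas; 
   the names here are assumptions, not the actual fields or methods:

   ```java
   // Cache the listener name in a volatile field so the per-connection hot path does not
   // have to re-derive it from the full config, and refresh it from the reconfigure path.
   public class ConnectionQuotasSketch {
       private volatile String interBrokerListenerName;

       public ConnectionQuotasSketch(final String initialInterBrokerListenerName) {
           this.interBrokerListenerName = initialInterBrokerListenerName;
       }

       // Called for every accepted connection.
       public boolean isInterBrokerListener(final String listenerName) {
           return listenerName.equals(interBrokerListenerName);
       }

       // Called from the reconfiguration callback if the relevant config ever changes.
       public void updateInterBrokerListenerName(final String newListenerName) {
           this.interBrokerListenerName = newListenerName;
       }
   }
   ```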





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10686) Pluggable standby tasks assignor for Kafka Streams

2020-11-05 Thread Levani Kokhreidze (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Levani Kokhreidze reassigned KAFKA-10686:
-

Assignee: Levani Kokhreidze

> Pluggable standby tasks assignor for Kafka Streams
> --
>
> Key: KAFKA-10686
> URL: https://issues.apache.org/jira/browse/KAFKA-10686
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Levani Kokhreidze
>Assignee: Levani Kokhreidze
>Priority: Major
>  Labels: needs-kip
>
> In production, Kafka Streams instances often run across different clusters 
> and availability zones. In order to guarantee high availability of Kafka 
> Streams deployments, users need more granular control over which 
> instances standby tasks can be created on. 
> The idea of this ticket is to expose an interface in Kafka Streams which can be 
> implemented by users to control where standby tasks are created.
> Kafka Streams can have RackAware assignment as a default implementation that 
> takes the application's `rack.id` into account and makes sure that 
> standby tasks are created on different racks. 
> The point of this ticket, though, is to give users more flexibility over standby 
> task creation, in cases where rack awareness alone is not enough. 
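(Purely to illustrate the idea, and not an existing or agreed-upon Kafka Streams 
API: a pluggable standby assignor could look roughly like the sketch below; all 
names and signatures here are hypothetical.)

    import java.util.Collections;
    import java.util.Map;

    // Hypothetical sketch of a pluggable standby task assignor; not a real Kafka Streams interface.
    interface StandbyTaskAssignor {
        // Decide whether a standby for the given task may be placed on the candidate
        // client, given per-client tags such as a rack identifier.
        boolean canCreateStandby(String taskId,
                                 String activeClientId,
                                 String candidateClientId,
                                 Map<String, Map<String, String>> clientTags);
    }

    // A rough rack-aware default: only allow a standby on a different rack
    // than the client hosting the active task.
    class RackAwareStandbyTaskAssignor implements StandbyTaskAssignor {
        @Override
        public boolean canCreateStandby(final String taskId,
                                        final String activeClientId,
                                        final String candidateClientId,
                                        final Map<String, Map<String, String>> clientTags) {
            final String activeRack =
                clientTags.getOrDefault(activeClientId, Collections.emptyMap()).get("rack.id");
            final String candidateRack =
                clientTags.getOrDefault(candidateClientId, Collections.emptyMap()).get("rack.id");
            return activeRack == null || !activeRack.equals(candidateRack);
        }
    }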



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-05 Thread GitBox


wcarlson5 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r518267956



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StreamsHandlerIntegrationTest.java
##
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static 
org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
+import static 
org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.fail;
+
+@Category(IntegrationTest.class)
+public class StreamsHandlerIntegrationTest {
+@ClassRule
+public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
+
+@Rule
+public TestName testName = new TestName();
+
+String inputTopic;
+StreamsBuilder builder;
+Properties properties;
+List processorValueCollector;
+String idempotentTopic = "idempotentTopic";
+String appId = "";
+
+@Before
+public void setup() {
+final String testId = safeUniqueTestName(getClass(), testName);
+appId = "appId_" + testId;
+inputTopic = "input" + testId;
+cleanStateBeforeTest(CLUSTER, idempotentTopic, inputTopic);
+
+IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
+
+builder  = new StreamsBuilder();
+
+processorValueCollector = new ArrayList<>();
+
+final KStream stream = builder.stream(inputTopic);
+stream.process(() -> new ShutdownProcessor(processorValueCollector), 
Named.as("process"));
+
+properties  = mkObjectProperties(
+mkMap(
+mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers()),
+mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+mkEntry(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath()),
+mkEntry(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 
"5"),
+mkEntry(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 
"6"),
+mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"),
+
mk

[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-05 Thread GitBox


wcarlson5 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r518265803



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StreamsHandlerIntegrationTest.java
##
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static 
org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
+import static 
org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.fail;
+
+@Category(IntegrationTest.class)
+public class StreamsHandlerIntegrationTest {

Review comment:
   sure





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-05 Thread GitBox


wcarlson5 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r518264927



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -997,6 +1061,72 @@ private boolean close(final long timeoutMs) {
 }
 }
 
+private void closeToError() {
+if (!setState(State.ERROR)) {
+// if transition failed, it means it was either in PENDING_SHUTDOWN
+// or NOT_RUNNING already; just check that all threads have been stopped

Review comment:
   I don't think we actually need it either way so I will just remove it





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-05 Thread GitBox


wcarlson5 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r518263937



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -366,6 +374,63 @@ public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler eh
 }
 }
 
+/**
+ * Set the handler invoked when an {@link 
StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+ * throws an unexpected exception.
+ * These might be exceptions indicating rare bugs in Kafka Streams, or they
+ * might be exceptions thrown by your code, for example a 
NullPointerException thrown from your processor
+ * logic.
+ * 
+ * Note, this handler must be threadsafe, since it will be shared among 
all threads, and invoked from any
+ * thread that encounters such an exception.
+ *
+ * @param streamsUncaughtExceptionHandler the uncaught exception handler 
of type {@link StreamsUncaughtExceptionHandler} for all internal threads
+ * @throws IllegalStateException if this {@code KafkaStreams} instance is 
not in state {@link State#CREATED CREATED}.
+ * @throws NullPointerException if streamsUncaughtExceptionHandler is null.
+ */
+public void setUncaughtExceptionHandler(final 
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+final StreamsUncaughtExceptionHandler handler = exception -> 
handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler);
+synchronized (stateLock) {
+if (state == State.CREATED) {
+Objects.requireNonNull(streamsUncaughtExceptionHandler);
+for (final StreamThread thread : threads) {
+thread.setStreamsUncaughtExceptionHandler(handler);
+}
+if (globalStreamThread != null) {
+globalStreamThread.setUncaughtExceptionHandler(handler);
+}
+} else {
+throw new IllegalStateException("Can only set 
UncaughtExceptionHandler in CREATED state. " +
+"Current state is: " + state);
+}
+}
+}
+
+private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse handleStreamsUncaughtException(final Throwable e,
+                                                                                                     final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = streamsUncaughtExceptionHandler.handle(e);
+switch (action) {
+case SHUTDOWN_CLIENT:
+log.error("Encountered the following exception during processing " +
+"and the registered exception handler opted to \"" + action + "\"." +
+" The streams client is going to shut down now. ", e);
+close(Duration.ZERO);

Review comment:
   It might be, but I do not think that it is necessary.
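   For context, a minimal usage sketch of the setUncaughtExceptionHandler API 
quoted in the diff above (the handler type and response enum follow the quoted 
code; the topic names, configs, and class name here are placeholders):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

    public class HandlerUsageSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input-topic").to("output-topic");

            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "handler-usage-sketch");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            final KafkaStreams streams = new KafkaStreams(builder.build(), props);

            // Per the quoted javadoc, the handler must be registered while the
            // client is still in CREATED state, i.e. before start().
            streams.setUncaughtExceptionHandler(exception ->
                StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);

            streams.start();
        }
    }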





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



