[jira] [Created] (KAFKA-10013) Consumer hang-up in case of unclean leader election

2020-05-18 Thread Dmitry (Jira)
Dmitry created KAFKA-10013:
--

 Summary: Consumer hang-up in case of unclean leader election
 Key: KAFKA-10013
 URL: https://issues.apache.org/jira/browse/KAFKA-10013
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 2.4.1, 2.5.0, 2.3.1, 2.4.0, 2.3.0
Reporter: Dmitry


Starting from Kafka 2.3, a new offset reset negotiation algorithm was added 
(org.apache.kafka.clients.consumer.internals.Fetcher#validateOffsetsAsync).

During this validation, the Fetcher's 
`org.apache.kafka.clients.consumer.internals.SubscriptionState` is held in the 
`AWAIT_VALIDATION` fetch state.

This effectively means that fetch requests are not issued and consumption 
stops.
If an unclean leader election happens during this time, a 
`LogTruncationException` is thrown from the future listener in 
`validateOffsetsAsync` (probably in order to trigger the logic defined by the 
`auto.offset.reset` parameter).

The main problem is that this exception (thrown from the future's listener) is 
effectively swallowed by 
`org.apache.kafka.clients.consumer.internals.AsyncClient#sendAsyncRequest` 
in this part of the code:


} catch (RuntimeException e) {
  if (!future.isDone()) {
    future.raise(e);
  }
}
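
A minimal self-contained sketch (hypothetical names, not Kafka's actual classes) of why the catch block above drops the error: the listener only runs after the future is marked done, so an exception it throws arrives when isDone() is already true and is never re-raised to the caller.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SwallowedListenerException {

    static class SimpleFuture<T> {
        private final List<Consumer<T>> listeners = new ArrayList<>();
        private boolean done;

        void addListener(final Consumer<T> listener) {
            listeners.add(listener);
        }

        boolean isDone() {
            return done;
        }

        void complete(final T value) {
            done = true;                            // marked done before listeners fire
            for (final Consumer<T> listener : listeners) {
                listener.accept(value);             // a listener may throw here
            }
        }

        void raise(final RuntimeException e) {
            done = true;
            throw e;
        }
    }

    public static void main(final String[] args) {
        final SimpleFuture<String> future = new SimpleFuture<>();
        // the validation listener detects truncation and throws
        future.addListener(v -> {
            throw new IllegalStateException("log truncation detected");
        });
        try {
            future.complete("offset validation response");
        } catch (final RuntimeException e) {
            if (!future.isDone()) {                 // already done: exception is dropped
                future.raise(e);
            }
        }
        System.out.println("listener exception swallowed; caller never notified");
    }
}
{code}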

The end result: the only way to get out of AWAIT_VALIDATION and continue 
consumption is to finish validation successfully, but validation can never 
finish. The consumer stays alive yet consumes nothing; the only way to resume 
consumption is to terminate the consumer and start a new one.

We discovered this situation with a Kafka Streams application, where the valid 
value of `auto.offset.reset` provided by our code is replaced by a `None` value 
for the purpose of position reset 
(org.apache.kafka.streams.processor.internals.StreamThread#create).
With Kafka Streams it is even worse: the application appears to keep working, 
logging warning messages of the form `Truncation detected for partition ...`, 
but no data is produced for a long time and in the end it is lost, making the 
Streams application unreliable.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10014) Always try to close all channels in Selector#close

2020-05-18 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-10014:
--

 Summary: Always try to close all channels in Selector#close
 Key: KAFKA-10014
 URL: https://issues.apache.org/jira/browse/KAFKA-10014
 Project: Kafka
  Issue Type: Bug
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


{code:java}
public void close() {
    List<String> connections = new ArrayList<>(channels.keySet());
    try {
        for (String id : connections)
            close(id); // this line
    } finally {
{code}

KafkaChannel holds a lot of releasable resources, so we ought to try to close 
all channels even if closing one of them throws an exception.
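
A hedged sketch of the intended behaviour (illustrative only, not the actual Selector code): keep closing the remaining channels even if an earlier close() throws, remember the first failure, and surface it only after every channel has had a chance to release its resources.

{code:java}
import java.util.List;

public class CloseAllChannels {

    public static void closeAll(final List<? extends AutoCloseable> channels) {
        Exception firstException = null;
        for (final AutoCloseable channel : channels) {
            try {
                channel.close();
            } catch (final Exception e) {
                if (firstException == null) {
                    firstException = e;             // keep going, report later
                }
            }
        }
        if (firstException != null) {
            throw new RuntimeException("failed to close all channels", firstException);
        }
    }
}
{code}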



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8398) NPE when unmapping files after moving log directories using AlterReplicaLogDirs

2020-05-18 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17109987#comment-17109987
 ] 

Chia-Ping Tsai commented on KAFKA-8398:
---

It seems 
https://github.com/apache/kafka/commit/e554dc518eaaa0747899e708160275f95c4e525f 
has resolved this issue.

{code:scala}
  protected def safeForceUnmap(): Unit = {
try forceUnmap()
catch {
  case t: Throwable => error(s"Error unmapping index $file", t)
}
  }
{code}

However, it would be better to avoid the NPE in the first place, even if the 
NPE is swallowed.
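
A hypothetical Java sketch of that suggestion (Kafka's AbstractIndex is Scala, and the names here are illustrative): skip the unmap when the buffer is already gone instead of relying on the catch block to swallow a NullPointerException.

{code:java}
import java.nio.MappedByteBuffer;

public class SafeUnmap {

    private MappedByteBuffer mmap;   // may already be null once the index is closed

    void safeForceUnmap() {
        if (mmap == null) {
            return;                  // nothing mapped, nothing to unmap
        }
        try {
            forceUnmap();
        } catch (final Exception t) {
            System.err.println("Error unmapping index: " + t);
        }
    }

    private void forceUnmap() {
        // platform-specific unmap elided; clear the reference afterwards
        mmap = null;
    }
}
{code}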


> NPE when unmapping files after moving log directories using 
> AlterReplicaLogDirs
> ---
>
> Key: KAFKA-8398
> URL: https://issues.apache.org/jira/browse/KAFKA-8398
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0
>Reporter: Vikas Singh
>Priority: Minor
> Attachments: AlterReplicaLogDirs.txt
>
>
> The NPE occurs after the AlterReplicaLogDirs command completes successfully 
> and when unmapping older regions. The relevant part of log is in attached log 
> file. Here is the stacktrace (which is repeated for both index files):
>  
> {code:java}
> [2019-05-20 14:08:13,999] ERROR Error unmapping index 
> /tmp/kafka-logs/test-0.567a0d8ff88b45ab95794020d0b2e66f-delete/.index
>  (kafka.log.OffsetIndex)
> java.lang.NullPointerException
> at 
> org.apache.kafka.common.utils.MappedByteBuffers.unmap(MappedByteBuffers.java:73)
> at kafka.log.AbstractIndex.forceUnmap(AbstractIndex.scala:318)
> at kafka.log.AbstractIndex.safeForceUnmap(AbstractIndex.scala:308)
> at kafka.log.AbstractIndex.$anonfun$closeHandler$1(AbstractIndex.scala:257)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
> at kafka.log.AbstractIndex.closeHandler(AbstractIndex.scala:257)
> at kafka.log.AbstractIndex.deleteIfExists(AbstractIndex.scala:226)
> at kafka.log.LogSegment.$anonfun$deleteIfExists$6(LogSegment.scala:597)
> at kafka.log.LogSegment.delete$1(LogSegment.scala:585)
> at kafka.log.LogSegment.$anonfun$deleteIfExists$5(LogSegment.scala:597)
> at kafka.utils.CoreUtils$.$anonfun$tryAll$1(CoreUtils.scala:115)
> at kafka.utils.CoreUtils$.$anonfun$tryAll$1$adapted(CoreUtils.scala:114)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at kafka.utils.CoreUtils$.tryAll(CoreUtils.scala:114)
> at kafka.log.LogSegment.deleteIfExists(LogSegment.scala:599)
> at kafka.log.Log.$anonfun$delete$3(Log.scala:1762)
> at kafka.log.Log.$anonfun$delete$3$adapted(Log.scala:1762)
> at scala.collection.Iterator.foreach(Iterator.scala:941)
> at scala.collection.Iterator.foreach$(Iterator.scala:941)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> at kafka.log.Log.$anonfun$delete$2(Log.scala:1762)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at kafka.log.Log.maybeHandleIOException(Log.scala:2013)
> at kafka.log.Log.delete(Log.scala:1759)
> at kafka.log.LogManager.deleteLogs(LogManager.scala:761)
> at kafka.log.LogManager.$anonfun$deleteLogs$6(LogManager.scala:775)
> at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
> at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> [{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9893) Configurable TCP connection timeout and improve the initial metadata fetch

2020-05-18 Thread Cheng Tan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cheng Tan updated KAFKA-9893:
-
Description: 
This issue has two parts:
 # Support the transport-layer connection timeout described in KIP-601
 # Optimize the logic for NetworkClient.leastLoadedNode()

Changes:
 # Added a new common client configuration parameter 
socket.connection.setup.timeout.ms to the NetworkClient. A potential 
transport-layer timeout is handled using the same approach as a potential 
request timeout.
 # When no connected channel exists, leastLoadedNode() will now provide a 
disconnected node that has the least number of failed attempts.

  was:
This issue has two parts:
 # Support the TCP connection timeout described in KIP-601
 # Currently, the LeastLoadedNodeProvider might provide an offline/invalid node 
when none of the nodes provided in the --bootstrap-server option are connected. 
The Cluster class shuffles the nodes to balance the initial pressure (I guess) 
and the LeastLoadedNodeProvider will always provide the same node, which is the 
last node after shuffling. Consequently, even though we may provide several 
bootstrap servers, we might hit a timeout if any of the servers shuts down.

The implementation strategy for 1 is described in KIP-601

The solution for 2 is to implement round-robin candidate node selection when 
no node is connected. We can either
 # shuffle the nodes every time we hit the "no node connected" status, or
 # keep track of each node's try count and reset the counts after any of the 
nodes gets connected.
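
For illustration, a hypothetical sketch of how a client could set the proposed socket.connection.setup.timeout.ms parameter once KIP-601 lands. The property name comes from the description above; the broker addresses and group name are made up, and clients that do not yet know the config simply log a warning for it.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConnectionSetupTimeoutExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // proposed in KIP-601: give up on a TCP connection attempt after 10 seconds
        props.put("socket.connection.setup.timeout.ms", "10000");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe and poll as usual
        }
    }
}
{code}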

 


> Configurable TCP connection timeout and improve the initial metadata fetch
> --
>
> Key: KAFKA-9893
> URL: https://issues.apache.org/jira/browse/KAFKA-9893
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Cheng Tan
>Assignee: Cheng Tan
>Priority: Major
>
> This issue has two parts:
>  # Support transportation layer connection timeout described in KIP-601
>  # Optimize the logic for NetworkClient.leastLoadedNode()
> Changes:
>  # Added a new common client configuration parameter 
> socket.connection.setup.timeout.ms to the NetworkClient. Handle potential 
> transportation layer timeout using the same approach as it handling potential 
> request timeout.
>  # When no connected channel exists, leastLoadedNode() will now provide a 
> disconnected node that has the least number of failed attempts. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9893) Configurable TCP connection timeout and improve the initial metadata fetch

2020-05-18 Thread Cheng Tan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cheng Tan updated KAFKA-9893:
-
Description: 
This issue has two parts:
 # Support the transport-layer connection timeout described in KIP-601
 # Optimize the logic for NetworkClient.leastLoadedNode()

Changes:
 # Added a new common client configuration parameter 
socket.connection.setup.timeout.ms to the NetworkClient. A potential 
transport-layer timeout is handled using the same approach as a potential 
request timeout.
 # When no connected channel exists, leastLoadedNode() will now provide a 
disconnected node that has the least number of failed attempts.
 # ClusterConnectionStates will keep the connecting node ids. It also has 
several new public methods to provide per-connection relevant data.

  was:
This issue has two parts:
 # Support the transport-layer connection timeout described in KIP-601
 # Optimize the logic for NetworkClient.leastLoadedNode()

Changes:
 # Added a new common client configuration parameter 
socket.connection.setup.timeout.ms to the NetworkClient. A potential 
transport-layer timeout is handled using the same approach as a potential 
request timeout.
 # When no connected channel exists, leastLoadedNode() will now provide a 
disconnected node that has the least number of failed attempts.


> Configurable TCP connection timeout and improve the initial metadata fetch
> --
>
> Key: KAFKA-9893
> URL: https://issues.apache.org/jira/browse/KAFKA-9893
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Cheng Tan
>Assignee: Cheng Tan
>Priority: Major
>
> This issue has two parts:
>  # Support transportation layer connection timeout described in KIP-601
>  # Optimize the logic for NetworkClient.leastLoadedNode()
> Changes:
>  # Added a new common client configuration parameter 
> socket.connection.setup.timeout.ms to the NetworkClient. Handle potential 
> transportation layer timeout using the same approach as it handling potential 
> request timeout.
>  # When no connected channel exists, leastLoadedNode() will now provide a 
> disconnected node that has the least number of failed attempts. 
>  # ClusterConnectionStates will keep the connecting node ids. Now it also has 
> several new public methods to provide per connection relavant data.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (KAFKA-9845) plugin.path property does not work with config provider

2020-05-18 Thread victor (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

victor updated KAFKA-9845:
--
Comment: was deleted

(was: [~ alexlumpov] mentioning someone by name..)

> plugin.path property does not work with config provider
> ---
>
> Key: KAFKA-9845
> URL: https://issues.apache.org/jira/browse/KAFKA-9845
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.5.0, 2.4.1
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Minor
>
> The config provider mechanism doesn't work if used for the {{plugin.path}} 
> property of a standalone or distributed Connect worker. This is because the 
> {{Plugins}} instance which performs plugin path scanning is created using the 
> raw worker config, pre-transformation (see 
> [ConnectStandalone|https://github.com/apache/kafka/blob/371ad143a6bb973927c89c0788d048a17ebac91a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java#L79]
>  and 
> [ConnectDistributed|https://github.com/apache/kafka/blob/371ad143a6bb973927c89c0788d048a17ebac91a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java#L91]).
> Unfortunately, because config providers are loaded as plugins, there's a 
> circular dependency issue here. The {{Plugins}} instance needs to be created 
> _before_ the {{DistributedConfig}}/{{StandaloneConfig}} is created in order 
> for the config providers to be loaded correctly, and the config providers 
> need to be loaded in order to perform their logic on any properties 
> (including the {{plugin.path}} property).
> There is no clear fix for this issue in the code base, and the only known 
> workaround is to refrain from using config providers for the {{plugin.path}} 
> property.
> A couple improvements could potentially be made to improve the UX when this 
> issue arises:
>  #  Alter the config logging performed by the {{DistributedConfig}} and 
> {{StandaloneConfig}} classes to _always_ log the raw value for the 
> {{plugin.path}} property. Right now, the transformed value is logged even 
> though it isn't used, which is likely to cause confusion.
>  # Issue a {{WARN}}- or even {{ERROR}}-level log message when it's detected 
> that the user is attempting to use config providers for the {{plugin.path}} 
> property, which states that config providers cannot be used for that specific 
> property, instructs them to change the value for the property accordingly, 
> and/or informs them of the actual value that the framework will use for that 
> property when performing plugin path scanning.
> We should _not_ throw an error on startup if this condition is detected, as 
> this could cause previously-functioning, benignly-misconfigured Connect 
> workers to fail to start after an upgrade.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9845) plugin.path property does not work with config provider

2020-05-18 Thread victor (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110023#comment-17110023
 ] 

victor commented on KAFKA-9845:
---

[~ alexlumpov] mentioning someone by name..

> plugin.path property does not work with config provider
> ---
>
> Key: KAFKA-9845
> URL: https://issues.apache.org/jira/browse/KAFKA-9845
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.5.0, 2.4.1
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Minor
>
> The config provider mechanism doesn't work if used for the {{plugin.path}} 
> property of a standalone or distributed Connect worker. This is because the 
> {{Plugins}} instance which performs plugin path scanning is created using the 
> raw worker config, pre-transformation (see 
> [ConnectStandalone|https://github.com/apache/kafka/blob/371ad143a6bb973927c89c0788d048a17ebac91a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java#L79]
>  and 
> [ConnectDistributed|https://github.com/apache/kafka/blob/371ad143a6bb973927c89c0788d048a17ebac91a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java#L91]).
> Unfortunately, because config providers are loaded as plugins, there's a 
> circular dependency issue here. The {{Plugins}} instance needs to be created 
> _before_ the {{DistributedConfig}}/{{StandaloneConfig}} is created in order 
> for the config providers to be loaded correctly, and the config providers 
> need to be loaded in order to perform their logic on any properties 
> (including the {{plugin.path}} property).
> There is no clear fix for this issue in the code base, and the only known 
> workaround is to refrain from using config providers for the {{plugin.path}} 
> property.
> A couple improvements could potentially be made to improve the UX when this 
> issue arises:
>  #  Alter the config logging performed by the {{DistributedConfig}} and 
> {{StandaloneConfig}} classes to _always_ log the raw value for the 
> {{plugin.path}} property. Right now, the transformed value is logged even 
> though it isn't used, which is likely to cause confusion.
>  # Issue a {{WARN}}- or even {{ERROR}}-level log message when it's detected 
> that the user is attempting to use config providers for the {{plugin.path}} 
> property, which states that config providers cannot be used for that specific 
> property, instructs them to change the value for the property accordingly, 
> and/or informs them of the actual value that the framework will use for that 
> property when performing plugin path scanning.
> We should _not_ throw an error on startup if this condition is detected, as 
> this could cause previously-functioning, benignly-misconfigured Connect 
> workers to fail to start after an upgrade.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 opened a new pull request #8685: KAFKA-10014 Always try to close all channels in Selector#close

2020-05-18 Thread GitBox


chia7712 opened a new pull request #8685:
URL: https://github.com/apache/kafka/pull/8685


   https://issues.apache.org/jira/browse/KAFKA-10014
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9981) Running a dedicated mm2 cluster with more than one nodes,When the configuration is updated the task is not aware and will lose the update operation.

2020-05-18 Thread victor (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110047#comment-17110047
 ] 

victor commented on KAFKA-9981:
---

[~ChrisEgerton] [~ryannedolan] hi.
In the case of dedicated MM2 clusters, if the configBackingStore task is hosted 
on a follower node, can the follower node write directly into the config topic?

> Running a dedicated mm2 cluster with more than one nodes,When the 
> configuration is updated the task is not aware and will lose the update 
> operation.
> 
>
> Key: KAFKA-9981
> URL: https://issues.apache.org/jira/browse/KAFKA-9981
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
>Reporter: victor
>Priority: Major
>
> DistributedHerder.reconfigureConnector induction config update as follows:
> {code:java}
> if (changed) {
> List<Map<String, String>> rawTaskProps = reverseTransform(connName, 
> configState, taskProps);
> if (isLeader()) {
> configBackingStore.putTaskConfigs(connName, rawTaskProps);
> cb.onCompletion(null, null);
> } else {
> // We cannot forward the request on the same thread because this 
> reconfiguration can happen as a result of connector
> // addition or removal. If we blocked waiting for the response from 
> leader, we may be kicked out of the worker group.
> forwardRequestExecutor.submit(new Runnable() {
> @Override
> public void run() {
> try {
> String leaderUrl = leaderUrl();
> if (leaderUrl == null || leaderUrl.trim().isEmpty()) {
> cb.onCompletion(new ConnectException("Request to 
> leader to " +
> "reconfigure connector tasks failed " +
> "because the URL of the leader's REST 
> interface is empty!"), null);
> return;
> }
> String reconfigUrl = RestServer.urlJoin(leaderUrl, 
> "/connectors/" + connName + "/tasks");
> log.trace("Forwarding task configurations for connector 
> {} to leader", connName);
> RestClient.httpRequest(reconfigUrl, "POST", null, 
> rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm);
> cb.onCompletion(null, null);
> } catch (ConnectException e) {
> log.error("Request to leader to reconfigure connector 
> tasks failed", e);
> cb.onCompletion(e, null);
> }
> }
> });
> }
> }
> {code}
> The KafkaConfigBackingStore task checks for configuration updates, such as a 
> topic whitelist update. If the KafkaConfigBackingStore task is not running on 
> the leader node, an HTTP request will be sent to notify the leader of the 
> configuration update. However, a dedicated mm2 cluster does not have the HTTP 
> server turned on, so the request will fail to be sent, causing the update 
> operation to be lost.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] qq619618919 commented on pull request #8656: KAFKA-9981; dedicated mm2 cluster lose the update operation.

2020-05-18 Thread GitBox


qq619618919 commented on pull request #8656:
URL: https://github.com/apache/kafka/pull/8656#issuecomment-630040397


   > I'm still not sure what is going on here. What does it mean to update the 
config if !isLeader()? Only the leader actually has a config store.
   
   In the following link, Chris Egerton's response says exactly what I wanted 
to say.
   
https://issues.apache.org/jira/browse/KAFKA-9981?focusedCommentId=17108867&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17108867
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] brary commented on a change in pull request #8395: Added doc for KIP-535 and updated it for KIP-562

2020-05-18 Thread GitBox


brary commented on a change in pull request #8395:
URL: https://github.com/apache/kafka/pull/8395#discussion_r426485550



##
File path: docs/upgrade.html
##
@@ -39,7 +39,8 @@ Notable changes in 2
 https://github.com/apache/kafka/tree/2.5/examples";>examples folder. 
Check out
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics";>KIP-447
 for the full details.
-Deprecated KafkaStreams.store(String, QueryableStoreType) 
and replaced it with KafkaStreams.store(StoreQueryParameters).
+Provided support to query stale stores(for high availability) and the 
stores belonging to a specific partition by deprecating 
KafkaStreams.store(String, QueryableStoreType) and replacing it 
with KafkaStreams.store(StoreQueryParameters).

Review comment:
   Thanks @abbccdda . Addressed both the comments in the latest commit.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-05-18 Thread GitBox


chia7712 commented on pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#issuecomment-630066071


   The ```EmbeddedKafkaCluster``` created by the Streams unit tests uses 
```MockTime``` to create ```KafkaServer``` 
(https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java#L81), 
so the check of the delayed item gets impacted by ```MockTime```.
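
   A small sketch of why that matters, assuming `org.apache.kafka.common.utils.MockTime` from the clients test utilities: the mock clock only advances when `sleep()` is called on it, so a wall-clock-based delay check behaves differently under it.

```java
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;

public class MockTimeDemo {
    public static void main(String[] args) throws InterruptedException {
        Time time = new MockTime();
        long deadline = time.milliseconds() + 1_000;          // delayed item due in 1s of mock time

        Thread.sleep(50);                                     // real time passes
        System.out.println(time.milliseconds() >= deadline);  // false: mock clock did not move

        time.sleep(1_000);                                    // only an explicit sleep advances MockTime
        System.out.println(time.milliseconds() >= deadline);  // true
    }
}
```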



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-05-18 Thread GitBox


avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r426498753



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##
@@ -93,6 +96,28 @@ long partitionTimestamp(final TopicPartition partition) {
 return queue.partitionTime();
 }
 
+// creates queues for new partitions, removes old queues, saves cached 
records for previously assigned partitions
+void updatePartitions(final Set<TopicPartition> newInputPartitions, final 
Function<TopicPartition, RecordQueue> recordQueueCreator) {
+final Set<TopicPartition> removedPartitions = new HashSet<>();
+final Iterator<Map.Entry<TopicPartition, RecordQueue>> queuesIterator 
= partitionQueues.entrySet().iterator();
+while (queuesIterator.hasNext()) {
+final Map.Entry<TopicPartition, RecordQueue> queueEntry = 
queuesIterator.next();
+final TopicPartition topicPartition = queueEntry.getKey();
+if (!newInputPartitions.contains(topicPartition)) {
+// if partition is removed should delete it's queue

Review comment:
   fixed





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-05-18 Thread GitBox


avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r426498877



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##
@@ -93,6 +96,28 @@ long partitionTimestamp(final TopicPartition partition) {
 return queue.partitionTime();
 }
 
+// creates queues for new partitions, removes old queues, saves cached 
records for previously assigned partitions
+void updatePartitions(final Set<TopicPartition> newInputPartitions, final 
Function<TopicPartition, RecordQueue> recordQueueCreator) {
+final Set<TopicPartition> removedPartitions = new HashSet<>();
+final Iterator<Map.Entry<TopicPartition, RecordQueue>> queuesIterator 
= partitionQueues.entrySet().iterator();
+while (queuesIterator.hasNext()) {
+final Map.Entry<TopicPartition, RecordQueue> queueEntry = 
queuesIterator.next();
+final TopicPartition topicPartition = queueEntry.getKey();
+if (!newInputPartitions.contains(topicPartition)) {
+// if partition is removed should delete it's queue
+totalBuffered -= queueEntry.getValue().size();
+queuesIterator.remove();
+removedPartitions.add(topicPartition);
+}
+newInputPartitions.remove(topicPartition);
+}
+for (final TopicPartition newInputPartition : newInputPartitions) {
+partitionQueues.put(newInputPartition, 
recordQueueCreator.apply(newInputPartition));
+}
+nonEmptyQueuesByTime.removeIf(q -> 
removedPartitions.contains(q.partition()));
+allBuffered = allBuffered && newInputPartitions.isEmpty();

Review comment:
   fixed





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-05-18 Thread GitBox


avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r426499189



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##
@@ -170,6 +170,8 @@ public boolean isValidTransition(final State newState) {
  */
 void closeDirty();
 
+void update(final Set<TopicPartition> topicPartitions, final 
ProcessorTopology processorTopology);

Review comment:
   yes, added short description





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-4748) Need a way to shutdown all workers in a Streams application at the same time

2020-05-18 Thread Antony Stubbs (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110133#comment-17110133
 ] 

Antony Stubbs commented on KAFKA-4748:
--

FYI [~mjsax], my intention for Kafka-6943 was for a single KS instance, not an 
entire cluster. This (Kafka-4748) would be for the entire cluster (all KS 
instances), from what I understand.

Extending Kafka-6943 to have an option to shutdown the entire cluster upon a 
thread crash in a single instance or all threads crashing could be interesting, 
but would seem an order of magnitude more complex than triggering the shutdown 
of the instance the thread was living on.

> Need a way to shutdown all workers in a Streams application at the same time
> 
>
> Key: KAFKA-4748
> URL: https://issues.apache.org/jira/browse/KAFKA-4748
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Elias Levy
>Priority: Major
>
> If you have a fleet of Stream workers for an application and attempt to shut 
> them down simultaneously (e.g. via SIGTERM and 
> Runtime.getRuntime().addShutdownHook() and streams.close())), a large number 
> of the workers fail to shutdown.
> The problem appears to be a race condition between the shutdown signal and 
> the consumer rebalancing that is triggered by some of the workers exiting 
> before others.  Workers that receive the signal later apparently fail to 
> exit because they are caught in the rebalance.
> Terminating workers in a rolling fashion is not advisable in some situations. 
>  The rolling shutdown will result in many unnecessary rebalances and may 
> fail, as the application may have large amount of local state that a smaller 
> number of nodes may not be able to store.
> It would appear that there is a need for a protocol change to allow the 
> coordinator to signal a consumer group to shutdown without leading to 
> rebalancing.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-4748) Need a way to shutdown all workers in a Streams application at the same time

2020-05-18 Thread Antony Stubbs (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110133#comment-17110133
 ] 

Antony Stubbs edited comment on KAFKA-4748 at 5/18/20, 10:47 AM:
-

FYI [~mjsax], my intention for KAFKA-6943 was for a single KS instance, not an 
entire cluster. This (KAFKA-4748) would be for the entire cluster (all KS 
instances), from what I understand.

Extending KAFKA-6943 to have an option to shutdown the entire cluster upon a 
thread crash in a single instance or all threads crashing could be interesting, 
but would seem an order of magnitude more complex than triggering the shutdown 
of the instance the thread was living on.


was (Author: astubbs):
FYI [~mjsax], my intention for Kafka-6943 was for a single KS instance, not an 
entire cluster. This (Kafka-4748) would be for the entire cluster (all KS 
instances), from what I understand.

Extending Kafka-6943 to have an option to shutdown the entire cluster upon a 
thread crash in a single instance or all threads crashing could be interesting, 
but would seem an order of magnitude more complex than triggering the shutdown 
of the instance the thread was living on.

> Need a way to shutdown all workers in a Streams application at the same time
> 
>
> Key: KAFKA-4748
> URL: https://issues.apache.org/jira/browse/KAFKA-4748
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Elias Levy
>Priority: Major
>
> If you have a fleet of Stream workers for an application and attempt to shut 
> them down simultaneously (e.g. via SIGTERM and 
> Runtime.getRuntime().addShutdownHook() and streams.close())), a large number 
> of the workers fail to shutdown.
> The problem appears to be a race condition between the shutdown signal and 
> the consumer rebalancing that is triggered by some of the workers exiting 
> before others.  Workers that receive the signal later apparently fail to 
> exit because they are caught in the rebalance.
> Terminating workers in a rolling fashion is not advisable in some situations. 
>  The rolling shutdown will result in many unnecessary rebalances and may 
> fail, as the application may have large amount of local state that a smaller 
> number of nodes may not be able to store.
> It would appear that there is a need for a protocol change to allow the 
> coordinator to signal a consumer group to shutdown without leading to 
> rebalancing.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-6182) Automatic co-partitioning of topics via automatic intermediate topic with matching partitions

2020-05-18 Thread Antony Stubbs (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110137#comment-17110137
 ] 

Antony Stubbs commented on KAFKA-6182:
--

Ah yes that makes sense. Assuming the exception thrown when the partition 
mismatch is found at runtime would cause the entire system to fail fast, I 
think yes, this repartition operation solves the issue.

> Automatic co-partitioning of topics via automatic intermediate topic with 
> matching partitions
> -
>
> Key: KAFKA-6182
> URL: https://issues.apache.org/jira/browse/KAFKA-6182
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Antony Stubbs
>Priority: Major
>
> Currently it is up to the user to ensure that two input topics for a join 
> have the same number of partitions, and if they don't, manually create an 
> intermediate topic, and send the stream #through that topic first, and then 
> performing the join.
> It would be great to have Kafka streams detect this and at least give the 
> user the option to create an intermediate topic automatically with the same 
> number of partitions as the topic being joined with.
> See 
> https://docs.confluent.io/current/streams/developer-guide.html#joins-require-co-partitioning-of-the-input-data



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon opened a new pull request #8686: MINOR: Remove redundant TOC and introduction in Running Streams Applications

2020-05-18 Thread GitBox


showuon opened a new pull request #8686:
URL: https://github.com/apache/kafka/pull/8686


   Remove redundant TOC and introduction in Running Streams Applications
   
   before:
   
![image](https://user-images.githubusercontent.com/43372967/82212247-d181ec00-9944-11ea-8aac-3612fc17893d.png)
   
   after:
   
![image](https://user-images.githubusercontent.com/43372967/82212560-510fbb00-9945-11ea-8b0d-7911a2ac13e2.png)
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #8686: MINOR: Remove redundant TOC and introduction in Running Streams Applications

2020-05-18 Thread GitBox


showuon commented on pull request #8686:
URL: https://github.com/apache/kafka/pull/8686#issuecomment-630148688


   @guozhangwang , a small documentation update PR in streams page. Please help 
review. Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #8669: MINOR: consolidate processor context for active/standby

2020-05-18 Thread GitBox


cadonna commented on a change in pull request #8669:
URL: https://github.com/apache/kafka/pull/8669#discussion_r426592363



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##
@@ -16,87 +16,129 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.streams.KeyValue;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.internals.ApiUtils;
-import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
-import org.apache.kafka.streams.state.TimestampedWindowStore;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
-import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.internals.ThreadCache;
-import org.apache.kafka.streams.state.internals.WrappedStateStore;
 
 import java.time.Duration;
 import java.util.List;
 
 import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+import static 
org.apache.kafka.streams.processor.internals.AbstractReadOnlyDecorator.getReadOnlyStore;
+import static 
org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore;
 
 public class ProcessorContextImpl extends AbstractProcessorContext implements 
RecordCollector.Supplier {
+public static final BytesSerializer KEY_SERIALIZER = new BytesSerializer();
+public static final ByteArraySerializer VALUE_SERIALIZER = new 
ByteArraySerializer();
 
-private final StreamTask task;
+// The below are both null for standby tasks
+private final StreamTask streamTask;
 private final RecordCollector collector;
+
 private final ToInternal toInternal = new ToInternal();
 private final static To SEND_TO_ALL = To.all();
 
+final Map<String, String> storeToChangelogTopic = new HashMap<>();
+
 ProcessorContextImpl(final TaskId id,
- final StreamTask task,
+ final StreamTask streamTask,
  final StreamsConfig config,
  final RecordCollector collector,
  final ProcessorStateManager stateMgr,
  final StreamsMetricsImpl metrics,
  final ThreadCache cache) {
 super(id, config, metrics, stateMgr, cache);
-this.task = task;
+this.streamTask = streamTask;
 this.collector = collector;
+
+if (streamTask == null && taskType() == TaskType.ACTIVE) {
+throw new IllegalStateException("Tried to create context for 
active task but the streamtask was null");
+}
+}
+
+ProcessorContextImpl(final TaskId id,
+ final StreamsConfig config,
+ final ProcessorStateManager stateMgr,
+ final StreamsMetricsImpl metrics) {
+this(
+id,
+null,
+config,
+null,
+stateMgr,
+metrics,
+new ThreadCache(
+new LogContext(String.format("stream-thread [%s] ", 
Thread.currentThread().getName())),
+0,
+metrics
+)
+);
 }
 
-public ProcessorStateManager getStateMgr() {
+public ProcessorStateManager stateManager() {
 return (ProcessorStateManager) stateManager;
 }
 
+@Override
+public void register(final StateStore store,
+ final StateRestoreCallback stateRestoreCallback) {
+storeToChangelogTopic.put(store.name(), 
ProcessorStateManager.storeChangelogTopic(applicationId(), store.name()));
+super.register(store, stateRestoreCallback);
+}
+
 @Override
 public RecordCollector recordCollector() {
 retu

[GitHub] [kafka] cadonna commented on pull request #8157: KAFKA-9088: Consolidate InternalMockProcessorContext and MockInternalProcessorContext

2020-05-18 Thread GitBox


cadonna commented on pull request #8157:
URL: https://github.com/apache/kafka/pull/8157#issuecomment-630245364


   @pierDipi FYI: We need to make some changes to the 
`InternalProcessorContext` that are needed for two high priority tickets. The 
changes are done in PR #8669. The changes affect also this PR.  



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet commented on pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-05-18 Thread GitBox


lbradstreet commented on pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#issuecomment-630249725


   > `EmbeddedKafkaCluster` created by Stream UT uses `MockTime` to create 
`KafkaServer`(
   > 
https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java#L81)
 so the check of delayed item gets impacted by `MockTime`.
   
   @chia7712 thanks, nice catch. That does make things trickier.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8669: MINOR: consolidate processor context for active/standby

2020-05-18 Thread GitBox


ableegoldman commented on a change in pull request #8669:
URL: https://github.com/apache/kafka/pull/8669#discussion_r426700737



##
File path: 
streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
##
@@ -52,6 +54,8 @@
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.KEY_SERIALIZER;
+import static 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.VALUE_SERIALIZER;

Review comment:
   Ack





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10015) React to Unexpected Errors on Stream Threads

2020-05-18 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-10015:
-

 Summary: React to Unexpected Errors on Stream Threads
 Key: KAFKA-10015
 URL: https://issues.apache.org/jira/browse/KAFKA-10015
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Bruno Cadonna


Currently, if an unexpected error occurs on a stream thread, the stream thread 
dies, a rebalance is triggered, and the Streams client continues to run with 
fewer stream threads. 
 
Some errors trigger a cascade of stream thread deaths, i.e., after the 
rebalance that resulted from the death of the first thread the next thread 
dies, then a rebalance is triggered, the next thread dies, and so forth until 
all stream threads are dead and the instance shuts down. Such a chain of 
rebalances could be avoided if an error could be recognized as the cause of 
cascading stream thread deaths and, as a consequence, the Streams client could 
be shut down after the first stream thread death.

On the other hand, some unexpected errors are transient and the stream thread 
could safely be restarted without causing further errors and without the need 
to restart the Streams client.

The goal of this ticket is to classify errors and to automatically react to the 
errors in a way that avoids cascading deaths and recovers stream threads if 
possible.
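
For context, a hedged sketch of what applications can do today with the existing KafkaStreams#setUncaughtExceptionHandler: observe stream thread death and close the whole client instead of silently running with fewer threads. The automatic, classified reaction is what this ticket proposes; the handler body below is illustrative only.

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.KafkaStreams;

public class ShutdownOnThreadDeath {

    static void installHandler(final KafkaStreams streams) {
        streams.setUncaughtExceptionHandler((thread, throwable) -> {
            System.err.println("Stream thread " + thread.getName() + " died: " + throwable);
            // close from a separate thread so the dying stream thread is not blocked
            new Thread(() -> streams.close(Duration.ofSeconds(30))).start();
        });
    }
}
{code}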



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10015) React Smartly to Unexpected Errors on Stream Threads

2020-05-18 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-10015:
--
Summary: React Smartly to Unexpected Errors on Stream Threads  (was: React 
to Unexpected Errors on Stream Threads)

> React Smartly to Unexpected Errors on Stream Threads
> 
>
> Key: KAFKA-10015
> URL: https://issues.apache.org/jira/browse/KAFKA-10015
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Priority: Major
>
> Currently, if an unexpected error occurs on a stream thread, the stream 
> thread dies, a rebalance is triggered, and the Streams' client continues to 
> run with less stream threads. 
>  
> Some errors trigger a cascading of stream thread death, i.e., after the 
> rebalance that resulted from the death of the first thread the next thread 
> dies, then a rebalance is triggered, the next thread dies, and so forth until 
> all stream threads are dead and the instance shuts down. Such a chain of 
> rebalances could be avoided if an error could be recognized as the cause of 
> cascading stream deaths and as a consequence the Streams' client could be 
> shut down after the first stream thread death.
> On the other hand, some unexpected errors are transient and the stream thread 
> could safely be restarted without causing further errors and without the need 
> to restart the Streams' client.
> The goal of this ticket is to classify errors and to automatically react to 
> the errors in a way to avoid cascading deaths and to recover stream threads 
> if possible.   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10015) React Smartly to Unexpected Errors on Stream Threads

2020-05-18 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-10015:
-

Assignee: Bruno Cadonna

> React Smartly to Unexpected Errors on Stream Threads
> 
>
> Key: KAFKA-10015
> URL: https://issues.apache.org/jira/browse/KAFKA-10015
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: needs-kip
>
> Currently, if an unexpected error occurs on a stream thread, the stream 
> thread dies, a rebalance is triggered, and the Streams' client continues to 
> run with less stream threads. 
>  
> Some errors trigger a cascading of stream thread death, i.e., after the 
> rebalance that resulted from the death of the first thread the next thread 
> dies, then a rebalance is triggered, the next thread dies, and so forth until 
> all stream threads are dead and the instance shuts down. Such a chain of 
> rebalances could be avoided if an error could be recognized as the cause of 
> cascading stream deaths and as a consequence the Streams' client could be 
> shut down after the first stream thread death.
> On the other hand, some unexpected errors are transient and the stream thread 
> could safely be restarted without causing further errors and without the need 
> to restart the Streams' client.
> The goal of this ticket is to classify errors and to automatically react to 
> the errors in a way to avoid cascading deaths and to recover stream threads 
> if possible.   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10015) React Smartly to Unexpected Errors on Stream Threads

2020-05-18 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-10015:
--
Labels: needs-kip  (was: )

> React Smartly to Unexpected Errors on Stream Threads
> 
>
> Key: KAFKA-10015
> URL: https://issues.apache.org/jira/browse/KAFKA-10015
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Priority: Major
>  Labels: needs-kip
>
> Currently, if an unexpected error occurs on a stream thread, the stream 
> thread dies, a rebalance is triggered, and the Streams' client continues to 
> run with less stream threads. 
>  
> Some errors trigger a cascading of stream thread death, i.e., after the 
> rebalance that resulted from the death of the first thread the next thread 
> dies, then a rebalance is triggered, the next thread dies, and so forth until 
> all stream threads are dead and the instance shuts down. Such a chain of 
> rebalances could be avoided if an error could be recognized as the cause of 
> cascading stream deaths and as a consequence the Streams' client could be 
> shut down after the first stream thread death.
> On the other hand, some unexpected errors are transient and the stream thread 
> could safely be restarted without causing further errors and without the need 
> to restart the Streams' client.
> The goal of this ticket is to classify errors and to automatically react to 
> the errors in a way to avoid cascading deaths and to recover stream threads 
> if possible.   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10015) React Smartly to Unexpected Errors on Stream Threads

2020-05-18 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-10015:
--
Affects Version/s: 2.5.0

> React Smartly to Unexpected Errors on Stream Threads
> 
>
> Key: KAFKA-10015
> URL: https://issues.apache.org/jira/browse/KAFKA-10015
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: needs-kip
>
> Currently, if an unexpected error occurs on a stream thread, the stream 
> thread dies, a rebalance is triggered, and the Streams' client continues to 
> run with less stream threads. 
>  
> Some errors trigger a cascading of stream thread death, i.e., after the 
> rebalance that resulted from the death of the first thread the next thread 
> dies, then a rebalance is triggered, the next thread dies, and so forth until 
> all stream threads are dead and the instance shuts down. Such a chain of 
> rebalances could be avoided if an error could be recognized as the cause of 
> cascading stream deaths and as a consequence the Streams' client could be 
> shut down after the first stream thread death.
> On the other hand, some unexpected errors are transient and the stream thread 
> could safely be restarted without causing further errors and without the need 
> to restart the Streams' client.
> The goal of this ticket is to classify errors and to automatically react to 
> the errors in a way to avoid cascading deaths and to recover stream threads 
> if possible.   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mumrah commented on a change in pull request #8376: KAFKA-9724 Newer clients not always sending fetch request to older brokers

2020-05-18 Thread GitBox


mumrah commented on a change in pull request #8376:
URL: https://github.com/apache/kafka/pull/8376#discussion_r426722927



##
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##
@@ -378,26 +382,6 @@ public void testRejectOldMetadata() {
 }
 }
 
-@Test
-public void testMaybeRequestUpdate() {

Review comment:
   I ended up writing a new test `testUpdateLastEpoch`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8682: KAFKA-10011: Remove task id from lockedTaskDirectories during handleLostAll

2020-05-18 Thread GitBox


vvcephei commented on pull request #8682:
URL: https://github.com/apache/kafka/pull/8682#issuecomment-630272579


   Test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-05-18 Thread GitBox


guozhangwang commented on pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#issuecomment-630279061


   test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #8686: MINOR: Remove redundant TOC and introduction in Running Streams Applications

2020-05-18 Thread GitBox


guozhangwang merged pull request #8686:
URL: https://github.com/apache/kafka/pull/8686


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-05-18 Thread GitBox


guozhangwang commented on pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#issuecomment-630297298


   test this please
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8669: MINOR: consolidate processor context for active/standby

2020-05-18 Thread GitBox


guozhangwang commented on a change in pull request #8669:
URL: https://github.com/apache/kafka/pull/8669#discussion_r426754892



##
File path: 
streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
##
@@ -349,6 +353,27 @@ public Headers headers() {
 return recordContext.headers();
 }
 
+@Override
+public TaskType taskType() {
+return TaskType.ACTIVE;
+}
+
+@Override
+public void logChange(final String storeName,
+  final Bytes key,
+  final byte[] value,
+  final long timestamp) {
+recordCollector().send(

Review comment:
   No I do not, just wanting to make sure we do not have any major 
conflicts when rebasing the other.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8669: MINOR: consolidate processor context for active/standby

2020-05-18 Thread GitBox


guozhangwang commented on pull request #8669:
URL: https://github.com/apache/kafka/pull/8669#issuecomment-630299683


   test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8669: MINOR: consolidate processor context for active/standby

2020-05-18 Thread GitBox


guozhangwang commented on pull request #8669:
URL: https://github.com/apache/kafka/pull/8669#issuecomment-630299873


   LGTM. Will merge after green builds.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2020-05-18 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110447#comment-17110447
 ] 

Guozhang Wang commented on KAFKA-6520:
--

Just to throw some more ideas here: the embedded clients now all use async 
network IO, so one would never get a ClosedChannelException; instead, they will 
eventually get a TimeoutException when the broker is actually offline. [~mjsax] 
is currently working on KIP-572 to make Streams more resilient to such 
connectivity issues (an unavailable broker is exposed the same way as general 
network disconnection), and if we have N tasks, we would still continue as long 
as only a subset of them cannot progress. On the other hand, a Streams client 
may talk to multiple brokers on behalf of different tasks, and as long as one 
of the tasks can still progress -- meaning its corresponding required brokers 
are still reachable -- we would not need to mark the client as disconnected.

Following this train of thought, I feel we would only transition to the 
DISCONNECTED state if none of the tasks are progressing, indicating that none 
of the required brokers are available at the moment. Does that make sense? If 
so, the scope can be much simplified, and maybe we can also just piggyback the 
proposal as part of KIP-572 so that we do not need a separate KIP. Of course, 
implementation-wise Vince and Matthias can still proceed independently.
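To make the proposed condition concrete, here is a minimal sketch of the 
transition rule; every name below is an illustrative placeholder, not an 
existing Kafka Streams API:

{code:java}
import java.util.List;

// All names here are illustrative placeholders, not existing Kafka Streams APIs.
public class DisconnectedCheckSketch {

    interface TaskStatus {
        // true if the task's required brokers are reachable and it can make progress
        boolean isProgressing();
    }

    enum ClientState { RUNNING, DISCONNECTED }

    // Transition to DISCONNECTED only when no task at all is progressing, i.e.
    // none of the brokers required by any task is currently reachable.
    static ClientState nextState(List<TaskStatus> tasks) {
        boolean anyProgressing = tasks.stream().anyMatch(TaskStatus::isProgressing);
        return anyProgressing ? ClientState.RUNNING : ClientState.DISCONNECTED;
    }
}
{code}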

> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> 
>
> Key: KAFKA-6520
> URL: https://issues.apache.org/jira/browse/KAFKA-6520
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Kohout
>Priority: Major
>  Labels: newbie, user-experience
>
> KIP WIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams]
> When you execute the following scenario the application is always in RUNNING 
> state
>   
>  1)start kafka
>  2)start app, app connects to kafka and starts processing
>  3)kill kafka(stop docker container)
>  4)the application doesn't give any indication that it's no longer 
> connected(Stream State is still RUNNING, and the uncaught exception handler 
> isn't invoked)
>   
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
>  See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion from the google user forum.  This is a link to a related 
> issue.
> -
> Update: there are some discussions on the PR itself which leads me to think 
> that a more general solution should be at the ClusterConnectionStates rather 
> than at the Streams or even Consumer level. One proposal would be:
>  * Add a new metric named `failedConnection` in SelectorMetrics which is 
> recorded at `connect()` and `pollSelectionKeys()` functions, upon capture the 
> IOException / RuntimeException which indicates the connection disconnected.
>  * And then users of Consumer / Streams can monitor on this metric, which 
> normally will only have close to zero values as we have transient 
> disconnects, if it is spiking it means the brokers are consistently being 
> unavailable indicting the state.
> [~Yohan123] WDYT?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on pull request #8669: MINOR: consolidate processor context for active/standby

2020-05-18 Thread GitBox


guozhangwang commented on pull request #8669:
URL: https://github.com/apache/kafka/pull/8669#issuecomment-630320804


   test this please
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-05-18 Thread GitBox


guozhangwang commented on pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#issuecomment-630321714


   `:streams:spotbugsMain` failed, cc @avalsa 
   
   You can run the command locally to find out which spotbugs rule is violated.
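   
   For reference, assuming a standard Kafka checkout with the Gradle wrapper, the 
failing check can be reproduced locally with:
   
   ```
   ./gradlew :streams:spotbugsMain
   ```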



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10008) Symbol not found when running Kafka Streams with RocksDB dependency on MacOS 10.13.6

2020-05-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-10008:
---

Assignee: Matthias J. Sax

> Symbol not found when running Kafka Streams with RocksDB dependency on MacOS 
> 10.13.6
> 
>
> Key: KAFKA-10008
> URL: https://issues.apache.org/jira/browse/KAFKA-10008
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
> Environment: MacOS 10.13.6
>Reporter: Victoria Xia
>Assignee: Matthias J. Sax
>Priority: Major
>
> In bumping the RocksDB dependency version from 5.18.3 to 5.18.4 on trunk, 
> Kafka Streams apps that require initializing RocksDB state stores fail on 
> MacOS 10.13.6 with
> {code:java}
> dyld: lazy symbol binding failed: Symbol not found: chkstk_darwin
>   Referenced from: 
> /private/var/folders/y4/v3q4tgb559sb0x6kwpll19bmgn/T/librocksdbjni4028367213086899694.jnilib
>  (which was built for Mac OS X 10.15)
>   Expected in: /usr/lib/libSystem.B.dylib
> {code}
> as a result of [https://github.com/facebook/rocksdb/issues/6852]
> 2.5.0 is unaffected.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax opened a new pull request #8687: MINOR: updated MacOS compatibility statement for RocksDB

2020-05-18 Thread GitBox


mjsax opened a new pull request #8687:
URL: https://github.com/apache/kafka/pull/8687


   With https://issues.apache.org/jira/browse/KAFKA-9225 Kafka Streams 2.6.0 
requires MacOS 10.15.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8687: MINOR: updated MacOS compatibility statement for RocksDB

2020-05-18 Thread GitBox


mjsax commented on pull request #8687:
URL: https://github.com/apache/kafka/pull/8687#issuecomment-630327661


   Call for review @ableegoldman @vcrfxia @guozhangwang 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10008) Symbol not found when running Kafka Streams with RocksDB dependency on MacOS 10.13.6

2020-05-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110478#comment-17110478
 ] 

Matthias J. Sax commented on KAFKA-10008:
-

Opened a PR to update the docs: https://github.com/apache/kafka/pull/8687

> Symbol not found when running Kafka Streams with RocksDB dependency on MacOS 
> 10.13.6
> 
>
> Key: KAFKA-10008
> URL: https://issues.apache.org/jira/browse/KAFKA-10008
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
> Environment: MacOS 10.13.6
>Reporter: Victoria Xia
>Assignee: Matthias J. Sax
>Priority: Major
>
> In bumping the RocksDB dependency version from 5.18.3 to 5.18.4 on trunk, 
> Kafka Streams apps that require initializing RocksDB state stores fail on 
> MacOS 10.13.6 with
> {code:java}
> dyld: lazy symbol binding failed: Symbol not found: chkstk_darwin
>   Referenced from: 
> /private/var/folders/y4/v3q4tgb559sb0x6kwpll19bmgn/T/librocksdbjni4028367213086899694.jnilib
>  (which was built for Mac OS X 10.15)
>   Expected in: /usr/lib/libSystem.B.dylib
> {code}
> as a result of [https://github.com/facebook/rocksdb/issues/6852]
> 2.5.0 is unaffected.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10012) Reducing memory overhead associated with strings in MetricName

2020-05-18 Thread Navina Ramesh (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navina Ramesh reassigned KAFKA-10012:
-

Assignee: Navina Ramesh

> Reducing memory overhead associated with strings in MetricName
> --
>
> Key: KAFKA-10012
> URL: https://issues.apache.org/jira/browse/KAFKA-10012
> Project: Kafka
>  Issue Type: Improvement
>  Components: network
>Reporter: Navina Ramesh
>Assignee: Navina Ramesh
>Priority: Major
>
> {{SelectorMetrics}} has per-connection metrics, which means the number of 
> {{MetricName}} objects and the strings associated with it (such as group name 
> and description) grows with the number of connections in the client. This 
> overhead of duplicate string objects is amplified when there are multiple 
> instances of kafka clients within the same JVM. 
> This patch addresses some of the memory overhead by making {{metricGrpName}} a 
> constant and introducing a new constant {{perConnectionMetricGrpName}}. 
> Additionally, the strings for metric name and description in {{createMeter}} 
> have been interned since there are about 8 per-client and 4 per-connection 
> {{Meter}} instances.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7870) Error sending fetch request (sessionId=1578860481, epoch=INITIAL) to node 2: java.io.IOException: Connection to 2 was disconnected before the response was read.

2020-05-18 Thread Nicolas Martinez Varsi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110489#comment-17110489
 ] 

Nicolas Martinez Varsi commented on KAFKA-7870:
---

Any news on this? We upgraded from 2.0.0 to 2.1.0 and this error started to 
appear.

Besides restarting/stopping the problematic broker, are there any workarounds?

> Error sending fetch request (sessionId=1578860481, epoch=INITIAL) to node 2: 
> java.io.IOException: Connection to 2 was disconnected before the response was 
> read.
> 
>
> Key: KAFKA-7870
> URL: https://issues.apache.org/jira/browse/KAFKA-7870
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Chakhsu Lau
>Priority: Blocker
>
> We built a Kafka cluster with 5 brokers, but one of the brokers suddenly 
> stopped running, and it happened twice on the same broker. Here is the log; 
> is this a bug in Kafka?
> {code:java}
> [2019-01-25 12:57:14,686] INFO [ReplicaFetcher replicaId=3, leaderId=2, 
> fetcherId=0] Error sending fetch request (sessionId=1578860481, 
> epoch=INITIAL) to node 2: java.io.IOException: Connection to 2 was 
> disconnected before the response was read. 
> (org.apache.kafka.clients.FetchSessionHandler)
> [2019-01-25 12:57:14,687] WARN [ReplicaFetcher replicaId=3, leaderId=2, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=3, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={api-result-bi-heatmap-8=(offset=0, logStartOffset=0, 
> maxBytes=1048576, currentLeaderEpoch=Optional[4]), 
> api-result-bi-heatmap-save-12=(offset=0, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[4]), api-result-bi-heatmap-task-2=(offset=2, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[4]), 
> api-result-bi-flow-39=(offset=1883206, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[4]), __consumer_offsets-47=(offset=349437, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[4]), 
> api-result-bi-heatmap-track-6=(offset=1039889, logStartOffset=0, 
> maxBytes=1048576, currentLeaderEpoch=Optional[4]), 
> api-result-bi-heatmap-task-17=(offset=0, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[4]), __consumer_offsets-2=(offset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[4]), 
> api-result-bi-heatmap-aggs-19=(offset=1255056, logStartOffset=0, 
> maxBytes=1048576, currentLeaderEpoch=Optional[4])}, 
> isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1578860481, 
> epoch=INITIAL)) (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 2 was disconnected before the response was 
> read
> at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)
> at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97)
> at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190)
> at 
> kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10008) Symbol not found when running Kafka Streams with RocksDB dependency on MacOS 10.13.6

2020-05-18 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10008.
--
Resolution: Won't Fix

Following the other comments, we've decided just to document it.

> Symbol not found when running Kafka Streams with RocksDB dependency on MacOS 
> 10.13.6
> 
>
> Key: KAFKA-10008
> URL: https://issues.apache.org/jira/browse/KAFKA-10008
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
> Environment: MacOS 10.13.6
>Reporter: Victoria Xia
>Assignee: Matthias J. Sax
>Priority: Major
>
> In bumping the RocksDB dependency version from 5.18.3 to 5.18.4 on trunk, 
> Kafka Streams apps that require initializing RocksDB state stores fail on 
> MacOS 10.13.6 with
> {code:java}
> dyld: lazy symbol binding failed: Symbol not found: chkstk_darwin
>   Referenced from: 
> /private/var/folders/y4/v3q4tgb559sb0x6kwpll19bmgn/T/librocksdbjni4028367213086899694.jnilib
>  (which was built for Mac OS X 10.15)
>   Expected in: /usr/lib/libSystem.B.dylib
> {code}
> as a result of [https://github.com/facebook/rocksdb/issues/6852]
> 2.5.0 is unaffected.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9981) Running a dedicated mm2 cluster with more than one nodes,When the configuration is updated the task is not aware and will lose the update operation.

2020-05-18 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110488#comment-17110488
 ] 

Chris Egerton commented on KAFKA-9981:
--

[~qq619618919] we could but it wouldn't be simple. We'd have to take care to 
ensure that writes from zombie workers would be ignored, which is done right 
now by only allowing the leader to write to the config topic.

I think it'd be easier to bring up the task configs endpoint for MM2 than to 
re-architect the Connect framework, especially given the compatibility and 
migration concerns that would have to be addressed in order to allow non-leader 
workers to write to the config topic. But either approach would work.

> Running a dedicated mm2 cluster with more than one nodes,When the 
> configuration is updated the task is not aware and will lose the update 
> operation.
> 
>
> Key: KAFKA-9981
> URL: https://issues.apache.org/jira/browse/KAFKA-9981
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
>Reporter: victor
>Priority: Major
>
> DistributedHerder.reconfigureConnector handles a config update as follows:
> {code:java}
> if (changed) {
> List<Map<String, String>> rawTaskProps = reverseTransform(connName, 
> configState, taskProps);
> if (isLeader()) {
> configBackingStore.putTaskConfigs(connName, rawTaskProps);
> cb.onCompletion(null, null);
> } else {
> // We cannot forward the request on the same thread because this 
> reconfiguration can happen as a result of connector
> // addition or removal. If we blocked waiting for the response from 
> leader, we may be kicked out of the worker group.
> forwardRequestExecutor.submit(new Runnable() {
> @Override
> public void run() {
> try {
> String leaderUrl = leaderUrl();
> if (leaderUrl == null || leaderUrl.trim().isEmpty()) {
> cb.onCompletion(new ConnectException("Request to 
> leader to " +
> "reconfigure connector tasks failed " +
> "because the URL of the leader's REST 
> interface is empty!"), null);
> return;
> }
> String reconfigUrl = RestServer.urlJoin(leaderUrl, 
> "/connectors/" + connName + "/tasks");
> log.trace("Forwarding task configurations for connector 
> {} to leader", connName);
> RestClient.httpRequest(reconfigUrl, "POST", null, 
> rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm);
> cb.onCompletion(null, null);
> } catch (ConnectException e) {
> log.error("Request to leader to reconfigure connector 
> tasks failed", e);
> cb.onCompletion(e, null);
> }
> }
> });
> }
> }
> {code}
> The KafkaConfigBackingStore task checks for configuration updates, such as a 
> topic whitelist update. If the KafkaConfigBackingStore task is not running on 
> the leader node, an HTTP request will be sent to notify the leader of the 
> configuration update. However, a dedicated mm2 cluster does not have the HTTP 
> server turned on, so the request will fail to be sent, causing the update 
> operation to be lost.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9994) Catch TaskMigrated exception in task corruption code path

2020-05-18 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-9994:
---

Assignee: Boyang Chen

> Catch TaskMigrated exception in task corruption code path 
> --
>
> Key: KAFKA-9994
> URL: https://issues.apache.org/jira/browse/KAFKA-9994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> We have seen a case where the TaskMigrated exception gets thrown from 
> taskManager.commit(). This should be prevented by proper catching.
> Looking at the stack trace, the TaskMigrated was thrown from preCommit() call 
> inside corrupted task exception commit.
> {code:java}
> [2020-05-14T05:47:25-07:00] 
> (streams-soak-trunk-eos_soak_i-0b5b559dda7970618_streamslog) [2020-05-14 
> 12:47:25,635] ERROR 
> [stream-soak-test-db8d1d42-5677-4a54-a0a1-89b7b0766493-StreamThread-1] 
> stream-thread 
> [stream-soak-test-db8d1d42-5677-4a54-a0a1-89b7b0766493-StreamThread-1] 
> Encountered the following exception during processing and the thread is going 
> to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-14T05:47:25-07:00] 
> (streams-soak-trunk-eos_soak_i-0b5b559dda7970618_streamslog) 
> org.apache.kafka.streams.errors.TaskMigratedException: Producer got fenced 
> trying to send a record [stream-thread 
> [stream-soak-test-db8d1d42-5677-4a54-a0a1-89b7b0766493-StreamThread-1] task 
> [1_1]]; it means all tasks belonging to this thread should be migrated.
>         at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:216)
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:171)
>         at 
> org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:69)
>         at 
> org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedWindowBytesStore.log(ChangeLoggingTimestampedWindowBytesStore.java:36)
>         at 
> org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:112)
>         at 
> org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:34)
>         at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.putAndMaybeForward(CachingWindowStore.java:111)
>         at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$initInternal$0(CachingWindowStore.java:91)
>         at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
>         at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109)
>         at 
> org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124)
>         at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:296)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:84)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$flush$4(MeteredWindowStore.java:200)
>         at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:200)
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:402)
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:317)
>         at 
> org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:753)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:573)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:517)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9994) Catch TaskMigrated exception in task corruption code path

2020-05-18 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-9994.
-
Resolution: Fixed

> Catch TaskMigrated exception in task corruption code path 
> --
>
> Key: KAFKA-9994
> URL: https://issues.apache.org/jira/browse/KAFKA-9994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> We have seen a case where the TaskMigrated exception gets thrown from 
> taskManager.commit(). This should be prevented by proper catching.
> Looking at the stack trace, the TaskMigrated was thrown from preCommit() call 
> inside corrupted task exception commit.
> {code:java}
> [2020-05-14T05:47:25-07:00] 
> (streams-soak-trunk-eos_soak_i-0b5b559dda7970618_streamslog) [2020-05-14 
> 12:47:25,635] ERROR 
> [stream-soak-test-db8d1d42-5677-4a54-a0a1-89b7b0766493-StreamThread-1] 
> stream-thread 
> [stream-soak-test-db8d1d42-5677-4a54-a0a1-89b7b0766493-StreamThread-1] 
> Encountered the following exception during processing and the thread is going 
> to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-14T05:47:25-07:00] 
> (streams-soak-trunk-eos_soak_i-0b5b559dda7970618_streamslog) 
> org.apache.kafka.streams.errors.TaskMigratedException: Producer got fenced 
> trying to send a record [stream-thread 
> [stream-soak-test-db8d1d42-5677-4a54-a0a1-89b7b0766493-StreamThread-1] task 
> [1_1]]; it means all tasks belonging to this thread should be migrated.
>         at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:216)
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:171)
>         at 
> org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:69)
>         at 
> org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedWindowBytesStore.log(ChangeLoggingTimestampedWindowBytesStore.java:36)
>         at 
> org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:112)
>         at 
> org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:34)
>         at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.putAndMaybeForward(CachingWindowStore.java:111)
>         at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$initInternal$0(CachingWindowStore.java:91)
>         at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
>         at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109)
>         at 
> org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124)
>         at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:296)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:84)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$flush$4(MeteredWindowStore.java:200)
>         at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:200)
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:402)
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:317)
>         at 
> org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:753)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:573)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:517)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4748) Need a way to shutdown all workers in a Streams application at the same time

2020-05-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110493#comment-17110493
 ] 

Matthias J. Sax commented on KAFKA-4748:


Thanks for clarifying [~astubbs]

> Need a way to shutdown all workers in a Streams application at the same time
> 
>
> Key: KAFKA-4748
> URL: https://issues.apache.org/jira/browse/KAFKA-4748
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Elias Levy
>Priority: Major
>
> If you have a fleet of Stream workers for an application and attempt to shut 
> them down simultaneously (e.g. via SIGTERM and 
> Runtime.getRuntime().addShutdownHook() and streams.close())), a large number 
> of the workers fail to shutdown.
> The problem appears to be a race condition between the shutdown signal and 
> the consumer rebalancing that is triggered by some of the workers exiting 
> before others. Apparently, workers that receive the signal later fail to 
> exit as they are caught in the rebalance.
> Terminating workers in a rolling fashion is not advisable in some situations. 
> The rolling shutdown will result in many unnecessary rebalances and may 
> fail, as the application may have a large amount of local state that a smaller 
> number of nodes may not be able to store.
> It would appear that there is a need for a protocol change to allow the 
> coordinator to signal a consumer group to shutdown without leading to 
> rebalancing.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task

2020-05-18 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-9989:
---

Assignee: Boyang Chen

> StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor 
> gets assigned task
> -
>
> Key: KAFKA-9989
> URL: https://issues.apache.org/jira/browse/KAFKA-9989
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:
> "Never saw output 'processed [0-9]* records' on ubuntu@worker6"
> If we take a closer look, the rebalance happens but there is no task 
> assignment. We should fix this problem by making the rebalance result part 
> of the check, and skipping the record processing validation when the 
> assignment is empty. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-6182) Automatic co-partitioning of topics via automatic intermediate topic with matching partitions

2020-05-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110497#comment-17110497
 ] 

Matthias J. Sax commented on KAFKA-6182:


Failing fast is not covered via KIP-221, but there are other tickets that would 
address it:  KAFKA-10015 and KAFKA-4748 seem related for a "fail fast" feature.

> Automatic co-partitioning of topics via automatic intermediate topic with 
> matching partitions
> -
>
> Key: KAFKA-6182
> URL: https://issues.apache.org/jira/browse/KAFKA-6182
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Antony Stubbs
>Priority: Major
>
> Currently it is up to the user to ensure that two input topics for a join 
> have the same number of partitions, and if they don't, manually create an 
> intermediate topic, and send the stream #through that topic first, and then 
> perform the join.
> It would be great to have Kafka streams detect this and at least give the 
> user the option to create an intermediate topic automatically with the same 
> number of partitions as the topic being joined with.
> See 
> https://docs.confluent.io/current/streams/developer-guide.html#joins-require-co-partitioning-of-the-input-data



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on pull request #8682: KAFKA-10011: Remove task id from lockedTaskDirectories during handleLostAll

2020-05-18 Thread GitBox


ableegoldman commented on pull request #8682:
URL: https://github.com/apache/kafka/pull/8682#issuecomment-630344469


   Instead of removing the tasks during `handleLostAll`, can we just clear the 
`lockedTaskDirectories` set at the end of 
`releaseLockedUnassignedTaskDirectories`? This set is only used to keep track 
of which task directories we only temporarily locked for the rebalance, so it 
makes sense that it should be empty outside of a rebalance.
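   
   A minimal sketch of that suggestion (names mirror `TaskManager`, but the class 
below is illustrative rather than the actual implementation, and it uses plain 
strings where the real field holds `TaskId`s):
   
   ```java
   import java.util.HashSet;
   import java.util.Set;
   
   class LockedDirectoriesSketch {
       // directories that were only locked temporarily for the current rebalance
       private final Set<String> lockedTaskDirectories = new HashSet<>();
   
       void releaseLockedUnassignedTaskDirectories() {
           for (final String taskId : lockedTaskDirectories) {
               unlock(taskId); // release the file lock taken during the rebalance
           }
           // Clearing here keeps the set empty outside of a rebalance, so
           // handleLostAll would not need to remove entries itself.
           lockedTaskDirectories.clear();
       }
   
       private void unlock(final String taskId) {
           // placeholder for releasing the task directory lock
       }
   }
   ```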



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8622: MINOR: Update stream documentation

2020-05-18 Thread GitBox


ableegoldman commented on a change in pull request #8622:
URL: https://github.com/apache/kafka/pull/8622#discussion_r426808431



##
File path: docs/streams/upgrade-guide.html
##
@@ -35,7 +35,7 @@ Upgrade Guide and API Changes
 
 
 Upgrading from any older version to {{fullDotVersion}} is possible: 
you will need to do two rolling bounces, where during the first rolling bounce 
phase you set the config upgrade.from="older version"
-(possible values are "0.10.0" - "2.3") and during the 
second you remove it. This is required to safely upgrade to the new cooperative 
rebalancing protocol of the embedded consumer. Note that you will remain using 
the old eager
+(possible values are "0.10.0" - "2.4") and during the 
second you remove it. This is required to safely upgrade to the new cooperative 
rebalancing protocol of the embedded consumer. Note that you will remain using 
the old eager

Review comment:
   @showuon Boyang is right, there is no `upgrade.from` config for 2.4 
since that's when cooperative rebalancing was enabled. So if you upgrade to 2.5 
from any version lower than 2.4, you will need to go through this upgrade path 
and set the config.
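   
   As a minimal sketch, assuming an application being upgraded from 2.3, the 
first rolling bounce would carry a config along these lines (the application id 
and bootstrap servers are placeholders):
   
   ```java
   import java.util.Properties;
   import org.apache.kafka.streams.StreamsConfig;
   
   public class UpgradeFromExample {
       // Config used only during the first rolling bounce of an app coming from 2.3;
       // remove the upgrade.from entry for the second bounce.
       public static Properties firstBounceConfig() {
           final Properties props = new Properties();
           props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
           props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
           props.put(StreamsConfig.UPGRADE_FROM_CONFIG, "2.3");
           return props;
       }
   }
   ```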





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #8569: KIP-551: Expose disk read and write metrics

2020-05-18 Thread GitBox


cmccabe commented on a change in pull request #8569:
URL: https://github.com/apache/kafka/pull/8569#discussion_r426808657



##
File path: core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala
##
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.nio.file.{Files, Paths}
+
+import org.apache.kafka.common.utils.Time
+import org.slf4j.Logger
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Retrieves Linux /proc/self/io metrics.
+ */
+class LinuxIoMetricsCollector(procRoot: String, val time: Time, val logger: 
Logger) {
+  import LinuxIoMetricsCollector._
+  var lastUpdateMs = -1L
+  var cachedReadBytes = 0L
+  var cachedWriteBytes = 0L
+  val path = Paths.get(procRoot, "self", "io")
+
+  def readBytes(): Long = this.synchronized {
+val curMs = time.milliseconds()

Review comment:
   Interesting idea, but that would complicate the `usable` function, 
right?  Probably better to leave it where it is.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-6579) Consolidate window store and session store unit tests into a single class

2020-05-18 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110508#comment-17110508
 ] 

Sophie Blee-Goldman commented on KAFKA-6579:


No, I looked into it but the scope is nontrivial. I'll unassign it and maybe 
someone from the community can pick it up ^^

> Consolidate window store and session store unit tests into a single class
> -
>
> Key: KAFKA-6579
> URL: https://issues.apache.org/jira/browse/KAFKA-6579
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sophie Blee-Goldman
>Priority: Major
>  Labels: newbie, unit-test
>
> For key value store, we have a {{AbstractKeyValueStoreTest}} that is shared 
> among all its implementations; however for window and session stores, each 
> class has its own independent unit test classes that do not share the test 
> coverage. In fact, many of these test classes share the same unit test 
> functions (e.g. {{RocksDBWindowStoreTest}}, 
> {{CompositeReadOnlyWindowStoreTest}} and {{CachingWindowStoreTest}}).
> It is better to use the same pattern as for key value stores to consolidate 
> these test functions into a shared base class.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-6579) Consolidate window store and session store unit tests into a single class

2020-05-18 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-6579:
--

Assignee: (was: Sophie Blee-Goldman)

> Consolidate window store and session store unit tests into a single class
> -
>
> Key: KAFKA-6579
> URL: https://issues.apache.org/jira/browse/KAFKA-6579
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie, unit-test
>
> For key value store, we have a {{AbstractKeyValueStoreTest}} that is shared 
> among all its implementations; however for window and session stores, each 
> class has its own independent unit test classes that do not share the test 
> coverage. In fact, many of these test classes share the same unit test 
> functions (e.g. {{RocksDBWindowStoreTest}}, 
> {{CompositeReadOnlyWindowStoreTest}} and {{CachingWindowStoreTest}}).
> It is better to use the same pattern as for key value stores to consolidate 
> these test functions into a shared base class.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on pull request #8569: KIP-551: Expose disk read and write metrics

2020-05-18 Thread GitBox


cmccabe commented on pull request #8569:
URL: https://github.com/apache/kafka/pull/8569#issuecomment-630352959


   @mumrah :  Good question.  I don't think anyone has looked at Sigar.  I 
guess the question is whether we want to get into the business of doing 
general-purpose node monitoring.  I think many people would say no.  We're 
adding this metric mainly because it's very simple to check, and also very 
impactful for Kafka (the onset of heavy disk reads often correlates with 
performance tanking).
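   
   For anyone curious what the metric is based on, here is a rough, standalone 
sketch of reading the same counters from `/proc/self/io` on Linux; the PR's 
actual collector is the Scala `LinuxIoMetricsCollector`, so this is only an 
illustration:
   
   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.Paths;
   import java.util.List;
   
   public class ProcSelfIoSketch {
       // /proc/self/io contains lines such as "read_bytes: 16384"
       public static long readField(final String field) throws IOException {
           final Path path = Paths.get("/proc", "self", "io");
           final List<String> lines = Files.readAllLines(path);
           for (final String line : lines) {
               if (line.startsWith(field + ":")) {
                   return Long.parseLong(line.substring(field.length() + 1).trim());
               }
           }
           throw new IOException(field + " not found in " + path);
       }
   
       public static void main(final String[] args) throws IOException {
           System.out.println("read_bytes=" + readField("read_bytes"));
           System.out.println("write_bytes=" + readField("write_bytes"));
       }
   }
   ```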



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma opened a new pull request #8688: MINOR: Introduce separate methods in KafkaApis for consumer and follower fetch handling

2020-05-18 Thread GitBox


ijuma opened a new pull request #8688:
URL: https://github.com/apache/kafka/pull/8688


   This is a bit odd in that it's not needed from a semantics perspective,
   but it would make it much easier to distinguish the cost of follower
   fetches versus consumer fetches when profiling.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10010) Should close standby task for safety during HandleLostAll

2020-05-18 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110518#comment-17110518
 ] 

Sophie Blee-Goldman commented on KAFKA-10010:
-

It's possible the active <-> standby task conversion PR would actually fix this 
on the side, as it skips re-registering any store that's already registered. 
I'd like to avoid closing standbys during handleLostAll since this will 
completely clear out any in-memory stores, for example

> Should close standby task for safety during HandleLostAll
> -
>
> Key: KAFKA-10010
> URL: https://issues.apache.org/jira/browse/KAFKA-10010
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> The current lost all logic doesn't close standby task, which could 
> potentially lead to a tricky condition like below:
> 1. The standby task was initializing as `CREATED` state, and task corrupted 
> exception was thrown from registerStateStores
> 2. The task corrupted exception was caught, and do a non-affected task commit
> 3. The task commit failed due to task migrated exception
> 4. The handleLostAll didn't close the standby task, leaving it as CREATED 
> state
> 5. Next rebalance complete, the same task was assigned back as standby task.
> 6. Illegal Argument exception caught :
> {code:java}
> [2020-05-16T11:56:18-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) [2020-05-16 
> 18:56:18,050] ERROR 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> stream-thread 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> Encountered the following exception during processing and the thread is going 
> to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-16T11:56:18-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) 
> java.lang.IllegalArgumentException: stream-thread 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> standby-task [1_2] Store KSTREAM-AGGREGATE-STATE-STORE-07 has already 
> been registered.
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStore(ProcessorStateManager.java:269)
>         at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:112)
>         at 
> org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore.init(AbstractRocksDBSegmentedBytesStore.java:191)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.init(ChangeLoggingWindowBytesStore.java:54)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:74)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$init$0(MeteredWindowStore.java:85)
>         at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:85)
>         at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:82)
>         at 
> org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:89)
>         at 
> org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:358)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10010) Should close standby task for safety during HandleLostAll

2020-05-18 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110522#comment-17110522
 ] 

Sophie Blee-Goldman commented on KAFKA-10010:
-

When I first started looking into the store registration and initialization 
logic for that PR, I remember thinking there was a bug since we would attempt 
to re-register stores if we hit an exception halfway through registration. I 
snooped around and it seemed like there wasn't really a way to hit this bug, 
but I fixed it anyways.

Seems like there actually was a way to hit this bug after all, so nice catch 
[~bchen225242]

> Should close standby task for safety during HandleLostAll
> -
>
> Key: KAFKA-10010
> URL: https://issues.apache.org/jira/browse/KAFKA-10010
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> The current lost all logic doesn't close standby task, which could 
> potentially lead to a tricky condition like below:
> 1. The standby task was initializing as `CREATED` state, and task corrupted 
> exception was thrown from registerStateStores
> 2. The task corrupted exception was caught, and do a non-affected task commit
> 3. The task commit failed due to task migrated exception
> 4. The handleLostAll didn't close the standby task, leaving it as CREATED 
> state
> 5. Next rebalance complete, the same task was assigned back as standby task.
> 6. Illegal Argument exception caught :
> {code:java}
> [2020-05-16T11:56:18-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) [2020-05-16 
> 18:56:18,050] ERROR 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> stream-thread 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> Encountered the following exception during processing and the thread is going 
> to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-16T11:56:18-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) 
> java.lang.IllegalArgumentException: stream-thread 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> standby-task [1_2] Store KSTREAM-AGGREGATE-STATE-STORE-07 has already 
> been registered.
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStore(ProcessorStateManager.java:269)
>         at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:112)
>         at 
> org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore.init(AbstractRocksDBSegmentedBytesStore.java:191)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.init(ChangeLoggingWindowBytesStore.java:54)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:74)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$init$0(MeteredWindowStore.java:85)
>         at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:85)
>         at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:82)
>         at 
> org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:89)
>         at 
> org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:358)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on pull request #8669: MINOR: consolidate processor context for active/standby

2020-05-18 Thread GitBox


guozhangwang commented on pull request #8669:
URL: https://github.com/apache/kafka/pull/8669#issuecomment-630370157


   test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma closed pull request #8688: MINOR: Introduce separate methods in KafkaApis for consumer and follower fetch handling

2020-05-18 Thread GitBox


ijuma closed pull request #8688:
URL: https://github.com/apache/kafka/pull/8688


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8688: MINOR: Introduce separate methods in KafkaApis for consumer and follower fetch handling

2020-05-18 Thread GitBox


ijuma commented on pull request #8688:
URL: https://github.com/apache/kafka/pull/8688#issuecomment-630376046


   Closing this for now as there may be a better way to achieve this.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] apovzner commented on pull request #8650: MINOR: Added unit tests for ConnectionQuotas

2020-05-18 Thread GitBox


apovzner commented on pull request #8650:
URL: https://github.com/apache/kafka/pull/8650#issuecomment-630379942


   It looks like the build couldn't even run tests:
   ```
   15:15:09 ERROR: Error cloning remote repo 'origin'
   ...
   15:15:18 stderr: fatal: Unable to look up github.com (port 9418) (Name or 
service not known)
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #8675: KAFKA-10004: Fix the default broker configs cannot be displayed when using kafka-configs.sh --describe

2020-05-18 Thread GitBox


cmccabe commented on pull request #8675:
URL: https://github.com/apache/kafka/pull/8675#issuecomment-630389772


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8684: KAFKA-10012 Reducing memory overhead associated with strings in Metri…

2020-05-18 Thread GitBox


ijuma commented on pull request #8684:
URL: https://github.com/apache/kafka/pull/8684#issuecomment-630391729


   Thanks for the PR. Java's built in string interning mechanism is known to 
have issues. Not sure we want to do that. Maybe we can remove that part of the 
change from this PR?
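   
   To illustrate the kind of alternative being discussed (a sketch of the 
general technique only, not code from this PR or from Kafka): a small 
application-level pool keeps canonicalization under the application's control 
instead of relying on the JVM-wide intern table.
   
   ```java
   import java.util.concurrent.ConcurrentHashMap;
   
   public class StringPool {
       private final ConcurrentHashMap<String, String> pool = new ConcurrentHashMap<>();
   
       // Returns one canonical instance per distinct string value, so per-connection
       // duplicates can be deduplicated without String.intern().
       public String canonicalize(final String s) {
           final String existing = pool.putIfAbsent(s, s);
           return existing != null ? existing : s;
       }
   }
   ```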



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #8685: KAFKA-10014 Always try to close all channels in Selector#close

2020-05-18 Thread GitBox


ijuma commented on a change in pull request #8685:
URL: https://github.com/apache/kafka/pull/8685#discussion_r426850232



##
File path: clients/src/main/java/org/apache/kafka/common/network/Selector.java
##
@@ -363,23 +363,14 @@ public void wakeup() {
 @Override
 public void close() {
 List<String> connections = new ArrayList<>(channels.keySet());
-try {
-for (String id : connections)
-close(id);
-} finally {
-// If there is any exception thrown in close(id), we should still 
be able
-// to close the remaining objects, especially the sensors because 
keeping
-// the sensors may lead to failure to start up the 
ReplicaFetcherThread if
-// the old sensors with the same names has not yet been cleaned up.
-AtomicReference<Throwable> firstException = new AtomicReference<>();
-Utils.closeQuietly(nioSelector, "nioSelector", firstException);
-Utils.closeQuietly(sensors, "sensors", firstException);
-Utils.closeQuietly(channelBuilder, "channelBuilder", 
firstException);
-Throwable exception = firstException.get();
-if (exception instanceof RuntimeException && !(exception 
instanceof SecurityException)) {
-throw (RuntimeException) exception;
-}
-
+AtomicReference<Throwable> firstException = new AtomicReference<>();

Review comment:
   Have we considered using `Utils.closeAll`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #8685: KAFKA-10014 Always try to close all channels in Selector#close

2020-05-18 Thread GitBox


ijuma commented on a change in pull request #8685:
URL: https://github.com/apache/kafka/pull/8685#discussion_r426850232



##
File path: clients/src/main/java/org/apache/kafka/common/network/Selector.java
##
@@ -363,23 +363,14 @@ public void wakeup() {
 @Override
 public void close() {
 List<String> connections = new ArrayList<>(channels.keySet());
-try {
-for (String id : connections)
-close(id);
-} finally {
-// If there is any exception thrown in close(id), we should still 
be able
-// to close the remaining objects, especially the sensors because 
keeping
-// the sensors may lead to failure to start up the 
ReplicaFetcherThread if
-// the old sensors with the same names has not yet been cleaned up.
-AtomicReference firstException = new 
AtomicReference<>();
-Utils.closeQuietly(nioSelector, "nioSelector", firstException);
-Utils.closeQuietly(sensors, "sensors", firstException);
-Utils.closeQuietly(channelBuilder, "channelBuilder", 
firstException);
-Throwable exception = firstException.get();
-if (exception instanceof RuntimeException && !(exception 
instanceof SecurityException)) {
-throw (RuntimeException) exception;
-}
-
+AtomicReference firstException = new AtomicReference<>();

Review comment:
   Have we considered using `Utils.closeAll` instead of multiple 
`closeQuietly`?
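
   For reference, a minimal sketch (not part of this PR) of what that suggestion could look like, 
assuming nioSelector, sensors, and channelBuilder can all be handed to Utils.closeAll; if any of 
them is only AutoCloseable rather than Closeable, closeQuietly would still be needed for it:

{code:java}
@Override
public void close() {
    List<String> connections = new ArrayList<>(channels.keySet());
    AtomicReference<Throwable> firstException = new AtomicReference<>();
    // Attempt to close every channel, remembering only the first failure.
    for (String id : connections) {
        try {
            close(id);
        } catch (Throwable t) {
            firstException.compareAndSet(null, t);
        }
    }
    // Utils.closeAll closes each argument and rethrows the first exception
    // only after all of them have been attempted.
    try {
        Utils.closeAll(nioSelector, sensors, channelBuilder);
    } catch (IOException e) {
        firstException.compareAndSet(null, e);
    }
    Throwable exception = firstException.get();
    if (exception instanceof RuntimeException && !(exception instanceof SecurityException)) {
        throw (RuntimeException) exception;
    }
}
{code}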





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #8687: MINOR: updated MacOS compatibility statement for RocksDB

2020-05-18 Thread GitBox


mjsax merged pull request #8687:
URL: https://github.com/apache/kafka/pull/8687


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10016) Support For Purge Topic

2020-05-18 Thread David Mollitor (Jira)
David Mollitor created KAFKA-10016:
--

 Summary: Support For Purge Topic
 Key: KAFKA-10016
 URL: https://issues.apache.org/jira/browse/KAFKA-10016
 Project: Kafka
  Issue Type: Improvement
Reporter: David Mollitor


Some discussions have taken place about how to purge a topic. Please add native 
support for this operation. Is there a "starting offset" for each topic? Such a 
mechanism would allow this value to be set to the current offset, and the 
brokers would then skip (and clean) everything before it.

 

[https://stackoverflow.com/questions/16284399/purge-kafka-topic]

 
{code:none}
kafka-topics --topic mytopic --purge
{code}
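
In the meantime, something close to a purge can already be scripted against the 
existing AdminClient deleteRecords API, which advances the log start offset so the 
brokers delete everything before it. A rough sketch follows; the topic name, partition 
count, and broker address are placeholders:

{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class PurgeTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Offset -1 is interpreted by DeleteRecords as "up to the high watermark",
            // i.e. delete all currently committed records of the partition.
            Map<TopicPartition, RecordsToDelete> toDelete = new HashMap<>();
            for (int partition = 0; partition < 3; partition++) {   // assumes 3 partitions
                toDelete.put(new TopicPartition("mytopic", partition), RecordsToDelete.beforeOffset(-1L));
            }
            admin.deleteRecords(toDelete).all().get();
        }
    }
}
{code}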



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #8238: KAFKA-9130: KIP-518 Allow listing consumer groups per state

2020-05-18 Thread GitBox


hachikuji commented on a change in pull request #8238:
URL: https://github.com/apache/kafka/pull/8238#discussion_r426867775



##
File path: clients/src/main/resources/common/message/ListGroupsRequest.json
##
@@ -20,8 +20,14 @@
   // Version 1 and 2 are the same as version 0.
   //
   // Version 3 is the first flexible version.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds the States flexible field (KIP-518).
+  "validVersions": "0-4",
   "flexibleVersions": "3+",
   "fields": [
+{ "name": "States", "type": "[]string", "versions": "4+", "tag": 0, 
"taggedVersions": "4+",

Review comment:
   Sorry I missed this from the discussion, but why are we bumping the 
version if we are only adding tagged fields? Is it so that we can detect 
whether the capability is supported? If so, then I wonder why we don't make 
this a regular field.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
##
@@ -26,4 +31,34 @@
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
+
+    private Optional<Set<ConsumerGroupState>> states = Optional.empty();
+
+    /**
+     * Only groups in these states will be returned by listConsumerGroups()

Review comment:
   Probably worth adding a comment about broker compatibility with this API.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
##
@@ -26,4 +31,34 @@
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
+
+    private Optional<Set<ConsumerGroupState>> states = Optional.empty();
+
+    /**
+     * Only groups in these states will be returned by listConsumerGroups()
+     * If not set, all groups are returned without their states
+     * throw IllegalArgumentException if states is empty
+     */
+    public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) {
+        if (states == null || states.isEmpty()) {
+            throw new IllegalArgumentException("states should not be null or empty");
+        }
+        this.states = Optional.of(states);
+        return this;
+    }
+
+    /**
+     * All groups with their states will be returned by listConsumerGroups()
+     */
+    public ListConsumerGroupsOptions inAnyState() {
+        this.states = Optional.of(EnumSet.allOf(ConsumerGroupState.class));

Review comment:
   Hmm.. We have an `UNKNOWN` state in `ConsumerGroupState` in case the 
group coordinator adds a new state that the client isn't aware of. Currently 
we're going to pass this through the request, which is a bit odd. Furthermore, 
if the coordinator _does_ add new states, we will be unable to see them using 
this API. I think it might be better to use a `null` list of states in the 
request to indicate that any state is needed.

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1397,29 +1398,32 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleListGroupsRequest(request: RequestChannel.Request): Unit = {
-val (error, groups) = groupCoordinator.handleListGroups()
+val listGroupsRequest = request.body[ListGroupsRequest]
+val states = listGroupsRequest.data.states.asScala.toList
+
+def createResponse(throttleMs: Int, groups: List[GroupOverview], error: 
Errors): AbstractResponse = {
+   new ListGroupsResponse(new ListGroupsResponseData()
+.setErrorCode(error.code)
+.setGroups(groups.map { group =>
+val listedGroup = new ListGroupsResponseData.ListedGroup()
+  .setGroupId(group.groupId)
+  .setProtocolType(group.protocolType)
+if (!states.isEmpty)

Review comment:
   Why don't we always return the state? I don't think overhead is a huge 
concern for an api like this.
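
   For illustration, a hedged sketch of how a caller would use the state filter proposed in 
this PR. The method names (inStates, state()) follow the KIP-518 changes under review and 
are assumptions here, not a released API; "admin" is an existing Admin client:

{code:java}
ListConsumerGroupsOptions options = new ListConsumerGroupsOptions()
        .inStates(EnumSet.of(ConsumerGroupState.STABLE, ConsumerGroupState.EMPTY));
Collection<ConsumerGroupListing> listings = admin.listConsumerGroups(options).all().get();
for (ConsumerGroupListing listing : listings) {
    // state() is the per-group state exposed by this PR.
    System.out.println(listing.groupId() + " -> " + listing.state());
}
{code}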





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #8680: KAFKA-9755: Implement read path for feature versioning system (KIP-584)

2020-05-18 Thread GitBox


abbccdda commented on a change in pull request #8680:
URL: https://github.com/apache/kafka/pull/8680#discussion_r426806321



##
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing "supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, VersionRange> features) {
+        return new Features<VersionRange>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to VersionLevelRange, as the backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing "finalized" features.
+     */
+    public static Features<VersionLevelRange> finalizedFeatures(Map<String, VersionLevelRange> features) {
+        return new Features<VersionLevelRange>(features);
+    }
+
+    public static Features<VersionLevelRange> emptyFinalizedFeatures() {

Review comment:
   Is this function only used in unit test?

##
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing "supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, VersionRange> features) {
+        return new Features<VersionRange>(features);

Review comment:
   Could be simplified as new Features<>
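
   In other words, a small sketch of the simplification being suggested, relying on the 
diamond operator instead of repeating the type argument (class shape as in this diff):

{code:java}
public static Features<VersionRange> supportedFeatures(Map<String, VersionRange> features) {
    return new Features<>(features);
}
{code}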

##
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.

Review comment:
   nit: we could use {@link VersionRangeType} to reference the classes.

##
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+

[jira] [Commented] (KAFKA-4327) Move Reset Tool from core to streams

2020-05-18 Thread Jorge Esteban Quilcate Otoya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110609#comment-17110609
 ] 

Jorge Esteban Quilcate Otoya commented on KAFKA-4327:
-

[~mjsax] [~guozhang] I'd like to help close this one, as things have changed 
since it was created:
 * the zookeeper dependency has been removed, and
 * the zookeeper argument is deprecated.

This tool carries a dependency on an argument parser that I'm not sure we would 
want to pull into the streams module.

I'd like to propose and agree on the following changes before moving forward:
 * move StreamsResetter to the `tools` module
 * translate the jopt parser (Scala) into argparse (Java); a rough sketch follows below
 * remove the zookeeper parameter

If we agree on this, I can draft a small KIP to get this done.
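
As a rough sketch of the jopt-to-argparse translation mentioned above, using argparse4j 
(the parser already used by the Java tools module). Option names mirror the existing 
StreamsResetter and are illustrative only, not the final tool:

{code:java}
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace;

public class StreamsResetterArgsSketch {
    public static void main(String[] args) throws Exception {
        ArgumentParser parser = ArgumentParsers.newArgumentParser("streams-application-reset")
            .defaultHelp(true)
            .description("Resets the internal state of a Kafka Streams application.");
        parser.addArgument("--application-id")
            .required(true)
            .help("The Kafka Streams application ID (application.id).");
        parser.addArgument("--bootstrap-servers")
            .setDefault("localhost:9092")
            .help("Comma-separated list of broker urls.");
        parser.addArgument("--input-topics")
            .help("Comma-separated list of user input topics to reset.");
        // parseArgs throws ArgumentParserException on invalid input.
        Namespace ns = parser.parseArgs(args);
        System.out.println(ns);
    }
}
{code}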

> Move Reset Tool from core to streams
> 
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, Kafka Streams Application Reset Tool is part of {{core}} module 
> due to ZK dependency. After KIP-4 got merged, this dependency can be dropped 
> and the Reset Tool can be moved to {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}} that 
> refers to ResetTool in an exception message:
> {{"Use 'kafka.tools.StreamsResetter' tool"}}
> -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}
> Doing this JIRA also requires to update the docs with regard to broker 
> backward compatibility -- not all broker support "topic delete request" and 
> thus, the reset tool will not be backward compatible to all broker versions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]

2020-05-18 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-10017:
---

 Summary: Flaky Test 
EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
 Key: KAFKA-10017
 URL: https://issues.apache.org/jira/browse/KAFKA-10017
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Sophie Blee-Goldman


Creating a new ticket for this since the root cause is different than 
https://issues.apache.org/jira/browse/KAFKA-9966
h3. Stacktrace

java.lang.AssertionError: Did not receive all 20 records from topic 
multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
greater than <20> but: <15> was less than <20> at 
org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
 at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) 
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) 
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
 at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe merged pull request #8569: KIP-551: Expose disk read and write metrics

2020-05-18 Thread GitBox


cmccabe merged pull request #8569:
URL: https://github.com/apache/kafka/pull/8569


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-9292) KIP-551: Expose disk read and write metrics

2020-05-18 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-9292.
-
Fix Version/s: 2.6.0
   Resolution: Fixed

> KIP-551: Expose disk read and write metrics
> ---
>
> Key: KAFKA-9292
> URL: https://issues.apache.org/jira/browse/KAFKA-9292
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
> Fix For: 2.6.0
>
>
> It's often helpful to know how many bytes Kafka is reading and writing from 
> the disk.  The reason is because when disk access is required, there may be 
> some impact on latency and bandwidth.  We currently don't have a metric that 
> measures this directly.  It would be useful to add one.
> See 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-551%3A+Expose+disk+read+and+write+metrics



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] navina commented on pull request #8684: KAFKA-10012 Reducing memory overhead associated with strings in Metri…

2020-05-18 Thread GitBox


navina commented on pull request #8684:
URL: https://github.com/apache/kafka/pull/8684#issuecomment-630444877


   @ijuma What issues with intern are you referring to? I know that there can 
be a performance hit when there are a lot of interned strings. I believe the string 
interning mechanism has been improved in later versions of Java, such as JDK 8/9. 
   I would like to understand the concern better before removing the string 
interning shown here.
   Thanks for the quick feedback! 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]

2020-05-18 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110633#comment-17110633
 ] 

Sophie Blee-Goldman commented on KAFKA-10017:
-

h3. Stacktrace

java.lang.AssertionError: Did not receive all 10 records from topic 
multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
greater than <10> but: <5> was less than <10> at 
org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
 at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) 
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) 
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
 at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)

> Flaky Test 
> EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
> -
>
> Key: KAFKA-10017
> URL: https://issues.apache.org/jira/browse/KAFKA-10017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test, unit-test
>
> Creating a new ticket for this since the root cause is different than 
> https://issues.apache.org/jira/browse/KAFKA-9966
> h3. Stacktrace
> java.lang.AssertionError: Did not receive all 20 records from topic 
> multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
> greater than <20> but: <15> was less than <20> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] navina edited a comment on pull request #8684: KAFKA-10012 Reducing memory overhead associated with strings in Metri…

2020-05-18 Thread GitBox


navina edited a comment on pull request #8684:
URL: https://github.com/apache/kafka/pull/8684#issuecomment-630444877


   @ijuma What issues with `intern()` are you referring to? I know that there 
can be a performance hit when there are a lot of interned strings. I believe the 
string interning mechanism has been improved in later versions of Java, such 
as JDK 8/9. 
   I would like to understand the concern better before removing the string 
interning shown here.
   Thanks for the quick feedback! 
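
   For context, the alternative usually suggested to `String.intern()` is an 
application-level interner backed by an ordinary heap map, so its size and lifetime 
stay under the application's control. A minimal sketch (not part of this PR):

{code:java}
import java.util.concurrent.ConcurrentHashMap;

public final class MapBackedInterner {
    private final ConcurrentHashMap<String, String> pool = new ConcurrentHashMap<>();

    // Returns the canonical instance for s; unlike String.intern(), the pool can be
    // cleared or simply garbage-collected when this object becomes unreachable.
    public String intern(String s) {
        String existing = pool.putIfAbsent(s, s);
        return existing != null ? existing : s;
    }
}
{code}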



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10010) Should close standby task for safety during HandleLostAll

2020-05-18 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110636#comment-17110636
 ] 

Boyang Chen commented on KAFKA-10010:
-

Had an offline discussion with the team; so far the action items are:


 # Make the state store registration idempotent to unblock the trunk soak (a rough sketch follows below)
 # Add logic to avoid aborting the txn when the task is in the initialization phase (tracked in a separate ticket)
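
A rough sketch of item 1, with names loosely following the stack trace in this ticket; 
the ProcessorStateManager types are simplified here and the actual fix may differ:

{code:java}
// Simplified: the real ProcessorStateManager keeps richer per-store metadata.
private final Map<String, StateStore> stores = new HashMap<>();

public void registerStore(final StateStore store, final StateRestoreCallback callback) {
    if (stores.containsKey(store.name())) {
        // Idempotent: re-registering the same store after a failed commit/rebalance
        // becomes a no-op instead of throwing IllegalArgumentException.
        return;
    }
    stores.put(store.name(), store);
}
{code}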

 

> Should close standby task for safety during HandleLostAll
> -
>
> Key: KAFKA-10010
> URL: https://issues.apache.org/jira/browse/KAFKA-10010
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> The current lost all logic doesn't close standby task, which could 
> potentially lead to a tricky condition like below:
> 1. The standby task was initializing as `CREATED` state, and task corrupted 
> exception was thrown from registerStateStores
> 2. The task corrupted exception was caught, and do a non-affected task commit
> 3. The task commit failed due to task migrated exception
> 4. The handleLostAll didn't close the standby task, leaving it as CREATED 
> state
> 5. Next rebalance complete, the same task was assigned back as standby task.
> 6. Illegal Argument exception caught :
> {code:java}
> [2020-05-16T11:56:18-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) [2020-05-16 
> 18:56:18,050] ERROR 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> stream-thread 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> Encountered the following exception during processing and the thread is going 
> to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-16T11:56:18-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) 
> java.lang.IllegalArgumentException: stream-thread 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> standby-task [1_2] Store KSTREAM-AGGREGATE-STATE-STORE-07 has already 
> been registered.
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStore(ProcessorStateManager.java:269)
>         at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:112)
>         at 
> org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore.init(AbstractRocksDBSegmentedBytesStore.java:191)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.init(ChangeLoggingWindowBytesStore.java:54)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:74)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$init$0(MeteredWindowStore.java:85)
>         at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:85)
>         at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:82)
>         at 
> org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:89)
>         at 
> org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:358)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta

2020-05-18 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-10017:

Summary: Flaky Test 
EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta  (was: Flaky 
Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true])

> Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta
> ---
>
> Key: KAFKA-10017
> URL: https://issues.apache.org/jira/browse/KAFKA-10017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test, unit-test
>
> Creating a new ticket for this since the root cause is different than 
> https://issues.apache.org/jira/browse/KAFKA-9966
> h3. Stacktrace
> java.lang.AssertionError: Did not receive all 20 records from topic 
> multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
> greater than <20> but: <15> was less than <20> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta

2020-05-18 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110633#comment-17110633
 ] 

Sophie Blee-Goldman edited comment on KAFKA-10017 at 5/18/20, 9:36 PM:
---

With injectError = false:
h3. Stacktrace

java.lang.AssertionError: Did not receive all 10 records from topic 
multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
greater than <10> but: <5> was less than <10> at 
org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
 at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) 
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) 
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
 at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)


was (Author: ableegoldman):
h3. Stacktrace

java.lang.AssertionError: Did not receive all 10 records from topic 
multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
greater than <10> but: <5> was less than <10> at 
org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
 at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) 
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) 
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
 at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)

> Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta
> ---
>
> Key: KAFKA-10017
> URL: https://issues.apache.org/jira/browse/KAFKA-10017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test, unit-test
>
> Creating a new ticket for this since the root cause is different than 
> https://issues.apache.org/jira/browse/KAFKA-9966
> With injectError = true:
> h3. Stacktrace
> java.lang.AssertionError: Did not receive all 20 records from topic 
> multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
> greater than <20> but: <15> was less than <20> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta

2020-05-18 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-10017:

Description: 
Creating a new ticket for this since the root cause is different than 
https://issues.apache.org/jira/browse/KAFKA-9966

With injectError = true:
h3. Stacktrace

java.lang.AssertionError: Did not receive all 20 records from topic 
multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
greater than <20> but: <15> was less than <20> at 
org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
 at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) 
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) 
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
 at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)

  was:
Creating a new ticket for this since the root cause is different than 
https://issues.apache.org/jira/browse/KAFKA-9966
h3. Stacktrace

java.lang.AssertionError: Did not receive all 20 records from topic 
multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
greater than <20> but: <15> was less than <20> at 
org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
 at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) 
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) 
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
 at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
 at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)


> Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta
> ---
>
> Key: KAFKA-10017
> URL: https://issues.apache.org/jira/browse/KAFKA-10017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test, unit-test
>
> Creating a new ticket for this since the root cause is different than 
> https://issues.apache.org/jira/browse/KAFKA-9966
> With injectError = true:
> h3. Stacktrace
> java.lang.AssertionError: Did not receive all 20 records from topic 
> multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
> greater than <20> but: <15> was less than <20> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

[GitHub] [kafka] ableegoldman commented on pull request #8669: MINOR: consolidate processor context for active/standby

2020-05-18 Thread GitBox


ableegoldman commented on pull request #8669:
URL: https://github.com/apache/kafka/pull/8669#issuecomment-630448469


   Failed due to flaky 
`EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta` and 
`ConnectorTopicsIntegrationTest.testGetActiveTopics`



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10010) Should close standby task for safety during HandleLostAll

2020-05-18 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110638#comment-17110638
 ] 

Boyang Chen commented on KAFKA-10010:
-

For more context, the 
[reason|https://github.com/apache/kafka/pull/8440/files#r407722022] we have 
to keep the txn commit before handling task corruption is that otherwise, under EOS 
beta, the stream thread could abort other healthy tasks.

> Should close standby task for safety during HandleLostAll
> -
>
> Key: KAFKA-10010
> URL: https://issues.apache.org/jira/browse/KAFKA-10010
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> The current lost all logic doesn't close standby task, which could 
> potentially lead to a tricky condition like below:
> 1. The standby task was initializing as `CREATED` state, and task corrupted 
> exception was thrown from registerStateStores
> 2. The task corrupted exception was caught, and do a non-affected task commit
> 3. The task commit failed due to task migrated exception
> 4. The handleLostAll didn't close the standby task, leaving it as CREATED 
> state
> 5. Next rebalance complete, the same task was assigned back as standby task.
> 6. Illegal Argument exception caught :
> {code:java}
> [2020-05-16T11:56:18-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) [2020-05-16 
> 18:56:18,050] ERROR 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> stream-thread 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> Encountered the following exception during processing and the thread is going 
> to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-16T11:56:18-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) 
> java.lang.IllegalArgumentException: stream-thread 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> standby-task [1_2] Store KSTREAM-AGGREGATE-STATE-STORE-07 has already 
> been registered.
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStore(ProcessorStateManager.java:269)
>         at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:112)
>         at 
> org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore.init(AbstractRocksDBSegmentedBytesStore.java:191)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.init(ChangeLoggingWindowBytesStore.java:54)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:74)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$init$0(MeteredWindowStore.java:85)
>         at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:85)
>         at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:82)
>         at 
> org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:89)
>         at 
> org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:358)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang merged pull request #8669: MINOR: consolidate processor context for active/standby

2020-05-18 Thread GitBox


guozhangwang merged pull request #8669:
URL: https://github.com/apache/kafka/pull/8669


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7271) Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers

2020-05-18 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110643#comment-17110643
 ] 

Bill Bejeck commented on KAFKA-7271:


[~mjsax] I don't think I'll have time for this one, so I've unassigned myself.

> Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers
> ---
>
> Key: KAFKA-7271
> URL: https://issues.apache.org/jira/browse/KAFKA-7271
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, system tests
>Reporter: John Roesler
>Priority: Blocker
> Fix For: 2.6.0
>
>
> Fix in the oldest branch that ignores the test and cherry-pick forward.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-7271) Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers

2020-05-18 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck reassigned KAFKA-7271:
--

Assignee: (was: Bill Bejeck)

> Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers
> ---
>
> Key: KAFKA-7271
> URL: https://issues.apache.org/jira/browse/KAFKA-7271
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, system tests
>Reporter: John Roesler
>Priority: Blocker
> Fix For: 2.6.0
>
>
> Fix in the oldest branch that ignores the test and cherry-pick forward.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna opened a new pull request #8689: KAFKA-6145: Add unit tests to verify fix of bug KAFKA-9173

2020-05-18 Thread GitBox


cadonna opened a new pull request #8689:
URL: https://github.com/apache/kafka/pull/8689


   Unit tests
   - 
shouldAssignActiveStatefulTasksEvenlyOverClientsAndStreamThreadsWithMoreStreamThreadsThanTasks()
   - 
shouldAssignWarmUpTasksIfStatefulActiveTasksBalancedOverStreamThreadsButNotOverClients()
   - 
shouldEvenlyAssignActiveStatefulTasksIfClientsAreWarmedUpToBalanceTaskOverClients()
   verify that bug KAFKA-9173 is fixed with the new 
HighAvailabilityTaskAssignor.
   
   
shouldAssignActiveStatefulTasksEvenlyOverClientsAndStreamThreadsWithMoreStreamThreadsThanTasks()
   ensures that tasks are evenly assigned over clients when all overprovisioned 
clients join
   simultaneously.
   
   
shouldAssignWarmUpTasksIfStatefulActiveTasksBalancedOverStreamThreadsButNotOverClients()
   ensures that warm-up tasks are assigned to two new clients that join the 
group
   although the assignment is already balanced over stream threads.
   
   
shouldEvenlyAssignActiveStatefulTasksIfClientsAreWarmedUpToBalanceTaskOverClients()
   ensures that stateful active tasks are balanced over the previous and warmed-up 
clients
   although the previous assignment is balanced over stream threads.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #8689: KAFKA-6145: Add unit tests to verify fix of bug KAFKA-9173

2020-05-18 Thread GitBox


cadonna commented on pull request #8689:
URL: https://github.com/apache/kafka/pull/8689#issuecomment-630467234


   Call for review: @vvcephei @ableegoldman 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #8689: KAFKA-6145: Add unit tests to verify fix of bug KAFKA-9173

2020-05-18 Thread GitBox


cadonna commented on a change in pull request #8689:
URL: https://github.com/apache/kafka/pull/8689#discussion_r426924669



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
##
@@ -170,7 +171,7 @@ public void 
shouldAssignActiveStatefulTasksEvenlyOverUnevenlyDistributedStreamTh
 }
 
 @Test
-public void 
shouldAssignActiveStatefulTasksEvenlyOverClientsWithLessClientsThanTasks() {
+public void 
shouldAssignActiveStatefulTasksEvenlyOverClientsWithMoreClientsThanTasks() {

Review comment:
   This name did not seem correct.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10010) Should make state store registration idempotent

2020-05-18 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-10010:

Summary: Should make state store registration idempotent  (was: Should 
close standby task for safety during HandleLostAll)

> Should make state store registration idempotent
> ---
>
> Key: KAFKA-10010
> URL: https://issues.apache.org/jira/browse/KAFKA-10010
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> The current lost all logic doesn't close standby task, which could 
> potentially lead to a tricky condition like below:
> 1. The standby task was initializing as `CREATED` state, and task corrupted 
> exception was thrown from registerStateStores
> 2. The task corrupted exception was caught, and do a non-affected task commit
> 3. The task commit failed due to task migrated exception
> 4. The handleLostAll didn't close the standby task, leaving it as CREATED 
> state
> 5. Next rebalance complete, the same task was assigned back as standby task.
> 6. Illegal Argument exception caught :
> {code:java}
> [2020-05-16T11:56:18-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) [2020-05-16 
> 18:56:18,050] ERROR 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> stream-thread 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> Encountered the following exception during processing and the thread is going 
> to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-16T11:56:18-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) 
> java.lang.IllegalArgumentException: stream-thread 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> standby-task [1_2] Store KSTREAM-AGGREGATE-STATE-STORE-07 has already 
> been registered.
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStore(ProcessorStateManager.java:269)
>         at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:112)
>         at 
> org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore.init(AbstractRocksDBSegmentedBytesStore.java:191)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.init(ChangeLoggingWindowBytesStore.java:54)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:74)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$init$0(MeteredWindowStore.java:85)
>         at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:85)
>         at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:82)
>         at 
> org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:89)
>         at 
> org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:358)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task

2020-05-18 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9989:
---
Description: 
System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:

"Never saw output 'processed [0-9]* records' on ubuntu@worker6"

which if we take a closer look at, the rebalance happens but has no task 
assignment. We should fix this problem by making the rebalance result as part 
of the check, and wait for the finalized assignment before kicking off the 
record processing validation. 

  was:
System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:

"Never saw output 'processed [0-9]* records' on ubuntu@worker6"

which if we take a closer look at, the rebalance happens but has no task 
assignment. We should fix this problem by making the rebalance result as part 
of the check, and skip the record processing validation when the assignment is 
empty. 


> StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor 
> gets assigned task
> -
>
> Key: KAFKA-9989
> URL: https://issues.apache.org/jira/browse/KAFKA-9989
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:
> "Never saw output 'processed [0-9]* records' on ubuntu@worker6"
> which if we take a closer look at, the rebalance happens but has no task 
> assignment. We should fix this problem by making the rebalance result as part 
> of the check, and wait for the finalized assignment before kicking off the 
> record processing validation. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task

2020-05-18 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reassigned KAFKA-9989:
--

Assignee: (was: Boyang Chen)

> StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor 
> gets assigned task
> -
>
> Key: KAFKA-9989
> URL: https://issues.apache.org/jira/browse/KAFKA-9989
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Boyang Chen
>Priority: Major
>
> System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:
> "Never saw output 'processed [0-9]* records' on ubuntu@worker6"
> which if we take a closer look at, the rebalance happens but has no task 
> assignment. We should fix this problem by making the rebalance result as part 
> of the check, and wait for the finalized assignment before kicking off the 
> record processing validation. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task

2020-05-18 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9989:
---
Labels: newbie  (was: )

> StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor 
> gets assigned task
> -
>
> Key: KAFKA-9989
> URL: https://issues.apache.org/jira/browse/KAFKA-9989
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Boyang Chen
>Priority: Major
>  Labels: newbie
>
> System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:
> "Never saw output 'processed [0-9]* records' on ubuntu@worker6"
> A closer look shows that the rebalance happens but produces no task 
> assignment. We should fix this by making the rebalance result part of the 
> check and waiting for the finalized assignment before kicking off the 
> record processing validation. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task

2020-05-18 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9989:
---
Description: 
System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:

"Never saw output 'processed [0-9]* records' on ubuntu@worker6"

A closer look shows that the rebalance happens but produces no task 
assignment. We should fix this by making the rebalance result part of the 
check and waiting for a finalized (non-empty) assignment before kicking 
off the record processing validation. 

  was:
System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:

"Never saw output 'processed [0-9]* records' on ubuntu@worker6"

A closer look shows that the rebalance happens but produces no task 
assignment. We should fix this by making the rebalance result part of the 
check and waiting for the finalized assignment before kicking off the 
record processing validation. 


> StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor 
> gets assigned task
> -
>
> Key: KAFKA-9989
> URL: https://issues.apache.org/jira/browse/KAFKA-9989
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Boyang Chen
>Priority: Major
>  Labels: newbie
>
> System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:
> "Never saw output 'processed [0-9]* records' on ubuntu@worker6"
> A closer look shows that the rebalance happens but produces no task 
> assignment. We should fix this by making the rebalance result part of the 
> check and waiting for a finalized (non-empty) assignment before kicking 
> off the record processing validation. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe opened a new pull request #8690: KAFKA-9965: Fix uneven distribution in RoundRobinPartitioner

2020-05-18 Thread GitBox


cmccabe opened a new pull request #8690:
URL: https://github.com/apache/kafka/pull/8690


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] xiaodongdu opened a new pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

2020-05-18 Thread GitBox


xiaodongdu opened a new pull request #8691:
URL: https://github.com/apache/kafka/pull/8691


   Implement KIP-606, add metadata context to MetricsReporter:
   Added a new API to MetricsReporter that allows the client to expose 
additional metadata fields to reporter plugins. Added an interface 
MetricsContext to encapsulate the metadata.
   Deprecated the JmxReporter(String prefix) constructor. The prefix will be 
passed to the reporter via MetricsContext.
   Replaced existing usages of JmxReporter with the default JmxReporter and 
passed the JMX prefix to MetricsContext using _namespace as the key.
   From the Kafka broker, populate MetricsContext with kafka.cluster.id and 
kafka.broker.id.
   From Connect, populate MetricsContext with connect.kafka.cluster.id and 
connect.group.id (a sketch of a reporter consuming this context follows below).
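
For reference, a minimal sketch of a reporter plugin consuming the new 
metadata context described in the list above. The names used here 
(MetricsContext, contextChange, contextLabels) follow the released KIP-606 
API and may differ slightly from this PR revision.

{code:java}
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;

// Illustrative reporter that simply logs the metadata labels delivered
// through MetricsContext (e.g. _namespace, kafka.cluster.id, kafka.broker.id).
public class ContextLoggingReporter implements MetricsReporter {

    @Override
    public void contextChange(MetricsContext metricsContext) {
        for (Map.Entry<String, String> label : metricsContext.contextLabels().entrySet()) {
            System.out.println("metrics context label: " + label.getKey() + "=" + label.getValue());
        }
    }

    @Override
    public void init(List<KafkaMetric> metrics) { }

    @Override
    public void metricChange(KafkaMetric metric) { }

    @Override
    public void metricRemoval(KafkaMetric metric) { }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}
{code}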
   
   Committer Checklist (excluded from commit message)
Verify design and implementation
Verify test coverage and CI build status
Verify documentation (including upgrade notes)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



