[GitHub] [kafka] lkokhreidze commented on pull request #8558: KAFKA-8611 / KIP-221 documentation

2020-05-13 Thread GitBox


lkokhreidze commented on pull request #8558:
URL: https://github.com/apache/kafka/pull/8558#issuecomment-628419029


   Hi @mjsax 
   
   I've rebased the branch.
   Do you mean the comment by Guozhang in the voting thread? If not, I missed it 
and can't find anything new in the DISCUSS thread. Can you point me to where it 
was asked?







[jira] [Resolved] (KAFKA-9676) Add test coverage for new ActiveTaskCreator and StandbyTaskCreator

2020-05-13 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-9676.

Resolution: Fixed

The current unit test coverage is pretty good now, closing the ticket.

> Add test coverage for new ActiveTaskCreator and StandbyTaskCreator
> --
>
> Key: KAFKA-9676
> URL: https://issues.apache.org/jira/browse/KAFKA-9676
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Boyang Chen
>Priority: Major
>  Labels: help-wanted, newbie
>
> The newly separated ActiveTaskCreator and StandbyTaskCreator have no unit 
> test coverage. We should add corresponding tests.





[jira] [Commented] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task

2020-05-13 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106950#comment-17106950
 ] 

Boyang Chen commented on KAFKA-9989:


I didn't find that in the log [~ableegoldman]

> StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor 
> gets assigned task
> -
>
> Key: KAFKA-9989
> URL: https://issues.apache.org/jira/browse/KAFKA-9989
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:
> "Never saw output 'processed [0-9]* records' on ubuntu@worker6"
> If we take a closer look, the rebalance happens but produces no task 
> assignment. We should fix this by making the rebalance result part of the 
> check, and skipping the record-processing validation when the assignment 
> is empty.





[GitHub] [kafka] abbccdda commented on a change in pull request #8395: Added doc for KIP-535 and updated it for KIP-562

2020-05-13 Thread GitBox


abbccdda commented on a change in pull request #8395:
URL: https://github.com/apache/kafka/pull/8395#discussion_r404455662



##
File path: docs/upgrade.html
##
@@ -39,7 +39,8 @@ Notable changes in 2
 https://github.com/apache/kafka/tree/2.5/examples";>examples folder. 
Check out
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics";>KIP-447
 for the full details.
-Deprecated KafkaStreams.store(String, QueryableStoreType) 
and replaced it with KafkaStreams.store(StoreQueryParameters).
+Provided support to query stale stores(for high availability) and the 
stores belonging to a specific partition by deprecating 
KafkaStreams.store(String, QueryableStoreType) and replacing it 
with KafkaStreams.store(StoreQueryParameters).

Review comment:
   
![image](https://user-images.githubusercontent.com/5845561/78615480-809bb580-7826-11ea-9f59-2e7c3cf1a901.png)
   Let's add a space in `stores(` to make it `stores (`, and add a period after 
`KafkaStreams.allLocalStorePartitionLags()`.









[GitHub] [kafka] zhaohaidao opened a new pull request #8665: KAFKA-9984 Should fail the subscription when pattern is empty

2020-05-13 Thread GitBox


zhaohaidao opened a new pull request #8665:
URL: https://github.com/apache/kafka/pull/8665


   Ticket: KAFKA-9984
   Subscribing to an empty pattern usually indicates a configuration error:
   ```
   
   [Consumer ...  ] Subscribed to pattern:  ''
   
   ```
   The `consumer.subscribe(pattern)` call should fail with an `IllegalArgumentException` in 
this case.
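   A minimal sketch of the kind of check this implies (illustration only: the helper 
name below is made up, and the real change would live in `KafkaConsumer.subscribe(Pattern, ...)`):
   ```java
   import java.util.regex.Pattern;

   // Illustrative only: reject null or empty subscription patterns up front.
   static void validateSubscribePattern(final Pattern pattern) {
       if (pattern == null || pattern.pattern().isEmpty()) {
           throw new IllegalArgumentException("Topic pattern to subscribe to cannot be "
               + (pattern == null ? "null" : "empty"));
       }
   }
   ```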







[GitHub] [kafka] showuon commented on pull request #8623: MINOR: Update the documentations

2020-05-13 Thread GitBox


showuon commented on pull request #8623:
URL: https://github.com/apache/kafka/pull/8623#issuecomment-628403219


   Hi @kkonstantine , could you please review this small PR? Thanks.







[GitHub] [kafka] vvcephei commented on a change in pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics

2020-05-13 Thread GitBox


vvcephei commented on a change in pull request #8662:
URL: https://github.com/apache/kafka/pull/8662#discussion_r424712060



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
##
@@ -105,6 +109,7 @@ public static String getTaskProducerClientId(final String 
threadClientId, final
 endOffsets = future.get(timeout.toMillis(), 
TimeUnit.MILLISECONDS);
 }
 } catch (final TimeoutException | RuntimeException | 
InterruptedException | ExecutionException e) {
+LOG.warn("listOffsets request failed due to ", e);

Review comment:
   ```suggestion
   LOG.warn("listOffsets request failed.", e);
   ```
   
   Thanks! (minor suggestion to make the log message more typical)

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##
@@ -169,6 +173,9 @@ public void makeReady(final Map<String, InternalTopicConfig> topics) {
 log.error(timeoutAndRetryError);
 throw new StreamsException(timeoutAndRetryError);
 }
+log.debug("Completed validating internal topics and created {}", 
newlyCreatedTopics);

Review comment:
   It's not what you signed up for, but I'm wondering if we should at least 
submit a Jira to give some of these AdminClient methods a "full consistency" 
mode. In other words, since the command returns a future anyway, it would be 
nice to be able to tell it not to return until it can guarantee the topic will 
appear to be fully created on all brokers.
   
   I'm mildly concerned that we're just kicking the can down the road a little 
ways with this PR. I.e., we let the assignment happen, but then some other 
metadata (or data) operation for that topic will just fail shortly thereafter.
   
   More generally, we jump through a lot of hoops in our own tests to try and 
make sure that the topics are really, actually created (or deleted) before 
proceeding with the test, and I'm sure that our users also suffer from the same 
problem in their testing and production code.
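   For reference, here is a self-contained sketch of the kind of hoop-jumping described 
above, using only public Admin APIs (the topic name, bootstrap address, and timeout are 
made up for illustration). Note that it only confirms that one broker's metadata shows 
the topic, which is exactly the gap a "full consistency" mode would close:
   ```java
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;

   import java.util.Collections;
   import java.util.Properties;

   public class WaitForTopicExample {
       public static void main(final String[] args) throws Exception {
           final Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           try (Admin admin = Admin.create(props)) {
               final long deadline = System.currentTimeMillis() + 30_000L;
               while (true) {
                   try {
                       // Succeeds once the contacted broker reports metadata for the topic.
                       admin.describeTopics(Collections.singleton("my-changelog")).all().get();
                       break;
                   } catch (final Exception e) {
                       if (System.currentTimeMillis() > deadline) {
                           throw e;
                       }
                       Thread.sleep(500);
                   }
               }
           }
       }
   }
   ```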









[GitHub] [kafka] chia7712 commented on a change in pull request #8663: KAFKA-9985: Sink connector may exhaust broker when writing in DLQ

2020-05-13 Thread GitBox


chia7712 commented on a change in pull request #8663:
URL: https://github.com/apache/kafka/pull/8663#discussion_r424860254



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
##
@@ -96,6 +101,25 @@ public static void validate(Map<String, String> props) {
 throw new ConfigException("Must configure one of " +
 SinkTask.TOPICS_CONFIG + " or " + 
SinkTask.TOPICS_REGEX_CONFIG);
 }
+
+if (hasDlqTopicConfig) {
+String dlqTopic = props.get(DLQ_TOPIC_NAME_CONFIG).trim();
+if (hasTopicsConfig) {
+List<String> topics = parseTopicsList(props);
+if (topics.contains(dlqTopic)) {
+throw new ConfigException(DLQ_TOPIC_NAME_CONFIG + " has a 
topic name which is already in " +

Review comment:
   Should we log the topic name for this exception? For example, ```has a 
topic name (xxx) which```
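   A possible shape for that message, reusing the names from the snippet above (the 
exact wording is just a suggestion):
   ```java
   throw new ConfigException(DLQ_TOPIC_NAME_CONFIG + " topic '" + dlqTopic
           + "' is also listed in " + SinkTask.TOPICS_CONFIG);
   ```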









[jira] [Commented] (KAFKA-9981) Running a dedicated mm2 cluster with more than one node: when the configuration is updated, the task is not aware of it and the update operation is lost.

2020-05-13 Thread Andre Price (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106865#comment-17106865
 ] 

Andre Price commented on KAFKA-9981:


Not sure if it's related, but I think I'm seeing issues where task configs don't seem 
to get updated consistently when new topics/partitions are found. 

 

> Running a dedicated mm2 cluster with more than one node: when the 
> configuration is updated, the task is not aware of it and the update 
> operation is lost.
> 
>
> Key: KAFKA-9981
> URL: https://issues.apache.org/jira/browse/KAFKA-9981
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
>Reporter: victor
>Priority: Major
>
> DistributedHerder.reconfigureConnector handles the config update as follows:
> {code:java}
> if (changed) {
> List<Map<String, String>> rawTaskProps = reverseTransform(connName, 
> configState, taskProps);
> if (isLeader()) {
> configBackingStore.putTaskConfigs(connName, rawTaskProps);
> cb.onCompletion(null, null);
> } else {
> // We cannot forward the request on the same thread because this 
> reconfiguration can happen as a result of connector
> // addition or removal. If we blocked waiting for the response from 
> leader, we may be kicked out of the worker group.
> forwardRequestExecutor.submit(new Runnable() {
> @Override
> public void run() {
> try {
> String leaderUrl = leaderUrl();
> if (leaderUrl == null || leaderUrl.trim().isEmpty()) {
> cb.onCompletion(new ConnectException("Request to 
> leader to " +
> "reconfigure connector tasks failed " +
> "because the URL of the leader's REST 
> interface is empty!"), null);
> return;
> }
> String reconfigUrl = RestServer.urlJoin(leaderUrl, 
> "/connectors/" + connName + "/tasks");
> log.trace("Forwarding task configurations for connector 
> {} to leader", connName);
> RestClient.httpRequest(reconfigUrl, "POST", null, 
> rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm);
> cb.onCompletion(null, null);
> } catch (ConnectException e) {
> log.error("Request to leader to reconfigure connector 
> tasks failed", e);
> cb.onCompletion(e, null);
> }
> }
> });
> }
> }
> {code}
> The KafkaConfigBackingStore task checks for configuration updates, such as a topic 
> whitelist update. If the KafkaConfigBackingStore task is not running on the leader 
> node, an HTTP request is sent to notify the leader of the configuration 
> update. However, a dedicated mm2 cluster does not have the HTTP server turned 
> on, so the request fails to be sent, causing the update operation to be 
> lost.





[GitHub] [kafka] chia7712 commented on a change in pull request #8663: KAFKA-9985: Sink connector may exhaust broker when writing in DLQ

2020-05-13 Thread GitBox


chia7712 commented on a change in pull request #8663:
URL: https://github.com/apache/kafka/pull/8663#discussion_r424859533



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
##
@@ -108,6 +132,20 @@ public static boolean hasTopicsRegexConfig(Map<String, String> props) {
 return topicsRegexStr != null && !topicsRegexStr.trim().isEmpty();
 }
 
+public static boolean hasDlqTopicConfig(Map<String, String> props) {
+String dqlTopicStr = props.get(DLQ_TOPIC_NAME_CONFIG);
+return dqlTopicStr != null && !dqlTopicStr.trim().isEmpty();
+}
+
+public static List<String> parseTopicsList(Map<String, String> props) {
+List<String> topics = (List<String>) 
ConfigDef.parseType(TOPICS_CONFIG, props.get(TOPICS_CONFIG), Type.LIST);
+return topics
+.stream()
+.filter(topic -> !topic.isEmpty())
+.distinct()

Review comment:
   How about returning a Set instead of List? 
   
   ```
   return topics
   .stream()
   .filter(topic -> !topic.isEmpty())
   .collect(Collectors.toSet());
   ```









[jira] [Commented] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task

2020-05-13 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106864#comment-17106864
 ] 

Sophie Blee-Goldman commented on KAFKA-9989:


Yeah that looks like the FallbackPriorTaskAssignor to me. Do you see something 
like 
{code:java}
Failed to fetch end offsets for changelogs, will return previous assignment to clients and trigger another rebalance to retry.
{code}
in the logs? Depending on when this test was run, this specific log message may 
or may not have existed; we've made a lot of changes here lately.

> StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor 
> gets assigned task
> -
>
> Key: KAFKA-9989
> URL: https://issues.apache.org/jira/browse/KAFKA-9989
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:
> "Never saw output 'processed [0-9]* records' on ubuntu@worker6"
> If we take a closer look, the rebalance happens but produces no task 
> assignment. We should fix this by making the rebalance result part of the 
> check, and skipping the record-processing validation when the assignment 
> is empty.





[GitHub] [kafka] chia7712 commented on a change in pull request #8663: KAFKA-9985: Sink connector may exhaust broker when writing in DLQ

2020-05-13 Thread GitBox


chia7712 commented on a change in pull request #8663:
URL: https://github.com/apache/kafka/pull/8663#discussion_r424859533



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
##
@@ -108,6 +132,20 @@ public static boolean hasTopicsRegexConfig(Map<String, String> props) {
 return topicsRegexStr != null && !topicsRegexStr.trim().isEmpty();
 }
 
+public static boolean hasDlqTopicConfig(Map<String, String> props) {
+String dqlTopicStr = props.get(DLQ_TOPIC_NAME_CONFIG);
+return dqlTopicStr != null && !dqlTopicStr.trim().isEmpty();
+}
+
+public static List<String> parseTopicsList(Map<String, String> props) {
+List<String> topics = (List<String>) 
ConfigDef.parseType(TOPICS_CONFIG, props.get(TOPICS_CONFIG), Type.LIST);
+return topics
+.stream()
+.filter(topic -> !topic.isEmpty())
+.distinct()

Review comment:
   why not just return a Set instead of List? 
   
   ```
   return topics
   .stream()
   .filter(topic -> !topic.isEmpty())
   .collect(Collectors.toSet());
   ```









[jira] [Assigned] (KAFKA-9984) Should fail the subscription when pattern is empty

2020-05-13 Thread HaiyuanZhao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

HaiyuanZhao reassigned KAFKA-9984:
--

Assignee: HaiyuanZhao

> Should fail the subscription when pattern is empty
> --
>
> Key: KAFKA-9984
> URL: https://issues.apache.org/jira/browse/KAFKA-9984
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Boyang Chen
>Assignee: HaiyuanZhao
>Priority: Major
>
> We have seen a case where the consumer subscribes to an empty string pattern:
> ```
> [Consumer ...  ] Subscribed to pattern:  ''
> ```
> which doesn't make any sense and usually indicates a configuration error. The 
> `consumer.subscribe(pattern)` call should fail with an IllegalArgumentException in this 
> case.





[jira] [Commented] (KAFKA-9984) Should fail the subscription when pattern is empty

2020-05-13 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106857#comment-17106857
 ] 

Boyang Chen commented on KAFKA-9984:


[~guozhang] Thanks!

> Should fail the subscription when pattern is empty
> --
>
> Key: KAFKA-9984
> URL: https://issues.apache.org/jira/browse/KAFKA-9984
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Boyang Chen
>Priority: Major
>
> We have seen a case where the consumer subscribes to an empty string pattern:
> ```
> [Consumer ...  ] Subscribed to pattern:  ''
> ```
> which doesn't make any sense and usually indicates a configuration error. The 
> `consumer.subscribe(pattern)` call should fail with an IllegalArgumentException in this 
> case.





[jira] [Commented] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task

2020-05-13 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106853#comment-17106853
 ] 

Boyang Chen commented on KAFKA-9989:


Looks like it is determined by the assignor (the latest-added consumer gets 0 
partitions assigned):

[2020-05-07 07:53:15,487] DEBUG stream-thread 
[StreamsUpgradeTest-0626d80d-03ed-4cf9-95a2-d778a791394f-StreamThread-1-consumer]
 Assigning tasks [0_0, 0_1, 0_2, 0_3, 0_4] to clients 
\{0626d80d-03ed-4cf9-95a2-d778a791394f=[activeTasks: ([]) standbyTasks: ([]) 
assignedTasks: ([]) prevActiveTasks: ([0_1, 0_3]) prevStandbyTasks: ([]) 
prevAssignedTasks: ([0_1, 0_3]) prevOwnedPartitionsByConsumerId: ([data-1, 
data-3]) changelogOffsetTotalsByTask: ([]) capacity: 1], 
3405b9f3-7f28-454a-9a00-906f8c1b520f=[activeTasks: ([]) standbyTasks: ([]) 
assignedTasks: ([]) prevActiveTasks: ([]) prevStandbyTasks: ([]) 
prevAssignedTasks: ([]) prevOwnedPartitionsByConsumerId: ([]) 
changelogOffsetTotalsByTask: ([]) capacity: 1], 
8611ff87-6782-4c27-b93e-c145b0bacccd=[activeTasks: ([]) standbyTasks: ([]) 
assignedTasks: ([]) prevActiveTasks: ([0_0, 0_2, 0_4]) prevStandbyTasks: ([]) 
prevAssignedTasks: ([0_0, 0_2, 0_4]) prevOwnedPartitionsByConsumerId: ([data-0, 
data-2, data-4]) changelogOffsetTotalsByTask: ([]) capacity: 1]} with number of 
replicas 0 
(org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor)

[2020-05-07 07:53:15,487] DEBUG [AdminClient 
clientId=StreamsUpgradeTest-0626d80d-03ed-4cf9-95a2-d778a791394f-admin] Sending 
MetadataRequestData(topics=[], allowAutoTopicCreation=false, 
includeClusterAuthorizedOperations=false, 
includeTopicAuthorizedOperations=false) to worker2:9092 (id: 1 rack: null). 
correlationId=7 (org.apache.kafka.clients.admin.KafkaAdminClient)

[2020-05-07 07:53:15,487] INFO stream-thread 
[StreamsUpgradeTest-0626d80d-03ed-4cf9-95a2-d778a791394f-StreamThread-1-consumer]
 Assigned tasks to clients as

0626d80d-03ed-4cf9-95a2-d778a791394f=[activeTasks: ([0_1, 0_3]) standbyTasks: 
([]) assignedTasks: ([0_1, 0_3]) prevActiveTasks: ([0_1, 0_3]) 
prevStandbyTasks: ([]) prevAssignedTasks: ([0_1, 0_3]) 
prevOwnedPartitionsByConsumerId: ([data-1, data-3]) 
changelogOffsetTotalsByTask: ([]) capacity: 1]

3405b9f3-7f28-454a-9a00-906f8c1b520f=[activeTasks: ([]) standbyTasks: ([]) 
assignedTasks: ([]) prevActiveTasks: ([]) prevStandbyTasks: ([]) 
prevAssignedTasks: ([]) prevOwnedPartitionsByConsumerId: ([]) 
changelogOffsetTotalsByTask: ([]) capacity: 1]

8611ff87-6782-4c27-b93e-c145b0bacccd=[activeTasks: ([0_0, 0_2, 0_4]) 
standbyTasks: ([]) assignedTasks: ([0_0, 0_2, 0_4]) prevActiveTasks: ([0_0, 
0_2, 0_4]) prevStandbyTasks: ([]) prevAssignedTasks: ([0_0, 0_2, 0_4]) 
prevOwnedPartitionsByConsumerId: ([data-0, data-2, data-4]) 
changelogOffsetTotalsByTask: ([]) capacity: 1]. 
(org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor)

> StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor 
> gets assigned task
> -
>
> Key: KAFKA-9989
> URL: https://issues.apache.org/jira/browse/KAFKA-9989
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:
> "Never saw output 'processed [0-9]* records' on ubuntu@worker6"
> If we take a closer look, the rebalance happens but produces no task 
> assignment. We should fix this by making the rebalance result part of the 
> check, and skipping the record-processing validation when the assignment 
> is empty.





[jira] [Commented] (KAFKA-9617) Replica Fetcher can mark partition as failed when max.message.bytes is changed

2020-05-13 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106843#comment-17106843
 ] 

Chia-Ping Tsai commented on KAFKA-9617:
---

[https://github.com/apache/kafka/pull/8659]

> Replica Fetcher can mark partition as failed when max.message.bytes is changed
> --
>
> Key: KAFKA-9617
> URL: https://issues.apache.org/jira/browse/KAFKA-9617
> Project: Kafka
>  Issue Type: Bug
>Reporter: Stanislav Kozlovski
>Assignee: Chia-Ping Tsai
>Priority: Major
>  Labels: newbie
>
> There exists a race condition when changing the dynamic max.message.bytes 
> config for a topic. A follower replica can replicate a message that is over 
> that size after it processes the config change. When this happens, the 
> replica fetcher catches the unexpected exception, marks the partition as 
> failed and stops replicating it.
> {code:java}
> 06:38:46.596  Processing override for entityPath: topics/partition-1 with 
> config: Map(max.message.bytes -> 512)
> 06:38:46.597   [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] 
> Unexpected error occurred while processing data for partition partition-1 at 
> offset 20964
> org.apache.kafka.common.errors.RecordTooLargeException: The record batch size 
> in the append to partition-1 is 3349 bytes which exceeds the maximum 
> configured value of 512.
> {code}





[jira] [Commented] (KAFKA-9987) Improve sticky partition assignor algorithm

2020-05-13 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106832#comment-17106832
 ] 

Sophie Blee-Goldman commented on KAFKA-9987:


Pseudo-code sketch of the algorithm:
{code}
C_f := (P/N)_floor, the floor capacity
C_c := (P/N)_ceil, the ceiling capacity

members := the sorted set of all consumers
partitions := the set of all partitions
unassigned_partitions := the set of partitions not yet assigned, initialized to be all partitions
unfilled_members := the set of consumers not yet at capacity, initialized to empty
max_capacity_members := the set of members with exactly C_c partitions assigned, initialized to empty
member.owned_partitions := the set of previously owned partitions encoded in the Subscription

// Reassign as many previously owned partitions as possible
for member : members
    remove any partitions that are no longer in the subscription from its owned partitions
    remove all owned_partitions if the generation is old
    if member.owned_partitions.size < C_f
        assign all owned partitions to member and remove from unassigned_partitions
        add member to unfilled_members
    else if member.owned_partitions.size == C_f
        assign first C_f owned_partitions to member and remove from unassigned_partitions
    else
        assign first C_c owned_partitions to member and remove from unassigned_partitions
        add member to max_capacity_members

sort unassigned_partitions in partition order, ie t0_p0, t1_p0, t2_p0, t0_p1, t1_p1, ... (for data parallelism)
sort unfilled_members by memberId (for determinism)

// Fill remaining members up to C_f
for member : unfilled_members
    compute the remaining capacity as C = C_f - num_assigned_partitions
    pop the first C partitions from unassigned_partitions and assign to member

// Steal partitions from members at max capacity if necessary
if we run out of partitions before getting to the end of unfilled_members:
    for member : unfilled_members
        poll the first member in max_capacity_members and remove one partition
        assign this partition to the unfilled member

// Distribute remaining partitions, one per consumer, to fill some up to C_c if necessary
if we run out of unfilled_members before assigning all partitions:
    for partition : unassigned_partitions
        assign to the next member in members that is not in max_capacity_members (then add member to max_capacity_members)
{code}
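
To make the floor and ceiling capacities concrete, a small worked example with made-up numbers (10 partitions across 3 consumers):
{code:java}
// Worked example (made-up numbers), following the definitions above.
final int numPartitions = 10;                                                // P
final int numConsumers = 3;                                                  // N
final int floorCapacity = numPartitions / numConsumers;                      // C_f = 3
final int ceilCapacity = (numPartitions + numConsumers - 1) / numConsumers;  // C_c = 4
// Exactly numPartitions % numConsumers = 1 consumer is filled to C_c; the other two stop at C_f.
{code}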

> Improve sticky partition assignor algorithm
> ---
>
> Key: KAFKA-9987
> URL: https://issues.apache.org/jira/browse/KAFKA-9987
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> In 
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
>  we added the new CooperativeStickyAssignor, which leverages the underlying 
> sticky assignment algorithm of the existing StickyAssignor (moved to 
> AbstractStickyAssignor). The algorithm is fairly complex as it tries to 
> optimize stickiness while satisfying perfect balance _in the case individual 
> consumers may be subscribed to different subsets of the topics._ While it 
> does a pretty good job at what it promises to do, it doesn't scale well with 
> large numbers of consumers and partitions.
> To give a concrete example, users have reported that it takes 2.5 minutes for 
> the assignment to complete with just 2100 consumers reading from 2100 
> partitions. Since partitions revoked during the first of two cooperative 
> rebalances will remain unassigned until the end of the second rebalance, it's 
> important for the rebalance to be as fast as possible. And since one of the 
> primary improvements of the cooperative rebalancing protocol is better 
> scaling experience, the only OOTB cooperative assignor should not itself 
> scale poorly.
> If we can constrain the problem a bit, we can simplify the algorithm greatly. 
> In many cases the individual consumers won't be subscribed to some random 
> subset of the total subscription, they will all be subscribed to the same set 
> of topics and rely on the assignor to balance the partition workload.
> We can detect this case by checking the group's individual subscriptions and 
> call on a more efficient assignment algorithm. 





[GitHub] [kafka] abbccdda closed pull request #7172: (DO NOT MERGE) Kip 447 condensed POC

2020-05-13 Thread GitBox


abbccdda closed pull request #7172:
URL: https://github.com/apache/kafka/pull/7172


   







[GitHub] [kafka] abbccdda closed pull request #7676: DEBUG: add partition empty check

2020-05-13 Thread GitBox


abbccdda closed pull request #7676:
URL: https://github.com/apache/kafka/pull/7676


   







[GitHub] [kafka] abbccdda closed pull request #7179: MINOR: improve StreamsBrokerDownResilienceTest debuggability

2020-05-13 Thread GitBox


abbccdda closed pull request #7179:
URL: https://github.com/apache/kafka/pull/7179


   







[GitHub] [kafka] ableegoldman commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

2020-05-13 Thread GitBox


ableegoldman commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424830762



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ConstrainedPrioritySet.java
##
@@ -16,77 +16,58 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
+import org.apache.kafka.streams.processor.TaskId;
+
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.function.BiFunction;
-import org.apache.kafka.streams.processor.TaskId;
+import java.util.function.Function;
 
 /**
  * Wraps a priority queue of clients and returns the next valid candidate(s) 
based on the current task assignment
  */
-class ValidClientsByTaskLoadQueue {
+class ConstrainedPrioritySet {
 
 private final PriorityQueue clientsByTaskLoad;
-private final BiFunction validClientCriteria;
+private final BiFunction constraint;
 private final Set uniqueClients = new HashSet<>();
 
-ValidClientsByTaskLoadQueue(final Map clientStates,
-final BiFunction 
validClientCriteria) {
-this.validClientCriteria = validClientCriteria;
-
-clientsByTaskLoad = new PriorityQueue<>(
-(client, other) -> {
-final double clientTaskLoad = 
clientStates.get(client).taskLoad();
-final double otherTaskLoad = 
clientStates.get(other).taskLoad();
-if (clientTaskLoad < otherTaskLoad) {
-return -1;
-} else if (clientTaskLoad > otherTaskLoad) {
-return 1;
-} else {
-return client.compareTo(other);
-}
-});
+ConstrainedPrioritySet(final BiFunction constraint,
+   final Function weight) {
+this.constraint = constraint;
+clientsByTaskLoad = new 
PriorityQueue<>(Comparator.comparing(weight).thenComparing(clientId -> 
clientId));
 }
 
 /**
  * @return the next least loaded client that satisfies the given criteria, 
or null if none do
  */
-UUID poll(final TaskId task) {
-final List validClient = poll(task, 1);
-return validClient.isEmpty() ? null : validClient.get(0);
-}
-
-/**
- * @return the next N <= {@code numClientsPerTask} clients in the 
underlying priority queue that are valid candidates for the given task
- */
-List poll(final TaskId task, final int numClients) {
-final List nextLeastLoadedValidClients = new LinkedList<>();
+UUID poll(final TaskId task, final Function 
extraConstraint) {

Review comment:
   > we know that we cannot consider C1 again for the second poll
   
   Yep, that's what I was getting at above. I'm totally on board with reducing 
the number of assumptions, especially as this class becomes more generally 
used. I was just intrigued by what you said initially and thought "This 
actually results in better balancing characteristics when assigning standbys" 
meant that you had actually seen a difference in the tests.
   
   Thanks for continuing to improve this class!









[GitHub] [kafka] d8tltanc commented on pull request #8528: System tests should use --bootstrap-server rather than --zookeeper when testing new Kafka versions

2020-05-13 Thread GitBox


d8tltanc commented on pull request #8528:
URL: https://github.com/apache/kafka/pull/8528#issuecomment-628340161


   Latest dev branch builds here (REMOVE_SYSTEM_4): 
   
   https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3943/







[GitHub] [kafka] vvcephei commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

2020-05-13 Thread GitBox


vvcephei commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424825641



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ConstrainedPrioritySet.java
##
@@ -16,77 +16,58 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
+import org.apache.kafka.streams.processor.TaskId;
+
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.function.BiFunction;
-import org.apache.kafka.streams.processor.TaskId;
+import java.util.function.Function;
 
 /**
  * Wraps a priority queue of clients and returns the next valid candidate(s) 
based on the current task assignment
  */
-class ValidClientsByTaskLoadQueue {
+class ConstrainedPrioritySet {
 
 private final PriorityQueue clientsByTaskLoad;
-private final BiFunction validClientCriteria;
+private final BiFunction constraint;
 private final Set uniqueClients = new HashSet<>();
 
-ValidClientsByTaskLoadQueue(final Map clientStates,
-final BiFunction 
validClientCriteria) {
-this.validClientCriteria = validClientCriteria;
-
-clientsByTaskLoad = new PriorityQueue<>(
-(client, other) -> {
-final double clientTaskLoad = 
clientStates.get(client).taskLoad();
-final double otherTaskLoad = 
clientStates.get(other).taskLoad();
-if (clientTaskLoad < otherTaskLoad) {
-return -1;
-} else if (clientTaskLoad > otherTaskLoad) {
-return 1;
-} else {
-return client.compareTo(other);
-}
-});
+ConstrainedPrioritySet(final BiFunction constraint,
+   final Function weight) {
+this.constraint = constraint;
+clientsByTaskLoad = new 
PriorityQueue<>(Comparator.comparing(weight).thenComparing(clientId -> 
clientId));
 }
 
 /**
  * @return the next least loaded client that satisfies the given criteria, 
or null if none do
  */
-UUID poll(final TaskId task) {
-final List validClient = poll(task, 1);
-return validClient.isEmpty() ? null : validClient.get(0);
-}
-
-/**
- * @return the next N <= {@code numClientsPerTask} clients in the 
underlying priority queue that are valid candidates for the given task
- */
-List poll(final TaskId task, final int numClients) {
-final List nextLeastLoadedValidClients = new LinkedList<>();
+UUID poll(final TaskId task, final Function 
extraConstraint) {

Review comment:
   I was operating more on intuition here. To be honest, I had a suspicion 
you would call this out, so I probably should have just taken the time to 
prove it.
   
   Forgetting about the constraint for a minute, I think that what I had in 
mind for balance is something like, suppose you have two clients "C1" and 
"C2"... C1 has one task and C2 has two. You poll and get C1 and add a task. 
Now, they both have two.
   
   If you add it back and poll again, you might prefer to get C1 back again. 
Maybe because the "weight" function takes into account more than just the task 
load, or maybe just because of the total order we impose based on clientId, in 
which `C1 < C2`. But if you just poll two clients to begin with, C1 doesn't 
get a chance to be included for the second poll; you just automatically 
get C1 and C2.
   
   In retrospect, this might be moot in practice, because the only time we 
actually polled for multiple clients was when assigning standbys, and 
specifically when we were assigning multiple replicas of the same task, in 
which case, we know that we _cannot_ consider C1 again for the second poll.
   
   From a computer-sciencey perspective, it doesn't seem like the data 
structure should be able to make this assumption, though, since it can't know 
that polling a client also invalidates it for a subsequent poll with the same 
last-mile predicate.
   
   So, even in retrospect, I'm tempted to leave it this way (big surprise 
there), although I'd acknowledge that the outcome is actually not different in 
the way that we would use the method.
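   A self-contained toy illustration of that point (a made-up example, not the Streams 
code): with poll / offer / poll, the lightest client can win twice, whereas a single 
poll for two clients can never return the same client twice.
   ```java
   import java.util.Comparator;
   import java.util.HashMap;
   import java.util.Map;
   import java.util.PriorityQueue;

   public class PollTwiceExample {
       public static void main(final String[] args) {
           final Map<String, Integer> load = new HashMap<>();
           load.put("C1", 1); // C1 currently has one task
           load.put("C2", 2); // C2 currently has two tasks
           final PriorityQueue<String> queue = new PriorityQueue<>(
               Comparator.<String>comparingInt(load::get).thenComparing(c -> c));
           queue.add("C1");
           queue.add("C2");

           final String first = queue.poll();   // C1, the least loaded client
           load.merge(first, 1, Integer::sum);  // C1 now also has two tasks
           queue.add(first);                    // re-offer it with its updated load

           final String second = queue.poll();  // C1 again: ties are broken by client id
           System.out.println(first + ", " + second); // prints "C1, C1"
       }
   }
   ```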









[jira] [Created] (KAFKA-9991) Flaky Test KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled

2020-05-13 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-9991:
--

 Summary: Flaky Test 
KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled
 Key: KAFKA-9991
 URL: https://issues.apache.org/jira/browse/KAFKA-9991
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Sophie Blee-Goldman


[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6280/testReport/junit/org.apache.kafka.streams.integration/KTableSourceTopicRestartIntegrationTest/shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled/]

 
h3. Stacktrace

java.lang.AssertionError: Condition not met within timeout 3. Table did not read all values
    at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
    at org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381)
    at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
    at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
    at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378)
    at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368)
    at org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.assertNumberValuesRead(KTableSourceTopicRestartIntegrationTest.java:205)
    at org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled(KTableSourceTopicRestartIntegrationTest.java:159)
    at org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled(KTableSourceTopicRestartIntegrationTest.java:143)





[jira] [Resolved] (KAFKA-9688) kafka-topic.sh should show KIP-455 adding and removing replicas

2020-05-13 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-9688.
-
Fix Version/s: 2.5.0
 Reviewer: Colin McCabe
   Resolution: Fixed

> kafka-topic.sh should show KIP-455 adding and removing replicas
> ---
>
> Key: KAFKA-9688
> URL: https://issues.apache.org/jira/browse/KAFKA-9688
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Colin McCabe
>Assignee: Cheng Tan
>Priority: Major
> Fix For: 2.5.0
>
>
> kafka-topic.sh should show KIP-455 adding and removing replicas, as described 
> in KIP-455.





[GitHub] [kafka] vvcephei commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

2020-05-13 Thread GitBox


vvcephei commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424822166



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
##
@@ -236,16 +233,17 @@ public void 
staticAssignmentShouldConvergeWithTheFirstAssignment() {
 0,
 1000L);
 
-final Harness harness = Harness.initializeCluster(1, 1, 1);
+final Harness harness = Harness.initializeCluster(1, 1, 1, () -> 1);
 
 testForConvergence(harness, configs, 1);
 verifyValidAssignment(0, harness);
+verifyBalancedAssignment(harness);
 }
 
 @Test
 public void assignmentShouldConvergeAfterAddingNode() {
-final int numStatelessTasks = 15;
-final int numStatefulTasks = 13;
+final int numStatelessTasks = 7;

Review comment:
   Right, but I think there's a "3" or a "5" in there somewhere, maybe in 
the other test. Anyway, my _intent_ was to make them all prime so I wouldn't 
have to think too hard about whether they were all coprime. But, in reality, I 
managed to screw up both.









[GitHub] [kafka] vvcephei commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

2020-05-13 Thread GitBox


vvcephei commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424821641



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java
##
@@ -35,262 +44,161 @@
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_2;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_3;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClientStatesMap;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasProperty;
 import static 
org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignTaskMovements;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.UUID;
-import java.util.stream.Collectors;
-import org.apache.kafka.streams.processor.TaskId;
-import org.junit.Test;
+import static org.hamcrest.Matchers.is;
 
 public class TaskMovementTest {
-private final ClientState client1 = new ClientState(1);
-private final ClientState client2 = new ClientState(1);
-private final ClientState client3 = new ClientState(1);
-
-private final Map clientStates = 
getClientStatesMap(client1, client2, client3);
-
-private final Map> emptyWarmupAssignment = mkMap(
-mkEntry(UUID_1, EMPTY_TASK_LIST),
-mkEntry(UUID_2, EMPTY_TASK_LIST),
-mkEntry(UUID_3, EMPTY_TASK_LIST)
-);
-
 @Test
 public void 
shouldAssignTasksToClientsAndReturnFalseWhenAllClientsCaughtUp() {
 final int maxWarmupReplicas = Integer.MAX_VALUE;
 final Set allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, 
TASK_1_0, TASK_1_1, TASK_1_2);
 
-final Map> balancedAssignment = mkMap(
-mkEntry(UUID_1, asList(TASK_0_0, TASK_1_0)),
-mkEntry(UUID_2, asList(TASK_0_1, TASK_1_1)),
-mkEntry(UUID_3, asList(TASK_0_2, TASK_1_2))
-);
-
 final Map> tasksToCaughtUpClients = new 
HashMap<>();
 for (final TaskId task : allTasks) {
 tasksToCaughtUpClients.put(task, mkSortedSet(UUID_1, UUID_2, 
UUID_3));
 }
-
-assertFalse(
+
+final ClientState client1 = 
getClientStateWithActiveAssignment(asList(TASK_0_0, TASK_1_0));
+final ClientState client2 = 
getClientStateWithActiveAssignment(asList(TASK_0_1, TASK_1_1));
+final ClientState client3 = 
getClientStateWithActiveAssignment(asList(TASK_0_2, TASK_1_2));
+
+assertThat(
 assignTaskMovements(
-balancedAssignment,
 tasksToCaughtUpClients,
-clientStates,
-getMapWithNumStandbys(allTasks, 1),
-maxWarmupReplicas)
+getClientStatesMap(client1, client2, client3),
+maxWarmupReplicas),
+is(false)
 );
-
-verifyClientStateAssignments(balancedAssignment, 
emptyWarmupAssignment);
 }
 
 @Test
 public void 
shouldAssignAllTasksToClientsAndReturnFalseIfNoClientsAreCaughtUp() {
-final int maxWarmupReplicas = 2;
-final Set allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, 
TASK_1_0, TASK_1_1, TASK_1_2);
+final int maxWarmupReplicas = Integer.MAX_VALUE;
 
-final Map> balancedAssignment = mkMap(
-mkEntry(UUID_1, asList(TASK_0_0, TASK_1_0)),
-mkEntry(UUID_2, asList(TASK_0_1, TASK_1_1)),
-mkEntry(UUID_3, asList(TASK_0_2, TASK_1_2))
-);
+final ClientState client1 = 
getClientStateWithActiveAssignment(asList(TASK_0_0, TASK_1_0));
+final ClientState client2 = 
getClientStateWithActiveAssignment(asList(TASK_0_1, TASK_1_1));
+final ClientState client3 = 
getClientStateWithActiveAssignment(asList(TASK_0_2, TASK_1_2));
 
-assertFalse(
+assertThat(
 assignTaskMovements(
-balancedAssignment,
 emptyMap(),
-clientStates,
-getMapWithNumStandbys(allTasks, 1),
-maxWarmupReplicas)
+getClientStatesMap(client1, client2, client3),
+maxWarmupReplicas),
+is(false)
 );
-verifyClientStateAssignments(balancedAssignment, 
emptyWarmupAssignment);
 }
 
 @Test
 public void 
shouldMoveTasksToCaughtUpClientsAndAssignWarmupReplicasInTheirPlace() {
 final int maxWarmupReplicas = Integer.MAX_VALUE;
-final Set allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
+final ClientState client1 = 
getClientStateWithActiveAssignment(singletonList(TASK_0_

[GitHub] [kafka] vvcephei commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

2020-05-13 Thread GitBox


vvcephei commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424820836



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -53,75 +67,94 @@ private static boolean 
taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(final Task
 /**
  * @return whether any warmup replicas were assigned
  */
-static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
+static boolean assignTaskMovements(final Map> 
tasksToCaughtUpClients,
final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
final int maxWarmupReplicas) {
-boolean warmupReplicasAssigned = false;
+final BiFunction caughtUpPredicate =
+(client, task) -> 
taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, 
tasksToCaughtUpClients);
 
-final ValidClientsByTaskLoadQueue clientsByTaskLoad = new 
ValidClientsByTaskLoadQueue(
-clientStates,
-(client, task) -> 
taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, 
tasksToCaughtUpClients)
+final ConstrainedPrioritySet clientsByTaskLoad = new 
ConstrainedPrioritySet(
+caughtUpPredicate,
+client -> clientStates.get(client).taskLoad()
 );
 
-final SortedSet taskMovements = new TreeSet<>(
-(movement, other) -> {
-final int numCaughtUpClients = movement.caughtUpClients.size();
-final int otherNumCaughtUpClients = 
other.caughtUpClients.size();
-if (numCaughtUpClients != otherNumCaughtUpClients) {
-return Integer.compare(numCaughtUpClients, 
otherNumCaughtUpClients);
-} else {
-return movement.task.compareTo(other.task);
-}
-}
+final Queue taskMovements = new PriorityQueue<>(
+
Comparator.comparing(TaskMovement::numCaughtUpClients).thenComparing(TaskMovement::task)
 );
 
-for (final Map.Entry> assignmentEntry : 
statefulActiveTaskAssignment.entrySet()) {
-final UUID client = assignmentEntry.getKey();
-final ClientState state = clientStates.get(client);
-for (final TaskId task : assignmentEntry.getValue()) {
-if (taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, 
client, tasksToCaughtUpClients)) {
-state.assignActive(task);
-} else {
-final TaskMovement taskMovement = new TaskMovement(task, 
client, tasksToCaughtUpClients.get(task));
-taskMovements.add(taskMovement);
+for (final Map.Entry clientStateEntry : 
clientStates.entrySet()) {
+final UUID client = clientStateEntry.getKey();
+final ClientState state = clientStateEntry.getValue();
+for (final TaskId task : state.activeTasks()) {
+// if the desired client is not caught up, and there is 
another client that _is_ caught up, then
+// we schedule a movement, so we can move the active task to 
the caught-up client. We'll try to
+// assign a warm-up to the desired client so that we can move 
it later on.
+if (!taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, 
client, tasksToCaughtUpClients)) {
+taskMovements.add(new TaskMovement(task, client, 
tasksToCaughtUpClients.get(task)));
 }
 }
 clientsByTaskLoad.offer(client);
 }
 
+final boolean movementsNeeded = !taskMovements.isEmpty();
+
 final AtomicInteger remainingWarmupReplicas = new 
AtomicInteger(maxWarmupReplicas);
 for (final TaskMovement movement : taskMovements) {
-final UUID sourceClient = clientsByTaskLoad.poll(movement.task);
-if (sourceClient == null) {
-throw new IllegalStateException("Tried to move task to 
caught-up client but none exist");
-}
-
-final ClientState sourceClientState = 
clientStates.get(sourceClient);
-sourceClientState.assignActive(movement.task);
-clientsByTaskLoad.offer(sourceClient);
+final UUID standbySourceClient = clientsByTaskLoad.poll(

Review comment:
   Agreed, it would just be good luck right now, but I figured we might as 
well capitalize on the luck. I'm planning to follow up pretty soon with the 
standby stickiness.






[GitHub] [kafka] vvcephei commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

2020-05-13 Thread GitBox


vvcephei commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424819838



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -53,75 +67,94 @@ private static boolean 
taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(final Task
 /**
  * @return whether any warmup replicas were assigned
  */
-static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
+static boolean assignTaskMovements(final Map> 
tasksToCaughtUpClients,
final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
final int maxWarmupReplicas) {
-boolean warmupReplicasAssigned = false;
+final BiFunction caughtUpPredicate =
+(client, task) -> 
taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, 
tasksToCaughtUpClients);
 
-final ValidClientsByTaskLoadQueue clientsByTaskLoad = new 
ValidClientsByTaskLoadQueue(
-clientStates,
-(client, task) -> 
taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, 
tasksToCaughtUpClients)
+final ConstrainedPrioritySet clientsByTaskLoad = new 
ConstrainedPrioritySet(

Review comment:
   sure, that's a good idea.









[GitHub] [kafka] ableegoldman commented on pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics

2020-05-13 Thread GitBox


ableegoldman commented on pull request #8662:
URL: https://github.com/apache/kafka/pull/8662#issuecomment-628329507


   Nooo 
`EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]` still 
had one failure on the Java 8 build 😭 
   
   But it failed on a different line which seems more in line with real 
flakiness. FWIW I ran this locally 40 times without failures (technically 80 in 
total for both true/false variations) ... I think it's worth still merging this 
PR and we can continue investigating it from there.
   
   ```
   java.lang.AssertionError: Did not receive all 20 records from topic 
multiPartitionOutputTopic within 6 ms
   Expected: is a value equal to or greater than <20>
but: <5> was less than <20>
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
at 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)
   ```







[GitHub] [kafka] mjsax commented on pull request #6792: KAFKA 8311 : Augment the error message to let user change `default.api.timeout.ms`

2020-05-13 Thread GitBox


mjsax commented on pull request #6792:
URL: https://github.com/apache/kafka/pull/6792#issuecomment-628323776


   @clearpal7 @abbccdda Just cleaning up my old backlog. Do you still want to 
get this in? PR would need a rebase.







[GitHub] [kafka] mjsax commented on pull request #6617: MINOR: Allow scripts to be symlinked

2020-05-13 Thread GitBox


mjsax commented on pull request #6617:
URL: https://github.com/apache/kafka/pull/6617#issuecomment-628323063


   Closing this PR due to inactivity.







[GitHub] [kafka] mjsax closed pull request #6617: MINOR: Allow scripts to be symlinked

2020-05-13 Thread GitBox


mjsax closed pull request #6617:
URL: https://github.com/apache/kafka/pull/6617


   







[GitHub] [kafka] mjsax merged pull request #6656: MINOR: Add option to rebuild source for system tests

2020-05-13 Thread GitBox


mjsax merged pull request #6656:
URL: https://github.com/apache/kafka/pull/6656


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-9987) Improve sticky partition assignor algorithm

2020-05-13 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-9987:
---
Description: 
In 
[KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
 we added the new CooperativeStickyAssignor which leverages on the underlying 
sticky assignment algorithm of the existing StickyAssignor (moved to 
AbstractStickyAssignor). The algorithm is fairly complex as it tries to 
optimize stickiness while satisfying perfect balance _in the case individual 
consumers may be subscribed to different subsets of the topics._ While it does 
a pretty good job at what it promises to do, it doesn't scale well with large 
numbers of consumers and partitions.

To give a concrete example, users have reported that it takes 2.5 minutes for 
the assignment to complete with just 2100 consumers reading from 2100 
partitions. Since partitions revoked during the first of two cooperative 
rebalances will remain unassigned until the end of the second rebalance, it's 
important for the rebalance to be as fast as possible. And since one of the 
primary improvements of the cooperative rebalancing protocol is better scaling 
experience, the only OOTB cooperative assignor should not itself scale poorly

If we can constrain the problem a bit, we can simplify the algorithm greatly. 
In many cases the individual consumers won't be subscribed to some random 
subset of the total subscription, they will all be subscribed to the same set 
of topics and rely on the assignor to balance the partition workload.

We can detect this case by checking the group's individual subscriptions and 
call on a more efficient assignment algorithm. 
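
As a rough illustration (not part of the ticket itself), detecting the uniform-subscription case could be as simple as comparing every member's topic set against the first one; the class and method names below are hypothetical:

{code:java}
// Hypothetical sketch -- not the actual AbstractStickyAssignor code.
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class SubscriptionCheck {
    // True if every member subscribes to exactly the same set of topics, in which
    // case a simpler single-pass sticky-but-balanced assignment can be used.
    static boolean allSubscriptionsEqual(final Collection<List<String>> memberSubscriptions) {
        Set<String> first = null;
        for (final List<String> topics : memberSubscriptions) {
            final Set<String> topicSet = new HashSet<>(topics);
            if (first == null) {
                first = topicSet;
            } else if (!first.equals(topicSet)) {
                return false;
            }
        }
        return true;
    }
}
{code}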

  was:
In KIP-429 we added the new CooperativeStickyAssignor which leverages on the 
underlying sticky assignment algorithm of the existing StickyAssignor (moved to 
AbstractStickyAssignor). 

 

The algorithm is fairly complex as it tries to optimize stickiness while 
satisfying perfect balance _in the case individual consumers may be subscribed 
to a random subset of the topics._ While it does a pretty good job at what it 
promises to do, it doesn't scale well with large numbers of consumers and 
partitions. 

 

If we can make the assumption that all consumers are subscribed to the same set 
of topics, we can simplify the algorithm greatly and do a sticky-but-balanced 
assignment in a single pass. It would be nice to have an additional cooperative 
assignor OOTB that performs efficiently for users who know their group will 
satisfy this constraint.


> Improve sticky partition assignor algorithm
> ---
>
> Key: KAFKA-9987
> URL: https://issues.apache.org/jira/browse/KAFKA-9987
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> In 
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
>  we added the new CooperativeStickyAssignor which leverages on the underlying 
> sticky assignment algorithm of the existing StickyAssignor (moved to 
> AbstractStickyAssignor). The algorithm is fairly complex as it tries to 
> optimize stickiness while satisfying perfect balance _in the case individual 
> consumers may be subscribed to different subsets of the topics._ While it 
> does a pretty good job at what it promises to do, it doesn't scale well with 
> large numbers of consumers and partitions.
> To give a concrete example, users have reported that it takes 2.5 minutes for 
> the assignment to complete with just 2100 consumers reading from 2100 
> partitions. Since partitions revoked during the first of two cooperative 
> rebalances will remain unassigned until the end of the second rebalance, it's 
> important for the rebalance to be as fast as possible. And since one of the 
> primary improvements of the cooperative rebalancing protocol is better 
> scaling experience, the only OOTB cooperative assignor should not itself 
> scale poorly
> If we can constrain the problem a bit, we can simplify the algorithm greatly. 
> In many cases the individual consumers won't be subscribed to some random 
> subset of the total subscription, they will all be subscribed to the same set 
> of topics and rely on the assignor to balance the partition workload.
> We can detect this case by checking the group's individual subscriptions and 
> call on a more efficient assignment algorithm. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9987) Improve sticky partition assignor algorithm

2020-05-13 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-9987:
---
Summary: Improve sticky partition assignor algorithm  (was: Add new 
cooperative assignor optimized for constant-subscription group)

> Improve sticky partition assignor algorithm
> ---
>
> Key: KAFKA-9987
> URL: https://issues.apache.org/jira/browse/KAFKA-9987
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> In KIP-429 we added the new CooperativeStickyAssignor which leverages on the 
> underlying sticky assignment algorithm of the existing StickyAssignor (moved 
> to AbstractStickyAssignor). 
>  
> The algorithm is fairly complex as it tries to optimize stickiness while 
> satisfying perfect balance _in the case individual consumers may be 
> subscribed to a random subset of the topics._ While it does a pretty good job 
> at what it promises to do, it doesn't scale well with large numbers of 
> consumers and partitions. 
>  
> If we can make the assumption that all consumers are subscribed to the same 
> set of topics, we can simplify the algorithm greatly and do a 
> sticky-but-balanced assignment in a single pass. It would be nice to have an 
> additional cooperative assignor OOTB that performs efficiently for users who 
> know their group will satisfy this constraint.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9850) Move KStream#repartition operator validation during Topology build process

2020-05-13 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-9850.

Fix Version/s: 2.6.0
   Resolution: Fixed

> Move KStream#repartition operator validation during Topology build process 
> ---
>
> Key: KAFKA-9850
> URL: https://issues.apache.org/jira/browse/KAFKA-9850
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Levani Kokhreidze
>Assignee: HaiyuanZhao
>Priority: Major
>  Labels: help-wanted, newbie, newbie++
> Fix For: 2.6.0
>
>
> `KStream#repartition` operation performs most of its validation regarding 
> joining, co-partitioning, etc after starting Kafka Streams instance. Some 
> parts of this validation can be detected much earlier, specifically during 
> topology `build()`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax merged pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

2020-05-13 Thread GitBox


mjsax merged pull request #8550:
URL: https://github.com/apache/kafka/pull/8550


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

2020-05-13 Thread GitBox


mjsax commented on pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#issuecomment-628317830


   Thanks for the PR @zhaohaidao! Merged to `trunk`.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-9465) Enclose consumer call with catching InvalidOffsetException

2020-05-13 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-9465.

Resolution: Not A Problem

This was fixed implicitly via some code refactoring.

> Enclose consumer call with catching InvalidOffsetException
> --
>
> Key: KAFKA-9465
> URL: https://issues.apache.org/jira/browse/KAFKA-9465
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Minor
>
> In maybeUpdateStandbyTasks, the try block encloses restoreConsumer.poll and 
> record handling.
> Since InvalidOffsetException is thrown by restoreConsumer.poll, we should 
> enclose this call in the try block.
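
For illustration only, a minimal sketch of the structure the ticket describes (hypothetical code, not the actual maybeUpdateStandbyTasks implementation):

{code:java}
// Hypothetical sketch of the described structure; not the real StreamThread code.
import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;

final class StandbyPollSketch {
    static void maybeUpdateStandbyTasks(final Consumer<byte[], byte[]> restoreConsumer) {
        try {
            // poll() is the call that can throw InvalidOffsetException, so it sits
            // inside the try block together with the record handling
            final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(Duration.ofMillis(100));
            records.forEach(record -> { /* restore the record into the standby store */ });
        } catch (final InvalidOffsetException e) {
            // reset the standby task's state and re-seek as appropriate
        }
    }
}
{code}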



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax closed pull request #8001: KAFKA-9465: Enclose consumer call with catching InvalidOffsetException

2020-05-13 Thread GitBox


mjsax closed pull request #8001:
URL: https://github.com/apache/kafka/pull/8001


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8001: KAFKA-9465: Enclose consumer call with catching InvalidOffsetException

2020-05-13 Thread GitBox


mjsax commented on pull request #8001:
URL: https://github.com/apache/kafka/pull/8001#issuecomment-628316756


   Thanks @guozhangwang. I resolve the Jira ticket. Closing this PR.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8049: MINOR: Added missing default serdes to the streams.scala.Serdes

2020-05-13 Thread GitBox


mjsax commented on a change in pull request #8049:
URL: https://github.com/apache/kafka/pull/8049#discussion_r424805332



##
File path: 
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
##
@@ -30,12 +32,15 @@ object Serdes {
   implicit def JavaLong: Serde[java.lang.Long] = JSerdes.Long()
   implicit def ByteArray: Serde[Array[Byte]] = JSerdes.ByteArray()
   implicit def Bytes: Serde[org.apache.kafka.common.utils.Bytes] = 
JSerdes.Bytes()
+  implicit def byteBufferSerde: Serde[ByteBuffer] = JSerdes.ByteBuffer()
+  implicit def shortSerde: Serde[Short] = 
JSerdes.Short().asInstanceOf[Serde[Short]]
   implicit def Float: Serde[Float] = JSerdes.Float().asInstanceOf[Serde[Float]]
   implicit def JavaFloat: Serde[java.lang.Float] = JSerdes.Float()
   implicit def Double: Serde[Double] = 
JSerdes.Double().asInstanceOf[Serde[Double]]
   implicit def JavaDouble: Serde[java.lang.Double] = JSerdes.Double()
   implicit def Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
   implicit def JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer()
+  implicit def uuidSerde: Serde[UUID] = JSerdes.UUID()

Review comment:
   I am fine with postponing this PR. \cc @vvcephei 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8637: KAFKA-9976: Reuse repartition node in all cases for KGroupedStream and KGroupedTable aggregates

2020-05-13 Thread GitBox


mjsax commented on pull request #8637:
URL: https://github.com/apache/kafka/pull/8637#issuecomment-628315503


   @bbejeck Wondering if there are any backward incompatibility concerns? Can 
you explain why this change is safe? For #8504 there would have been an exception 
before, and thus it was broken and no compatibility concern arises. Not 100% 
sure about this PR though.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8558: KAFKA-8611 / KIP-221 documentation

2020-05-13 Thread GitBox


mjsax commented on pull request #8558:
URL: https://github.com/apache/kafka/pull/8558#issuecomment-628314584


   @lkokhreidze The PR shows a conflict. Can you rebase it?
   
   Btw: did you see the follow up question on the mailing list?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] soondenana edited a comment on pull request #8068: MINOR: Update to Gradle 4.10.3

2020-05-13 Thread GitBox


soondenana edited a comment on pull request #8068:
URL: https://github.com/apache/kafka/pull/8068#issuecomment-628308978


   Closing this PR as it doesn't fix the issue.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] soondenana commented on pull request #8068: MINOR: Update to Gradle 4.10.3

2020-05-13 Thread GitBox


soondenana commented on pull request #8068:
URL: https://github.com/apache/kafka/pull/8068#issuecomment-628308978


   The issue has now been fixed by the release team by adding JVM flags for this. 
This PR is no longer needed. Closing.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] soondenana closed pull request #8068: MINOR: Update to Gradle 4.10.3

2020-05-13 Thread GitBox


soondenana closed pull request #8068:
URL: https://github.com/apache/kafka/pull/8068


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task

2020-05-13 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106744#comment-17106744
 ] 

Sophie Blee-Goldman commented on KAFKA-9989:


Yeah it would be good to dive into why this is happening – are the partitions 
missing because of a cooperative rebalance and we need to wait for one more 
rebalance to get the assignment? Or did the admin client's listOffsets request 
fail causing us to fall back on the PriorTaskAssignor?

> StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor 
> gets assigned task
> -
>
> Key: KAFKA-9989
> URL: https://issues.apache.org/jira/browse/KAFKA-9989
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:
> "Never saw output 'processed [0-9]* records' on ubuntu@worker6"
> which if we take a closer look at, the rebalance happens but has no task 
> assignment. We should fix this problem by making the rebalance result as part 
> of the check, and skip the record processing validation when the assignment 
> is empty. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on a change in pull request #8661: KAFKA-9603: Do not turn on bulk loading for segmented stores on stand-by tasks

2020-05-13 Thread GitBox


guozhangwang commented on a change in pull request #8661:
URL: https://github.com/apache/kafka/pull/8661#discussion_r424796625



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
##
@@ -248,17 +243,6 @@ void restoreAllInternal(final Collection> records) {
 final long segmentId = segments.segmentId(timestamp);
 final S segment = segments.getOrCreateSegmentIfLive(segmentId, 
context, observedStreamTime);
 if (segment != null) {
-// This handles the case that state store is moved to a new 
client and does not
-// have the local RocksDB instance for the segment. In this 
case, toggleDBForBulkLoading
-// will only close the database and open it again with bulk 
loading enabled.
-if (!bulkLoadSegments.contains(segment)) {
-segment.toggleDbForBulkLoading(true);
-// If the store does not exist yet, the 
getOrCreateSegmentIfLive will call openDB that
-// makes the open flag for the newly created store.
-// if the store does exist already, then 
toggleDbForBulkLoading will make sure that
-// the store is already open here.
-bulkLoadSegments = new HashSet<>(segments.allSegments());
-}

Review comment:
   Yup, that makes sense to me. I'm thinking about the world where standbys 
(and also restoring tasks) are executed on different threads. The concern about 
IQ is indeed valid, given a large set of un-compacted L0 files.
   
   In the even larger scope, where we would have checkpoints, I'd believe that 
bulk-loading would not be very necessary since we would not have a huge number 
of records to catch up on any more :)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rgroothuijsen opened a new pull request #8664: KAFKA-9716: Clarify meaning of compression rate metrics

2020-05-13 Thread GitBox


rgroothuijsen opened a new pull request #8664:
URL: https://github.com/apache/kafka/pull/8664


   There is some confusion over the compression rate metrics, as the meaning of 
the value isn't clearly stated in the metric description. In this case, it was 
assumed that a higher compression rate value meant better compression. This PR 
clarifies the meaning of the value, to prevent misunderstandings.
   
   Alternative approaches that were considered were to either change the name 
of the metric or its implementation, but this would have a negative impact on 
those who are already making use of this metric.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-9716) Values of compression-rate and compression-rate-avg are misleading

2020-05-13 Thread Rens Groothuijsen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rens Groothuijsen reassigned KAFKA-9716:


Assignee: Rens Groothuijsen

> Values of compression-rate and compression-rate-avg are misleading
> --
>
> Key: KAFKA-9716
> URL: https://issues.apache.org/jira/browse/KAFKA-9716
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, compression
>Affects Versions: 2.4.1
>Reporter: Christian Kosmowski
>Assignee: Rens Groothuijsen
>Priority: Minor
>
> The values of the following metrics:
> compression-rate and compression-rate-avg and basically every other 
> compression-rate (i.e.) topic compression rate
> are confusing.
> They are calculated as follows:
> {code:java}
> if (numRecords == 0L) {
> buffer().position(initialPosition);
> builtRecords = MemoryRecords.EMPTY;
> } else {
> if (magic > RecordBatch.MAGIC_VALUE_V1)
> this.actualCompressionRatio = (float) writeDefaultBatchHeader() / 
> this.uncompressedRecordsSizeInBytes;
> else if (compressionType != CompressionType.NONE)
> this.actualCompressionRatio = (float) 
> writeLegacyCompressedWrapperHeader() / this.uncompressedRecordsSizeInBytes;
> ByteBuffer buffer = buffer().duplicate();
> buffer.flip();
> buffer.position(initialPosition);
> builtRecords = MemoryRecords.readableRecords(buffer.slice());
> }
> {code}
> basically the compressed size is divided by the uncompressed size which leads 
> to a value < 1 for high compression (good if you want compression) or > 1 for 
> poor compression (bad if you want compression).
> From the name "compression rate" i would expect the exact opposite. Apart 
> from the fact that the word "rate" usually refers to comparisons based on 
> values of different units (miles per hour) the correct word "ratio" would 
> refer to the uncompressed size divided by the compressed size. (In the code 
> this is correct, but not with the metric names)
> So if the compressed data takes half the space of the uncompressed data the 
> correct value for compression ratio (or rate) would be 2 and not 0.5 as kafka 
> reports it. That is really confusing and i would AT LEAST expect that this 
> behaviour would be documented somewhere, but it's not; all documentation 
> sources just say "the compression rate".
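
To make the arithmetic above concrete, a small standalone illustration (not code from the producer client):

{code:java}
// Illustration of the two conventions described above.
public class CompressionRateExample {
    public static void main(final String[] args) {
        final float uncompressedBytes = 1000f;
        final float compressedBytes = 500f;   // compression halves the size

        // What the metric currently reports: compressed / uncompressed
        final float reportedRate = compressedBytes / uncompressedBytes;      // 0.5

        // What "compression ratio" conventionally means: uncompressed / compressed
        final float conventionalRatio = uncompressedBytes / compressedBytes; // 2.0

        System.out.println("reported rate = " + reportedRate);
        System.out.println("conventional ratio = " + conventionalRatio);
    }
}
{code}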



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-05-13 Thread GitBox


ijuma commented on pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#issuecomment-628301206


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-9554) Define the SPI for Tiered Storage framework

2020-05-13 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana reassigned KAFKA-9554:
-

Assignee: Satish Duggana

> Define the SPI for Tiered Storage framework
> ---
>
> Key: KAFKA-9554
> URL: https://issues.apache.org/jira/browse/KAFKA-9554
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core
>Reporter: Alexandre Dupriez
>Assignee: Satish Duggana
>Priority: Major
>
> The goal of this task is to define the SPI (service provider interfaces) 
> which will be used by vendors to implement plug-ins to communicate with 
> specific storage system.
> Done means:
>  * Package with interfaces and key objects available and published for review.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9990) Supporting transactions in tiered storage

2020-05-13 Thread Satish Duggana (Jira)
Satish Duggana created KAFKA-9990:
-

 Summary: Supporting transactions in tiered storage
 Key: KAFKA-9990
 URL: https://issues.apache.org/jira/browse/KAFKA-9990
 Project: Kafka
  Issue Type: Sub-task
Reporter: Satish Duggana
Assignee: Satish Duggana






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task

2020-05-13 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106731#comment-17106731
 ] 

Matthias J. Sax commented on KAFKA-9989:


Seems like an issue in rebalancing itself? Shouldn't we always get a balanced 
assignment?

> StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor 
> gets assigned task
> -
>
> Key: KAFKA-9989
> URL: https://issues.apache.org/jira/browse/KAFKA-9989
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:
> "Never saw output 'processed [0-9]* records' on ubuntu@worker6"
> which if we take a closer look at, the rebalance happens but has no task 
> assignment. We should fix this problem by making the rebalance result as part 
> of the check, and skip the record processing validation when the assignment 
> is empty. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task

2020-05-13 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-9989:
---
Component/s: system tests
 streams

> StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor 
> gets assigned task
> -
>
> Key: KAFKA-9989
> URL: https://issues.apache.org/jira/browse/KAFKA-9989
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:
> "Never saw output 'processed [0-9]* records' on ubuntu@worker6"
> which if we take a closer look at, the rebalance happens but has no task 
> assignment. We should fix this problem by making the rebalance result as part 
> of the check, and skip the record processing validation when the assignment 
> is empty. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on a change in pull request #6824: KAFKA-7523: Add ConnectedStoreProvider to Processor API

2020-05-13 Thread GitBox


mjsax commented on a change in pull request #6824:
URL: https://github.com/apache/kafka/pull/6824#discussion_r424758763



##
File path: docs/streams/developer-guide/processor-api.html
##
@@ -396,33 +396,52 @@ Connecting 
Processors and State Stores// and the WordCountProcessor node as its upstream 
processor
 .addSink("Sink", "sink-topic", "Process");
 
+
+Here is a quick explanation of this example:
+
+A source processor node named "Source" is added to the 
topology using the addSource method, with one Kafka topic
+"source-topic" fed to it.
+A processor node named "Process" with the 
pre-defined WordCountProcessor logic is then added as the 
downstream
+processor of the "Source" node using the addProcessor 
method.
+A predefined persistent key-value state store is added 
and connected to the "Process" node, using
+countStoreBuilder.
+A sink processor node is then added to complete the 
topology using the addSink method, taking the "Process" node
+as its upstream processor and writing to a separate 
"sink-topic" Kafka topic (note that users 
can also use another overloaded variant of addSink
+to dynamically determine the Kafka topic to write to 
for each received record from the upstream processor).
+
+
+In some cases, it may be more convenient to add and 
connect a state store at the same time as you add the processor it is connected 
to to the topology.
+This can be done by implementing ConnectedStoreProvider#stores() on the 
ProcessorSupplier
+in place of calling Topology#addStateStore(), like this:
+
+Topology builder = new 
Topology();
+
+// add the source processor node that takes Kafka topic 
"source-topic" as input
+builder.addSource("Source", "source-topic")
+
+// add the WordCountProcessor node which takes the source 
processor as its upstream processor, along with its state store
+builder.addProcessor("Process", new ProcessorSupplier() { 
public Processor get() { return new WordCountProcessor(); 
} public Set stores() { return countStoreBuilder; } 
});
+
+// add the sink processor node that takes Kafka topic 
"sink-topic" as output
+// and the WordCountProcessor node as its upstream 
processor
+.addSink("Sink", "sink-topic", "Process");
+
+This allows for a processor to "own" state stores, 
effectively encapsulating its usage from the user wiring the topology.
+Multiple processors that share a state store may 
provide the same store with this technique, as long as the StoreBuilder is the 
same instance.
+In these topologies, the "Process" stream processor 
node is considered a downstream processor of the "Source" node, and an
+upstream processor of the "Sink" node.  As a result, 
whenever the "Source" node forwards a newly fetched 
record from
+Kafka to its downstream "Process" node, the WordCountProcessor#process() method is triggered to 
process the record and
+update the associated state store. Whenever context#forward() is 
called in the
+WordCountProcessor#punctuate() method, the aggregate 
key-value pair will be sent via the "Sink" processor node to
+the Kafka topic "sink-topic".  Note that in the WordCountProcessor 
implementation, you must refer to the
+same store name "Counts" when accessing the key-value 
store, otherwise an exception will be thrown at runtime,
+indicating that the state store cannot be found. If 
the state store is not associated with the processor
+in the Topology code, accessing it in the processor’s 
init() method 
will also throw an exception at
+runtime, indicating the state store is not accessible 
from this processor.
+Now that you have fully defined your processor topology 
in your application, you can proceed to
+running the Kafka Streams application.
+

Review comment:
   Meta comment: can you also extend `streams/upgrade-guide.html` -- there 
is a "public API changes" section for the 2.6 release.

##
File path: docs/streams/developer-guide/dsl-api.html
##
@@ -3164,10 +3164,17 @@ 
 
 Tip
 Even though we do not demonstrate it in 
this example, a stream processor can access any available state stores by
-calling ProcessorContext#getStateStore().  Only such state 
stores are available that (1)

[GitHub] [kafka] lbradstreet commented on pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-05-13 Thread GitBox


lbradstreet commented on pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#issuecomment-628293868


   @ijuma the updated PR should compile OK. I removed the change to make 
SystemTime package private as it seems to be used in other tooling now.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8661: KAFKA-9603: Do not turn on bulk loading for segmented stores on stand-by tasks

2020-05-13 Thread GitBox


ableegoldman commented on a change in pull request #8661:
URL: https://github.com/apache/kafka/pull/8661#discussion_r424771783



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
##
@@ -248,17 +243,6 @@ void restoreAllInternal(final Collection> records) {
 final long segmentId = segments.segmentId(timestamp);
 final S segment = segments.getOrCreateSegmentIfLive(segmentId, 
context, observedStreamTime);
 if (segment != null) {
-// This handles the case that state store is moved to a new 
client and does not
-// have the local RocksDB instance for the segment. In this 
case, toggleDBForBulkLoading
-// will only close the database and open it again with bulk 
loading enabled.
-if (!bulkLoadSegments.contains(segment)) {
-segment.toggleDbForBulkLoading(true);
-// If the store does not exist yet, the 
getOrCreateSegmentIfLive will call openDB that
-// makes the open flag for the newly created store.
-// if the store does exist already, then 
toggleDbForBulkLoading will make sure that
-// the store is already open here.
-bulkLoadSegments = new HashSet<>(segments.allSegments());
-}

Review comment:
   Here are the current bulk loading configs: 
   ```
   dbOptions.setMaxBackgroundFlushes(4);
   columnFamilyOptions.setDisableAutoCompactions(true);
   columnFamilyOptions.setLevel0FileNumCompactionTrigger(1 << 30);
   columnFamilyOptions.setLevel0SlowdownWritesTrigger(1 << 30);
   columnFamilyOptions.setLevel0StopWritesTrigger(1 << 30);
   ```
   Setting aside the problems these are causing users [even for active 
tasks](https://issues.apache.org/jira/browse/KAFKA-9062), they basically mean 
"shove everything into the lowest file level and never attempt to limit these 
writes". This is useful if you're just trying to shove a lot of data into a 
store as fast as possible but not necessary need to use it immediately after, 
which is (debatably) the right thing for restoring tasks but definitely not 
appropriate for standbys*. We will attempt to restore a batch of records once 
per main thread loop, which means doing a lot of other stuff in between. 
There's no reason not to just use normal mode writing for standbys AFAICT -- 
also bulk loading will make IQ on standbys pretty annoying at best.

   *In the larger scope, perhaps when we move standbys to a separate thread, 
I'd say we actually should be turning on bulk loading. BUT we need to issue a 
manual compaction every so often, and ideally not flush them during every 
commit (related to 
[KAFKA-9450](https://issues.apache.org/jira/browse/KAFKA-9450)).
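
   For context, a minimal standalone sketch (plain RocksJava, not Streams 
internals) of the bulk-load-then-compact pattern discussed above; the path and 
values are illustrative only:
   ```
   // Standalone illustration: pile writes into L0 with compactions disabled,
   // then issue a manual compaction before serving reads. Not the RocksDBStore code.
   import org.rocksdb.Options;
   import org.rocksdb.RocksDB;
   import org.rocksdb.RocksDBException;

   public class BulkLoadSketch {
       public static void main(final String[] args) throws RocksDBException {
           RocksDB.loadLibrary();
           try (final Options options = new Options()
                    .setCreateIfMissing(true)
                    .setDisableAutoCompactions(true)           // "bulk loading" mode
                    .setLevel0FileNumCompactionTrigger(1 << 30)
                    .setLevel0SlowdownWritesTrigger(1 << 30)
                    .setLevel0StopWritesTrigger(1 << 30);
                final RocksDB db = RocksDB.open(options, "/tmp/bulk-load-sketch")) {
               for (int i = 0; i < 100_000; i++) {
                   db.put(("key-" + i).getBytes(), ("value-" + i).getBytes());
               }
               // Compact the accumulated L0 files before the store is queried (e.g. via IQ).
               db.compactRange();
           }
       }
   }
   ```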


[jira] [Created] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task

2020-05-13 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9989:
--

 Summary: StreamsUpgradeTest.test_metadata_upgrade could not 
guarantee all processor gets assigned task
 Key: KAFKA-9989
 URL: https://issues.apache.org/jira/browse/KAFKA-9989
 Project: Kafka
  Issue Type: Bug
Reporter: Boyang Chen
Assignee: Boyang Chen


System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:

"Never saw output 'processed [0-9]* records' on ubuntu@worker6"

which, if we take a closer look, shows that the rebalance happens but has no 
task assignment. We should fix this problem by making the rebalance result part 
of the check, and skipping the record processing validation when the assignment 
is empty. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe merged pull request #8556: MINOR: Add a duplicate() method to Message classes

2020-05-13 Thread GitBox


cmccabe merged pull request #8556:
URL: https://github.com/apache/kafka/pull/8556


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8661: KAFKA-9603: Do not turn on bulk loading for segmented stores on stand-by tasks

2020-05-13 Thread GitBox


guozhangwang commented on a change in pull request #8661:
URL: https://github.com/apache/kafka/pull/8661#discussion_r424761219



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
##
@@ -248,17 +243,6 @@ void restoreAllInternal(final Collection> records) {
 final long segmentId = segments.segmentId(timestamp);
 final S segment = segments.getOrCreateSegmentIfLive(segmentId, 
context, observedStreamTime);
 if (segment != null) {
-// This handles the case that state store is moved to a new 
client and does not
-// have the local RocksDB instance for the segment. In this 
case, toggleDBForBulkLoading
-// will only close the database and open it again with bulk 
loading enabled.
-if (!bulkLoadSegments.contains(segment)) {
-segment.toggleDbForBulkLoading(true);
-// If the store does not exist yet, the 
getOrCreateSegmentIfLive will call openDB that
-// makes the open flag for the newly created store.
-// if the store does exist already, then 
toggleDbForBulkLoading will make sure that
-// the store is already open here.
-bulkLoadSegments = new HashSet<>(segments.allSegments());
-}

Review comment:
   Actually, even for standby tasks it should also be beneficial to use 
bulk-loading, right (e.g. if the standby is far behind the active and has a 
large number of records to catch up on)?
   
   I'm thinking that in the long run, maybe we could optionally allow restore 
callbacks to be triggered for standbys as well: we can use some simple 
heuristics such that if the changelog log-end offset minus the standby task's 
store offset is greater than a certain threshold, we trigger onRestoreStart(), 
and then we can go back from the "sprinting" mode to normal mode once we've 
gotten close enough to the log-end offset.
   
   In the meantime, we can maybe hack a bit so that when 
`segment.toggleDbForBulkLoading` is called we set a flag (and reset it in the 
other case), and then during restoreAll we check the flag to decide whether to 
enable bulk loading for newly created segments.
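
   A rough sketch of that lag heuristic (hypothetical names, not actual Streams 
code) could look like:
   ```
   // Hypothetical illustration of the lag-based heuristic described above.
   final class StandbyLagHeuristicSketch {
       // If the standby is more than `threshold` records behind the changelog end
       // offset, treat it like a restoring task (e.g. notify a restore listener /
       // switch to "sprinting" mode); otherwise stay in normal mode.
       static boolean shouldSprint(final long changelogEndOffset,
                                   final long standbyStoreOffset,
                                   final long threshold) {
           return changelogEndOffset - standbyStoreOffset > threshold;
       }
   }
   ```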





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics

2020-05-13 Thread GitBox


mjsax commented on pull request #8662:
URL: https://github.com/apache/kafka/pull/8662#issuecomment-628272802


   Retest this please.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics

2020-05-13 Thread GitBox


mjsax commented on pull request #8662:
URL: https://github.com/apache/kafka/pull/8662#issuecomment-628272606


   Retest this please.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

2020-05-13 Thread GitBox


ableegoldman commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424758127



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
##
@@ -236,16 +233,17 @@ public void 
staticAssignmentShouldConvergeWithTheFirstAssignment() {
 0,
 1000L);
 
-final Harness harness = Harness.initializeCluster(1, 1, 1);
+final Harness harness = Harness.initializeCluster(1, 1, 1, () -> 1);
 
 testForConvergence(harness, configs, 1);
 verifyValidAssignment(0, harness);
+verifyBalancedAssignment(harness);
 }
 
 @Test
 public void assignmentShouldConvergeAfterAddingNode() {
-final int numStatelessTasks = 15;
-final int numStatefulTasks = 13;
+final int numStatelessTasks = 7;

Review comment:
   Well, if you have a set of N prime numbers and one number which isn't, 
aren't they all still coprime? :P





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9659) Kafka Streams / Consumer configured for static membership fails on "fatal exception: group.instance.id gets fenced"

2020-05-13 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106699#comment-17106699
 ] 

Guozhang Wang commented on KAFKA-9659:
--

Hi [~lkokhreidze] Could you try out the latest trunk for streams client (you do 
not need to upgrade broker versions) and see if it resolves the issue? I've 
pushed a couple of commits trying to fix this issue but unfortunately they are 
not in any released version yet.

> Kafka Streams / Consumer configured for static membership fails on "fatal 
> exception: group.instance.id gets fenced"
> ---
>
> Key: KAFKA-9659
> URL: https://issues.apache.org/jira/browse/KAFKA-9659
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Rohan Desai
>Assignee: Guozhang Wang
>Priority: Major
> Attachments: ksql-1.logs
>
>
> I'm running a KSQL query, which underneath is built into a Kafka Streams 
> application. The application has been running without issue for a few days, 
> until today, when all the streams threads exited with: 
>  
>  
> {{[ERROR] 2020-03-05 00:57:58,776 
> [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2]
>  org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - 
> [Consumer instanceId=ksql-1-2, 
> clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer,
>  groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] 
> Received fatal exception: group.instance.id gets fenced}}
> {{[ERROR] 2020-03-05 00:57:58,776 
> [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2]
>  org.apache.kafka.clients.consumer.internals.AbstractCoordinator onFailure - 
> [Consumer instanceId=ksql-1-2, 
> clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer,
>  groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] 
> Caught fenced group.instance.id Optional[ksql-1-2] error in heartbeat thread}}
> {{[ERROR] 2020-03-05 00:57:58,776 
> [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2]
>  org.apache.kafka.streams.processor.internals.StreamThread run - 
> stream-thread 
> [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2]
>  Encountered the following unexpected Kafka exception during processing, this 
> usually indicate Streams internal errors:}}
>  \{{ org.apache.kafka.common.errors.FencedInstanceIdException: The broker 
> rejected this static consumer since another consumer with the same 
> group.instance.id has registered with a different member.id.}}{{[INFO] 
> 2020-03-05 00:57:58,776 
> [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2]
>  org.apache.kafka.streams.processor.internals.StreamThread setState - 
> stream-thread 
> [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2]
>  State transition from RUNNING to PENDING_SHUTDOWN}}
>  
> I've attached the KSQL and Kafka Streams logs to this ticket. Here's a 
> summary for one of the streams threads (instance id `ksql-1-2`):
>  
> Around 00:56:36 the coordinator fails over from b11 to b2:
>  
> {{[INFO] 2020-03-05 00:56:36,258 
> [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2]
>  org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - 
> [Consumer instanceId=ksql-1-2, 
> clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer,
>  groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Attempt to 
> heartbeat failed since coordinator 
> b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483636 rack: 
> null) is either not started or not valid.}}
>  {{ [INFO] 2020-03-05 00:56:36,258 
> [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2]
>  org.apache.kafka.clients.consumer.internals.AbstractCoordinator 
> markCoordinatorUnknown - [Consumer instanceId=ksql-1-2, 
> clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer,
>  groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Group 
> coordinator b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud

[GitHub] [kafka] mjsax commented on pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics

2020-05-13 Thread GitBox


mjsax commented on pull request #8662:
URL: https://github.com/apache/kafka/pull/8662#issuecomment-628265887


   Retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-9966) Flaky Test EosBetaUpgradeIntegrationTest#shouldUpgradeFromEosAlphaToEosBeta

2020-05-13 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-9966.

Fix Version/s: 2.6.0
   Resolution: Fixed

> Flaky Test EosBetaUpgradeIntegrationTest#shouldUpgradeFromEosAlphaToEosBeta
> ---
>
> Key: KAFKA-9966
> URL: https://issues.apache.org/jira/browse/KAFKA-9966
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Sophie Blee-Goldman
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.6.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/285/testReport/junit/org.apache.kafka.streams.integration/EosBetaUpgradeIntegrationTest/shouldUpgradeFromEosAlphaToEosBeta_true_/]
> {quote}java.lang.AssertionError: Condition not met within timeout 6. 
> Clients did not startup and stabilize on time. Observed transitions: client-1 
> transitions: [KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)] 
> client-2 transitions: [KeyValue(CREATED, REBALANCING), KeyValue(REBALANCING, 
> RUNNING), KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)] at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381) 
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>  at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378) at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.waitForStateTransition(EosBetaUpgradeIntegrationTest.java:924)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:741){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9966) Flaky Test EosBetaUpgradeIntegrationTest#shouldUpgradeFromEosAlphaToEosBeta

2020-05-13 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106689#comment-17106689
 ] 

Matthias J. Sax commented on KAFKA-9966:


Also related: [https://github.com/apache/kafka/pull/8662]

> Flaky Test EosBetaUpgradeIntegrationTest#shouldUpgradeFromEosAlphaToEosBeta
> ---
>
> Key: KAFKA-9966
> URL: https://issues.apache.org/jira/browse/KAFKA-9966
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Sophie Blee-Goldman
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.6.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/285/testReport/junit/org.apache.kafka.streams.integration/EosBetaUpgradeIntegrationTest/shouldUpgradeFromEosAlphaToEosBeta_true_/]
> {quote}java.lang.AssertionError: Condition not met within timeout 6. 
> Clients did not startup and stabilize on time. Observed transitions: client-1 
> transitions: [KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)] 
> client-2 transitions: [KeyValue(CREATED, REBALANCING), KeyValue(REBALANCING, 
> RUNNING), KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)] at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381) 
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>  at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378) at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.waitForStateTransition(EosBetaUpgradeIntegrationTest.java:924)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:741){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9966) Flaky Test EosBetaUpgradeIntegrationTest#shouldUpgradeFromEosAlphaToEosBeta

2020-05-13 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-9966:
--

Assignee: Sophie Blee-Goldman  (was: Matthias J. Sax)

> Flaky Test EosBetaUpgradeIntegrationTest#shouldUpgradeFromEosAlphaToEosBeta
> ---
>
> Key: KAFKA-9966
> URL: https://issues.apache.org/jira/browse/KAFKA-9966
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Sophie Blee-Goldman
>Priority: Critical
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/285/testReport/junit/org.apache.kafka.streams.integration/EosBetaUpgradeIntegrationTest/shouldUpgradeFromEosAlphaToEosBeta_true_/]
> {quote}java.lang.AssertionError: Condition not met within timeout 6. 
> Clients did not startup and stabilize on time. Observed transitions: client-1 
> transitions: [KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)] 
> client-2 transitions: [KeyValue(CREATED, REBALANCING), KeyValue(REBALANCING, 
> RUNNING), KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)] at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381) 
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>  at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378) at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.waitForStateTransition(EosBetaUpgradeIntegrationTest.java:924)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:741){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics

2020-05-13 Thread GitBox


ableegoldman commented on a change in pull request #8662:
URL: https://github.com/apache/kafka/pull/8662#discussion_r424751772



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##
@@ -1862,6 +1866,35 @@ public void 
shouldThrowIllegalStateExceptionIfAnyTopicsMissingFromChangelogEndOf
 assertThrows(IllegalStateException.class, () -> 
partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)));
 }
 
+@Test
+public void shouldSkipListOffsetsRequestForNewlyCreatedChangelogTopics() {
+adminClient = EasyMock.createMock(AdminClient.class);
+final ListOffsetsResult result = 
EasyMock.createNiceMock(ListOffsetsResult.class);
+final KafkaFutureImpl> 
allFuture = new KafkaFutureImpl<>();
+allFuture.complete(emptyMap());
+
+expect(adminClient.listOffsets(emptyMap())).andStubReturn(result);
+expect(result.all()).andReturn(allFuture);
+
+builder.addSource(null, "source1", null, null, null, "topic1");
+builder.addProcessor("processor1", new MockProcessorSupplier(), 
"source1");
+builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), 
"processor1");
+
+subscriptions.put("consumer10",
+  new Subscription(
+  singletonList("topic1"),
+  defaultSubscriptionInfo.encode()
+  ));
+
+EasyMock.replay(result);

Review comment:
   It gets replayed during configuration (at the end of `configureDefault` 
below)
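
   For readers unfamiliar with the pattern, a generic EasyMock sketch (unrelated 
to this test class) of why a mock has to be replayed before it is used:
   ```
   // Generic EasyMock usage; not the StreamsPartitionAssignorTest code.
   import static org.easymock.EasyMock.createMock;
   import static org.easymock.EasyMock.expect;
   import static org.easymock.EasyMock.replay;
   import static org.easymock.EasyMock.verify;

   import java.util.List;

   public class EasyMockSketch {
       @SuppressWarnings("unchecked")
       public static void main(final String[] args) {
           final List<String> mock = createMock(List.class);
           expect(mock.size()).andReturn(42); // record the expected call
           replay(mock);                      // switch to replay mode -- required before use
           System.out.println(mock.size());   // returns the stubbed value 42
           verify(mock);                      // assert all expected calls happened
       }
   }
   ```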





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #8648: KAFKA-9966: add internal assignment listener to stabilize eos-beta upgrade test

2020-05-13 Thread GitBox


mjsax merged pull request #8648:
URL: https://github.com/apache/kafka/pull/8648


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics

2020-05-13 Thread GitBox


mjsax commented on pull request #8662:
URL: https://github.com/apache/kafka/pull/8662#issuecomment-628263949


   Retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics

2020-05-13 Thread GitBox


mjsax commented on a change in pull request #8662:
URL: https://github.com/apache/kafka/pull/8662#discussion_r424750958



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##
@@ -1862,6 +1866,35 @@ public void 
shouldThrowIllegalStateExceptionIfAnyTopicsMissingFromChangelogEndOf
 assertThrows(IllegalStateException.class, () -> 
partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)));
 }
 
+@Test
+public void shouldSkipListOffsetsRequestForNewlyCreatedChangelogTopics() {
+adminClient = EasyMock.createMock(AdminClient.class);
+final ListOffsetsResult result = 
EasyMock.createNiceMock(ListOffsetsResult.class);
+final KafkaFutureImpl> 
allFuture = new KafkaFutureImpl<>();
+allFuture.complete(emptyMap());
+
+expect(adminClient.listOffsets(emptyMap())).andStubReturn(result);
+expect(result.all()).andReturn(allFuture);
+
+builder.addSource(null, "source1", null, null, null, "topic1");
+builder.addProcessor("processor1", new MockProcessorSupplier(), 
"source1");
+builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), 
"processor1");
+
+subscriptions.put("consumer10",
+  new Subscription(
+  singletonList("topic1"),
+  defaultSubscriptionInfo.encode()
+  ));
+
+EasyMock.replay(result);

Review comment:
   Don't we need to replay `adminClient`, too?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

2020-05-13 Thread GitBox


ableegoldman commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424729806



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ConstrainedPrioritySet.java
##
@@ -16,77 +16,58 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
+import org.apache.kafka.streams.processor.TaskId;
+
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.function.BiFunction;
-import org.apache.kafka.streams.processor.TaskId;
+import java.util.function.Function;
 
 /**
  * Wraps a priority queue of clients and returns the next valid candidate(s) 
based on the current task assignment
  */
-class ValidClientsByTaskLoadQueue {
+class ConstrainedPrioritySet {
 
 private final PriorityQueue clientsByTaskLoad;
-private final BiFunction validClientCriteria;
+private final BiFunction constraint;
 private final Set uniqueClients = new HashSet<>();
 
-ValidClientsByTaskLoadQueue(final Map clientStates,
-final BiFunction 
validClientCriteria) {
-this.validClientCriteria = validClientCriteria;
-
-clientsByTaskLoad = new PriorityQueue<>(
-(client, other) -> {
-final double clientTaskLoad = 
clientStates.get(client).taskLoad();
-final double otherTaskLoad = 
clientStates.get(other).taskLoad();
-if (clientTaskLoad < otherTaskLoad) {
-return -1;
-} else if (clientTaskLoad > otherTaskLoad) {
-return 1;
-} else {
-return client.compareTo(other);
-}
-});
+ConstrainedPrioritySet(final BiFunction constraint,
+   final Function weight) {
+this.constraint = constraint;
+clientsByTaskLoad = new 
PriorityQueue<>(Comparator.comparing(weight).thenComparing(clientId -> 
clientId));
 }
 
 /**
  * @return the next least loaded client that satisfies the given criteria, 
or null if none do
  */
-UUID poll(final TaskId task) {
-final List validClient = poll(task, 1);
-return validClient.isEmpty() ? null : validClient.get(0);
-}
-
-/**
- * @return the next N <= {@code numClientsPerTask} clients in the 
underlying priority queue that are valid candidates for the given task
- */
-List poll(final TaskId task, final int numClients) {
-final List nextLeastLoadedValidClients = new LinkedList<>();
+UUID poll(final TaskId task, final Function 
extraConstraint) {

Review comment:
   I'm not sure I see how the returned clients could ever be different using "poll N clients" vs "poll N times". Only the clients that are getting a new task assigned will have their weight changed in the middle of an N-poll, and once we assign this task to that client it no longer meets the criteria, so we don't care about it anyway, right?
   
   The reason for the "poll N clients" method was to save on some of the poll-and-reoffer of clients that don't meet the criteria, but I don't think that's really worth worrying over. I'm fine with whatever code is easiest to read -- I just want to understand why this affects the balance.
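
   For illustration, here is a generic sketch of the poll-and-reoffer pattern we're talking about (not the Streams implementation; the names and types are made up). Candidates that fail the constraint are set aside and re-offered with their weight unchanged, which is why polling one valid candidate at a time and polling N up front end up choosing the same clients:

{code:java}
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.PriorityQueue;
import java.util.function.Predicate;

// Generic sketch: poll the lowest-weight candidate that satisfies a constraint,
// re-offering any rejected candidates so they stay available for later tasks.
public final class ConstrainedPollSketch {

    static Integer pollFirstValid(final PriorityQueue<Integer> queue, final Predicate<Integer> constraint) {
        final Deque<Integer> rejected = new ArrayDeque<>();
        Integer chosen = null;
        while (!queue.isEmpty()) {
            final Integer candidate = queue.poll();
            if (constraint.test(candidate)) {
                chosen = candidate;
                break;
            }
            rejected.add(candidate); // set aside; its weight is unchanged
        }
        queue.addAll(rejected); // re-offer everything we skipped
        return chosen;
    }

    public static void main(final String[] args) {
        final PriorityQueue<Integer> loads = new PriorityQueue<>(Comparator.naturalOrder());
        loads.add(1);
        loads.add(2);
        loads.add(3);
        // Skip candidates with load 1; take the least-loaded remaining candidate.
        System.out.println(pollFirstValid(loads, load -> load > 1)); // prints 2
    }
}
{code}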

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java
##
@@ -35,262 +44,161 @@
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_2;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_3;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClientStatesMap;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasProperty;
 import static 
org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignTaskMovements;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.UUID;
-import java.util.stream.Collectors;
-import org.apache.kafka.streams.processor.TaskId;
-import org.junit.Test;
+import static org.hamcrest.Matchers.is;
 
 public class TaskMovementTest {
-private final ClientState client1 = new ClientState(1);
-private final ClientState client2 = new ClientState(1);
-private final ClientState client3 = new ClientState(1);
-
-private final Map clientStates = 
getClientStatesMap(client1, c

[GitHub] [kafka] junrao commented on pull request #8479: KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception

2020-05-13 Thread GitBox


junrao commented on pull request #8479:
URL: https://github.com/apache/kafka/pull/8479#issuecomment-628258485


   @andrewchoi5 : Thanks for finding this issue. We fixed 
https://issues.apache.org/jira/browse/KAFKA-9932 recently, so the `fetchLogConfig` call that reads from ZK is now rarely made during makeFollowers(). If 
we do hit the ZK exception, there are a few options: (1) As Jason mentioned, we 
could keep retrying from ZK until successful. This probably needs to be done in 
a separate thread to avoid blocking the request handler thread. So, it can be a 
bit involved. (2) Don't update the leader epoch (we can choose to update the 
leader epoch after the local log is obtained successfully) and log an error. 
Since this issue should be rare now, maybe we can do (2) for now?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet commented on pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-05-13 Thread GitBox


lbradstreet commented on pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#issuecomment-628257888


   @ijuma thanks, I probably should have merged in before asking.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-05-13 Thread GitBox


ijuma commented on pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#issuecomment-628251072


   @lbradstreet Compile errors:
   
   > 12:38:04 
/home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java:29:
 error: SystemTime is not public in org.apache.kafka.common.utils; cannot be 
accessed from outside package
   > 12:38:04 import org.apache.kafka.common.utils.SystemTime;
   > 12:38:04 ^
   > 12:38:04 
/home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/tools/src/main/java/org/apache/kafka/trogdor/workload/SustainedConnectionWorker.java:33:
 error: SystemTime is not public in org.apache.kafka.common.utils; cannot be 
accessed from outside package
   > 12:38:04 import org.apache.kafka.common.utils.SystemTime;
   > 12:38:04 ^
   > 12:38:04 
/home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/tools/src/main/java/org/apache/kafka/trogdor/workload/SustainedConnectionWorker.java:63:
 error: SystemTime is not public in org.apache.kafka.common.utils; cannot be 
accessed from outside package
   > 12:38:04 private static final SystemTime SYSTEM_TIME = new 
SystemTime();
   > 12:38:04  ^
   > 12:38:04 
/home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java:98:
 error: SystemTime is not public in org.apache.kafka.common.utils; cannot be 
accessed from outside package
   > 12:38:04 time = new SystemTime();
   > 12:38:04^
   > 12:38:04 
/home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/tools/src/main/java/org/apache/kafka/trogdor/workload/SustainedConnectionWorker.java:63:
 error: SystemTime is not public in org.apache.kafka.common.utils; cannot be 
accessed from outside package
   > 12:38:04 private static final SystemTime SYSTEM_TIME = new 
SystemTime();
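
   One possible fix (an assumption, not something stated in this build output) is to go through the public `Time` interface instead of instantiating `SystemTime` directly; a minimal sketch:

{code:java}
import org.apache.kafka.common.utils.Time;

// Sketch: use the shared Time.SYSTEM instance instead of `new SystemTime()`,
// which avoids depending on the package-private implementation class.
public class TimeUsageSketch {
    private static final Time SYSTEM_TIME = Time.SYSTEM;

    public static void main(final String[] args) {
        System.out.println("now = " + SYSTEM_TIME.milliseconds());
    }
}
{code}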



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-9988) Connect incorrectly reports task has failed when task takes too long to shutdown

2020-05-13 Thread Sanjana Kaundinya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sanjana Kaundinya updated KAFKA-9988:
-
Affects Version/s: 2.5.1
   2.4.2
   2.3.2
   2.2.3
   2.3.0
   2.4.0
   2.3.1
   2.5.0
   2.4.1

> Connect incorrectly reports task has failed when task takes too long to 
> shutdown
> 
>
> Key: KAFKA-9988
> URL: https://issues.apache.org/jira/browse/KAFKA-9988
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.2.3, 2.5.0, 2.3.2, 2.4.1, 2.4.2, 
> 2.5.1
>Reporter: Sanjana Kaundinya
>Priority: Major
>
> If the OffsetStorageReader is closed while the task is trying to shutdown, 
> and the task is trying to access the offsets from the OffsetStorageReader, 
> then we see the following in the logs.
> {code:java}
> [2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=connector-18} Task threw 
> an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask)
> org.apache.kafka.connect.errors.ConnectException: Failed to fetch offsets.
> at 
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:114)
> at 
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
> at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:205)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.connect.errors.ConnectException: Offset reader 
> closed while attempting to read offsets. This is likely because the task was 
> been scheduled to stop but has taken longer than the graceful shutdown period 
> to do so.
> at 
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:103)
> ... 14 more
> [2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=connector-18} Task is 
> being killed and will not recover until manually restarted 
> (org.apache.kafka.connect.runtime.WorkerTask)
> {code}
> This is a bit misleading, because the task is already on its way of being 
> shutdown, and doesn't actually need manual intervention to be restarted. We 
> can see that as later on in the logs we see that it throws another 
> unrecoverable exception.
> {code:java}
> [2020-05-05 05:40:39,361] ERROR WorkerSourceTask{id=connector-18} Task threw 
> an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask)
> {code}
> If we know a task is on its way of shutting down, we should not throw a 
> ConnectException and instead log a warning so that we don't log false 
> negatives.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

2020-05-13 Thread GitBox


vvcephei commented on pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#issuecomment-628236334


   It looks like the test failures were unrelated (because they were all 
different), some known flaky:
   
   java 
8:org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]
   
   Java 11: 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   
org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled
   
   Java 14:
   kafka.api.TransactionsBounceTest.testWithGroupId
   kafka.api.TransactionsTest.testBumpTransactionalEpoch



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-9988) Log incorrectly reports task has failed when task takes too long to shutdown

2020-05-13 Thread Sanjana Kaundinya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sanjana Kaundinya updated KAFKA-9988:
-
Description: 
If the OffsetStorageReader is closed while the task is trying to shut down, and 
the task is trying to access the offsets from the OffsetStorageReader, then we 
see the following in the logs.

{code:java}
[2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=connector-18} Task threw an 
uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Failed to fetch offsets.
at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:114)
at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
at 
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:205)
at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.ConnectException: Offset reader 
closed while attempting to read offsets. This is likely because the task was 
been scheduled to stop but has taken longer than the graceful shutdown period 
to do so.
at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:103)
... 14 more
[2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=connector-18} Task is being 
killed and will not recover until manually restarted 
(org.apache.kafka.connect.runtime.WorkerTask)
{code}

This is a bit misleading, because the task is already in the process of being 
shut down and doesn't actually need manual intervention to be restarted. We can 
see this later in the logs, where it throws another unrecoverable exception.

{code:java}
[2020-05-05 05:40:39,361] ERROR WorkerSourceTask{id=connector-18} Task threw an 
uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask)
{code}

If we know a task is in the process of shutting down, we should not throw a 
ConnectException but instead log a warning, so that we don't log false negatives.


  was:
If the OffsetStorageReader is closed while the task is trying to shutdown, and 
the task is trying to access the offsets from the OffsetStorageReader, then we 
see the following in the logs.

{code:java}
[2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=replicator-18} Task threw 
an uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Failed to fetch offsets.
at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:114)
at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
at 
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:205)
at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.ConnectException: Offset reader 
closed while attempting to read offsets. This is likely because the task was 
been scheduled to stop but has taken longer than the graceful shutdown period 
to do so.
at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:103)
... 14 more
[2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=replicator-18} Task is 
being killed and will not recover until manually restarted 
(org.apache.kafka.connect.runtime.WorkerTask)
{code}

This is a bit misleading, because the task is already on its way of being 
shutdown, and doesn't actually need manual intervention to be restarted. We can 
see that as later on in the logs we see that it throws another unrecoverable 
exception.

{code:java}
[2020-05-05 05:40:39,361] ERROR WorkerSourceTask{id=replicator-18} Task threw 
an uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask)
{code}

If w

[jira] [Updated] (KAFKA-9988) Connect incorrectly reports task has failed when task takes too long to shutdown

2020-05-13 Thread Sanjana Kaundinya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sanjana Kaundinya updated KAFKA-9988:
-
Summary: Connect incorrectly reports task has failed when task takes too 
long to shutdown  (was: Log incorrectly reports task has failed when task takes 
too long to shutdown)

> Connect incorrectly reports task has failed when task takes too long to 
> shutdown
> 
>
> Key: KAFKA-9988
> URL: https://issues.apache.org/jira/browse/KAFKA-9988
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Sanjana Kaundinya
>Priority: Major
>
> If the OffsetStorageReader is closed while the task is trying to shutdown, 
> and the task is trying to access the offsets from the OffsetStorageReader, 
> then we see the following in the logs.
> {code:java}
> [2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=connector-18} Task threw 
> an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask)
> org.apache.kafka.connect.errors.ConnectException: Failed to fetch offsets.
> at 
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:114)
> at 
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
> at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:205)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.connect.errors.ConnectException: Offset reader 
> closed while attempting to read offsets. This is likely because the task was 
> been scheduled to stop but has taken longer than the graceful shutdown period 
> to do so.
> at 
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:103)
> ... 14 more
> [2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=connector-18} Task is 
> being killed and will not recover until manually restarted 
> (org.apache.kafka.connect.runtime.WorkerTask)
> {code}
> This is a bit misleading, because the task is already on its way of being 
> shutdown, and doesn't actually need manual intervention to be restarted. We 
> can see that as later on in the logs we see that it throws another 
> unrecoverable exception.
> {code:java}
> [2020-05-05 05:40:39,361] ERROR WorkerSourceTask{id=connector-18} Task threw 
> an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask)
> {code}
> If we know a task is on its way of shutting down, we should not throw a 
> ConnectException and instead log a warning so that we don't log false 
> negatives.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9988) Log incorrectly reports task has failed when task takes too long to shutdown

2020-05-13 Thread Sanjana Kaundinya (Jira)
Sanjana Kaundinya created KAFKA-9988:


 Summary: Log incorrectly reports task has failed when task takes 
too long to shutdown
 Key: KAFKA-9988
 URL: https://issues.apache.org/jira/browse/KAFKA-9988
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Sanjana Kaundinya


If the OffsetStorageReader is closed while the task is trying to shut down, and 
the task is trying to access the offsets from the OffsetStorageReader, then we 
see the following in the logs.

{code:java}
[2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=replicator-18} Task threw 
an uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Failed to fetch offsets.
at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:114)
at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
at 
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:205)
at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.ConnectException: Offset reader 
closed while attempting to read offsets. This is likely because the task was 
been scheduled to stop but has taken longer than the graceful shutdown period 
to do so.
at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:103)
... 14 more
[2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=replicator-18} Task is 
being killed and will not recover until manually restarted 
(org.apache.kafka.connect.runtime.WorkerTask)
{code}

This is a bit misleading, because the task is already in the process of being 
shut down and doesn't actually need manual intervention to be restarted. We can 
see this later in the logs, where it throws another unrecoverable exception.

{code:java}
[2020-05-05 05:40:39,361] ERROR WorkerSourceTask{id=replicator-18} Task threw 
an uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask)
{code}

If we know a task is in the process of shutting down, we should not throw a 
ConnectException but instead log a warning, so that we don't log false negatives.
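
A rough sketch of the kind of change being proposed (purely illustrative; the class and method names below are made up and this is not the actual patch):

{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative sketch: when the offset reader has been closed because the task is
// stopping, log a warning and return no offsets instead of throwing an exception
// that marks the task as failed.
public class GracefulOffsetReaderSketch {
    private static final Logger log = LoggerFactory.getLogger(GracefulOffsetReaderSketch.class);
    private final AtomicBoolean closed = new AtomicBoolean(false);

    public Map<String, Object> offsets() {
        if (closed.get()) {
            log.warn("Offset reader closed while the task is shutting down; skipping offset read.");
            return Collections.emptyMap();
        }
        // ... the normal offset lookup would go here ...
        return Collections.emptyMap();
    }

    public void close() {
        closed.set(true);
    }
}
{code}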




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

2020-05-13 Thread GitBox


vvcephei commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424718290



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
##
@@ -75,4 +94,303 @@
 static UUID uuidForInt(final int n) {
 return new UUID(0, n);
 }
+
+static void assertValidAssignment(final int numStandbyReplicas,
+  final Set statefulTasks,
+  final Set statelessTasks,
+  final Map 
assignedStates,
+  final StringBuilder failureContext) {
+assertValidAssignment(
+numStandbyReplicas,
+0,
+statefulTasks,
+statelessTasks,
+assignedStates,
+failureContext
+);
+}
+
+static void assertValidAssignment(final int numStandbyReplicas,
+  final int maxWarmupReplicas,
+  final Set statefulTasks,
+  final Set statelessTasks,
+  final Map 
assignedStates,
+  final StringBuilder failureContext) {
+final Map> assignments = new TreeMap<>();
+for (final TaskId taskId : statefulTasks) {
+assignments.put(taskId, new TreeSet<>());
+}
+for (final TaskId taskId : statelessTasks) {
+assignments.put(taskId, new TreeSet<>());
+}
+for (final Map.Entry entry : 
assignedStates.entrySet()) {
+validateAndAddActiveAssignments(statefulTasks, statelessTasks, 
failureContext, assignments, entry);
+validateAndAddStandbyAssignments(statefulTasks, statelessTasks, 
failureContext, assignments, entry);
+}
+
+final AtomicInteger remainingWarmups = new 
AtomicInteger(maxWarmupReplicas);
+
+final TreeMap> misassigned =
+assignments
+.entrySet()
+.stream()
+.filter(entry -> {
+final int expectedActives = 1;
+final boolean isStateless = 
statelessTasks.contains(entry.getKey());
+final int expectedStandbys = isStateless ? 0 : 
numStandbyReplicas;
+// We'll never assign even the expected number of standbys 
if they don't actually fit in the cluster
+final int expectedAssignments = Math.min(
+assignedStates.size(),
+expectedActives + expectedStandbys
+);
+final int actualAssignments = entry.getValue().size();
+if (actualAssignments == expectedAssignments) {
+return false; // not misassigned
+} else {
+if (actualAssignments == expectedAssignments + 1 && 
remainingWarmups.get() > 0) {
+remainingWarmups.getAndDecrement();
+return false; // it's a warmup, so it's fine
+} else {
+return true; // misassigned
+}
+}
+})
+.collect(entriesToMap(TreeMap::new));
+
+if (!misassigned.isEmpty()) {

Review comment:
   L131-158 is just gathering the information about whether each task is 
correctly assigned or not, based on its type and the standby configs (and maybe 
the warmup config). It doesn't make any assertions. So this check is actually 
the assertion, that no tasks are incorrectly assigned.
   
   Doing it this way is nicer, since when it fails, it tells you _all_ the 
incorrectly assigned tasks, not just the first one.
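
   As a side note, the collect-then-assert pattern described here can be sketched generically (illustrative only, not the actual test utility): gather every violation first, then fail once with the full list.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Generic sketch of "gather all misassignments, then make a single assertion",
// so a failure reports every bad task instead of only the first one.
public class CollectThenAssertSketch {
    public static void main(final String[] args) {
        final Map<String, Integer> expectedCopies = new TreeMap<>();
        expectedCopies.put("task_0_0", 2);
        expectedCopies.put("task_0_1", 2);

        final Map<String, Integer> actualCopies = new TreeMap<>();
        actualCopies.put("task_0_0", 2);
        actualCopies.put("task_0_1", 1);

        final List<String> misassigned = new ArrayList<>();
        for (final Map.Entry<String, Integer> entry : expectedCopies.entrySet()) {
            final int actual = actualCopies.getOrDefault(entry.getKey(), 0);
            if (actual != entry.getValue()) {
                misassigned.add(entry.getKey() + ": expected " + entry.getValue() + " copies, got " + actual);
            }
        }

        if (!misassigned.isEmpty()) {
            throw new AssertionError("Misassigned tasks: " + misassigned);
        }
    }
}
{code}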





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] apovzner commented on pull request #8650: MINOR: Added unit tests for ConnectionQuotas

2020-05-13 Thread GitBox


apovzner commented on pull request #8650:
URL: https://github.com/apache/kafka/pull/8650#issuecomment-628234920


   @dajac Thanks for your very thorough review. I addressed all your comments.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] apovzner commented on a change in pull request #8650: MINOR: Added unit tests for ConnectionQuotas

2020-05-13 Thread GitBox


apovzner commented on a change in pull request #8650:
URL: https://github.com/apache/kafka/pull/8650#discussion_r424707495



##
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.net.InetAddress
+import java.util.concurrent.{Executors, TimeUnit}
+import java.util.Properties
+
+import com.yammer.metrics.core.Meter
+import kafka.metrics.KafkaMetricsGroup
+import kafka.network.Processor.ListenerMetricTag
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.common.network._
+import org.apache.kafka.common.utils.MockTime
+import org.junit.Assert._
+import org.junit._
+import org.scalatest.Assertions.intercept
+
+import scala.jdk.CollectionConverters._
+import scala.collection.{Map, mutable}
+import scala.concurrent.TimeoutException
+
+class ConnectionQuotasTest {
+  private val time = new MockTime
+  private val listeners = Map(
+"EXTERNAL" -> ListenerDesc(new ListenerName("EXTERNAL"), 
InetAddress.getByName("192.168.1.1")),
+"ADMIN" -> ListenerDesc(new ListenerName("ADMIN"), 
InetAddress.getByName("192.168.1.2")),
+"REPLICATION" -> ListenerDesc(new ListenerName("REPLICATION"), 
InetAddress.getByName("192.168.1.3")))
+  private val blockedPercentMeters = mutable.Map[String, Meter]()
+  private val knownHost = InetAddress.getByName("192.168.10.0")
+  private val unknownHost = InetAddress.getByName("192.168.2.0")
+
+  case class ListenerDesc(listenerName: ListenerName, defaultClientIp: 
InetAddress) {
+override def toString: String = {
+  s"(listener=${listenerName.value}, 
client=${defaultClientIp.getHostAddress})"
+}
+  }
+
+  def brokerPropsWithDefaultConnectionLimits: Properties = {
+val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 0)
+props.put(KafkaConfig.ListenersProp, 
"EXTERNAL://localhost:0,REPLICATION://localhost:1,ADMIN://localhost:2")
+// ConnectionQuotas does not limit inter-broker listener even when 
broker-wide connection limit is reached
+props.put(KafkaConfig.InterBrokerListenerNameProp, "REPLICATION")
+props.put(KafkaConfig.ListenerSecurityProtocolMapProp, 
"EXTERNAL:PLAINTEXT,REPLICATION:PLAINTEXT,ADMIN:PLAINTEXT")
+props
+  }
+
+  @Before
+  def setUp(): Unit = {
+// Clean-up any metrics left around by previous tests
+TestUtils.clearYammerMetrics()
+
+listeners.keys.foreach { name =>
+blockedPercentMeters.put(name, KafkaMetricsGroup.newMeter(
+  s"${name}BlockedPercent", "blocked time", TimeUnit.NANOSECONDS, 
Map(ListenerMetricTag -> name)))
+}
+  }
+
+  @After
+  def tearDown(): Unit = {
+TestUtils.clearYammerMetrics()
+blockedPercentMeters.clear()
+  }
+
+  @Test
+  def testFailWhenNoListeners(): Unit = {
+val config = KafkaConfig.fromProps(brokerPropsWithDefaultConnectionLimits)
+val connectionQuotas = new ConnectionQuotas(config, time)
+
+val executor = Executors.newSingleThreadExecutor
+try {
+  // inc() on a separate thread in case it blocks
+  val externalListener = listeners("EXTERNAL")
+  executor.submit((() =>
+
intercept[RuntimeException](connectionQuotas.inc(externalListener.listenerName, 
externalListener.defaultClientIp, blockedPercentMeters("EXTERNAL")))): Runnable
+  ).get(5, TimeUnit.SECONDS)
+} finally {
+  executor.shutdownNow()
+}
+  }
+
+  @Test
+  def testNoConnectionLimitsByDefault(): Unit = {
+val config = KafkaConfig.fromProps(brokerPropsWithDefaultConnectionLimits)
+val connectionQuotas = new ConnectionQuotas(config, time)
+addListenersAndVerify(config, connectionQuotas)
+
+val executor = Executors.newFixedThreadPool(listeners.size)
+try {
+  // verify there is no limit by accepting 1 connections as fast as 
possible
+  val numConnections = 1
+  val futures = listeners.values.map( listener =>
+executor.submit((() => acceptConnections(connectionQuotas, listener, 
numConnections)): Runnable) )
+  futures.foreach(_.get(10, TimeUnit.SECONDS))
+  listeners.values.foreach { listener =>
+assertEquals(s"${l

[GitHub] [kafka] apovzner commented on a change in pull request #8650: MINOR: Added unit tests for ConnectionQuotas

2020-05-13 Thread GitBox


apovzner commented on a change in pull request #8650:
URL: https://github.com/apache/kafka/pull/8650#discussion_r424706138



##
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.net.InetAddress
+import java.util.concurrent.{Executors, TimeUnit}
+import java.util.Properties
+
+import com.yammer.metrics.core.Meter
+import kafka.metrics.KafkaMetricsGroup
+import kafka.network.Processor.ListenerMetricTag
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.common.network._
+import org.apache.kafka.common.utils.MockTime
+import org.junit.Assert._
+import org.junit._
+import org.scalatest.Assertions.intercept
+
+import scala.jdk.CollectionConverters._
+import scala.collection.{Map, mutable}
+import scala.concurrent.TimeoutException
+
+class ConnectionQuotasTest {
+  private val time = new MockTime
+  private val listeners = Map(
+"EXTERNAL" -> ListenerDesc(new ListenerName("EXTERNAL"), 
InetAddress.getByName("192.168.1.1")),
+"ADMIN" -> ListenerDesc(new ListenerName("ADMIN"), 
InetAddress.getByName("192.168.1.2")),
+"REPLICATION" -> ListenerDesc(new ListenerName("REPLICATION"), 
InetAddress.getByName("192.168.1.3")))
+  private val blockedPercentMeters = mutable.Map[String, Meter]()
+  private val knownHost = InetAddress.getByName("192.168.10.0")
+  private val unknownHost = InetAddress.getByName("192.168.2.0")
+
+  case class ListenerDesc(listenerName: ListenerName, defaultClientIp: 
InetAddress) {
+override def toString: String = {
+  s"(listener=${listenerName.value}, 
client=${defaultClientIp.getHostAddress})"
+}
+  }
+
+  def brokerPropsWithDefaultConnectionLimits: Properties = {
+val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 0)
+props.put(KafkaConfig.ListenersProp, 
"EXTERNAL://localhost:0,REPLICATION://localhost:1,ADMIN://localhost:2")
+// ConnectionQuotas does not limit inter-broker listener even when 
broker-wide connection limit is reached
+props.put(KafkaConfig.InterBrokerListenerNameProp, "REPLICATION")
+props.put(KafkaConfig.ListenerSecurityProtocolMapProp, 
"EXTERNAL:PLAINTEXT,REPLICATION:PLAINTEXT,ADMIN:PLAINTEXT")
+props
+  }
+
+  @Before
+  def setUp(): Unit = {
+// Clean-up any metrics left around by previous tests
+TestUtils.clearYammerMetrics()
+
+listeners.keys.foreach { name =>
+blockedPercentMeters.put(name, KafkaMetricsGroup.newMeter(
+  s"${name}BlockedPercent", "blocked time", TimeUnit.NANOSECONDS, 
Map(ListenerMetricTag -> name)))
+}
+  }
+
+  @After
+  def tearDown(): Unit = {
+TestUtils.clearYammerMetrics()
+blockedPercentMeters.clear()
+  }
+
+  @Test
+  def testFailWhenNoListeners(): Unit = {
+val config = KafkaConfig.fromProps(brokerPropsWithDefaultConnectionLimits)
+val connectionQuotas = new ConnectionQuotas(config, time)
+
+val executor = Executors.newSingleThreadExecutor
+try {
+  // inc() on a separate thread in case it blocks
+  val externalListener = listeners("EXTERNAL")
+  executor.submit((() =>
+
intercept[RuntimeException](connectionQuotas.inc(externalListener.listenerName, 
externalListener.defaultClientIp, blockedPercentMeters("EXTERNAL")))): Runnable
+  ).get(5, TimeUnit.SECONDS)
+} finally {
+  executor.shutdownNow()
+}
+  }
+
+  @Test
+  def testNoConnectionLimitsByDefault(): Unit = {
+val config = KafkaConfig.fromProps(brokerPropsWithDefaultConnectionLimits)
+val connectionQuotas = new ConnectionQuotas(config, time)
+addListenersAndVerify(config, connectionQuotas)
+
+val executor = Executors.newFixedThreadPool(listeners.size)
+try {
+  // verify there is no limit by accepting 1 connections as fast as 
possible
+  val numConnections = 1
+  val futures = listeners.values.map( listener =>
+executor.submit((() => acceptConnections(connectionQuotas, listener, 
numConnections)): Runnable) )
+  futures.foreach(_.get(10, TimeUnit.SECONDS))
+  listeners.values.foreach { listener =>
+assertEquals(s"${l

[GitHub] [kafka] hachikuji commented on pull request #8479: KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception

2020-05-13 Thread GitBox


hachikuji commented on pull request #8479:
URL: https://github.com/apache/kafka/pull/8479#issuecomment-628204174


   @andrewchoi5 Just want to make sure I understand the problem. The scenario 
is that we lose the zk session while handling a LeaderAndIsr request. Current 
LeaderAndIsr handling works like this:
   
   1. Check epoch of each partition. If it is less than or equal to current 
epoch, ignore the update.
   2. Update the epoch for each partition.
   3. Make followers and leaders, which involves loading topic configs from zk.
   
   So the problem occurs when we hit an error loading the topic configs in step 
3). This potentially causes us to miss the needed state changes from that 
request and also prevents us from being able to retry them because the epoch 
has already been updated. Is that right?
   
   I guess the fix here is only a partial fix. We would still be left with the 
one failed partition, right?
   
   Off the top of my head, I am wondering whether we should be retrying the 
request at a lower level. For example, maybe `getEntityConfigs` could catch the 
`ZooKeeperClientExpiredException` and retry. I assume there is a good reason we 
do not catch this exception in `retryRequestUntilConnected` already. Perhaps it 
is unsafe to assume that the requests are still valid after a session 
expiration. However, just for reading configurations, I do not see a problem 
retrying after a session expiration.
   
   cc @junrao 
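
   For what it's worth, a rough sketch of what retrying a read-only lookup on session expiration could look like (purely illustrative; only the idea of catching the expiration exception comes from the discussion above, everything else is made up):

{code:java}
import java.util.function.Supplier;

// Illustrative retry helper for a read-only lookup that may fail because a
// (ZooKeeper-style) session expired; a pure read should be safe to retry once
// the session has been re-established.
public class RetryOnExpirationSketch {

    static class SessionExpiredException extends RuntimeException { }

    static <T> T retryOnExpiration(final Supplier<T> lookup, final int maxAttempts) {
        RuntimeException last = new IllegalArgumentException("maxAttempts must be positive");
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return lookup.get();
            } catch (final SessionExpiredException e) {
                last = e; // a pure read is safe to retry after the session comes back
            }
        }
        throw last;
    }

    public static void main(final String[] args) {
        final String config = retryOnExpiration(() -> "retention.ms=604800000", 3);
        System.out.println(config);
    }
}
{code}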



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-05-13 Thread GitBox


ijuma commented on pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#issuecomment-628200880


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet commented on pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-05-13 Thread GitBox


lbradstreet commented on pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#issuecomment-628192600


   @ijuma can this be merged?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store

2020-05-13 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106558#comment-17106558
 ] 

Matthias J. Sax commented on KAFKA-3184:


I am personally not sure how useful "local checkpointing" is at all. Note that 
for persistent stores, the idea is to allow holding state that is larger than 
main memory. It's not really related to fault tolerance or similar (it only has 
the nice side effect that rolling restarts are quick – for this case, maybe 
dumping the in-memory store to disk during shutdown might be helpful; but this 
would not be regular checkpointing). Also, to avoid overly long state recovery 
times, users can configure standbys.

For scale-out, the same issue arises for in-memory and persistent stores, and 
it's addressed via KIP-441. So we should not conflate orthogonal issues.

Having a remote checkpoint mechanism would be a nice-to-have feature, but it 
raises a lot of complex issues we need to address: (1) Where do we actually put 
the store? Kafka Streams is a library and should not have other dependencies by 
default; thus, remote checkpointing must be an opt-in feature only. (2) How can 
we do incremental checkpointing (if it's not incremental, it's too heavyweight 
and we don't need to build it at all)? (3) How do we "compact" the checkpoint 
increments in the remote location? This is the hardest issue we need to solve.

There was also the idea to actually re-use standbys for quick recovery: instead 
of checkpointing to remote storage, one just configures standbys. On recovery, 
instead of reading the changelog, we copy the RocksDB sst files directly from 
the standby to the active (or, to be more precise, the standby would become the 
active anyway...). This approach avoids the dependency on an external system and 
also solves the "how to compact" issue, as RocksDB does it for us. Of course, 
it's more expensive (in dollars) to keep the state in the app compared to 
pushing it to cheap external storage.

Thus, some food for thought.

> Add Checkpoint for In-memory State Store
> 
>
> Key: KAFKA-3184
> URL: https://issues.apache.org/jira/browse/KAFKA-3184
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Nikolay Izhikov
>Priority: Major
>  Labels: user-experience
>
> Currently Kafka Streams does not make a checkpoint of the persistent state 
> store upon committing, which would be expensive since it is "stopping the 
> world" and write on disks: for example, RocksDB would require you to copy the 
> file directory to make a copy naively. 
> However, for in-memory stores checkpointing maybe doable in an asynchronous 
> manner hence it can be done quickly. And the benefit of having intermediate 
> checkpoint is to avoid restoring from scratch if standby tasks are not 
> present.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9897) Flaky Test StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores

2020-05-13 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-9897.
--
Resolution: Fixed

> Flaky Test StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores
> -
>
> Key: KAFKA-9897
> URL: https://issues.apache.org/jira/browse/KAFKA-9897
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.6.0
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/22/testReport/junit/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/shouldQuerySpecificActivePartitionStores/]
> {quote}org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get 
> state store source-table because the stream thread is PARTITIONS_ASSIGNED, 
> not RUNNING at 
> org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:85)
>  at 
> org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:61)
>  at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1183) at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:178){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9984) Should fail the subscription when pattern is empty

2020-05-13 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106549#comment-17106549
 ] 

Guozhang Wang commented on KAFKA-9984:
--

Sounds good to me :)

> Should fail the subscription when pattern is empty
> --
>
> Key: KAFKA-9984
> URL: https://issues.apache.org/jira/browse/KAFKA-9984
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Boyang Chen
>Priority: Major
>
> We have seen a case where the consumer subscribes to an empty string pattern:
> ```
> [Consumer ...  ] Subscribed to pattern:  ''
> ```
> which doesn't make any sense and usually indicate a configuration error. The 
> `consumer.subscribe(pattern)` call should fail with illegal argument for this 
> case.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9897) Flaky Test StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores

2020-05-13 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106548#comment-17106548
 ] 

Matthias J. Sax commented on KAFKA-9897:


Sure. We can always reopen it again if necessary. What was the PR / Jira ticket 
number?

> Flaky Test StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores
> -
>
> Key: KAFKA-9897
> URL: https://issues.apache.org/jira/browse/KAFKA-9897
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.6.0
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/22/testReport/junit/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/shouldQuerySpecificActivePartitionStores/]
> {quote}org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get 
> state store source-table because the stream thread is PARTITIONS_ASSIGNED, 
> not RUNNING at 
> org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:85)
>  at 
> org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:61)
>  at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1183) at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:178){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9984) Should fail the subscription when pattern is empty

2020-05-13 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-9984:
---
Component/s: (was: streams)

> Should fail the subscription when pattern is empty
> --
>
> Key: KAFKA-9984
> URL: https://issues.apache.org/jira/browse/KAFKA-9984
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Boyang Chen
>Priority: Major
>
> We have seen a case where the consumer subscribes to an empty string pattern:
> ```
> [Consumer ...  ] Subscribed to pattern:  ''
> ```
> which doesn't make any sense and usually indicate a configuration error. The 
> `consumer.subscribe(pattern)` call should fail with illegal argument for this 
> case.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9982) [kafka-connect] Source connector does not guarantee at least once delivery

2020-05-13 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17105942#comment-17105942
 ] 

Chris Egerton edited comment on KAFKA-9982 at 5/13/20, 6:10 PM:


The producers the framework uses to write data from source tasks to Kafka are 
[configured 
conservatively|https://github.com/apache/kafka/blob/9bc96d54f8953d190a1fb6478a0656f049ee3b32/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L557-L563]
 to prevent multiple concurrent in-flight requests, which might compromise the 
ordering of the records.

As of the fix for https://issues.apache.org/jira/browse/KAFKA-8586, the 
framework will cease processing records from a source task if it fails to send 
a record to Kafka.

The framework does use an entirely different producer to write source offsets 
to Kafka, but no offsets are written to Kafka unless the record they correspond 
to has been ack'd by the broker and safely made it to Kafka.

[~q.xu] based on the source code for the worker, I don't think this analysis is 
correct. Have you observed this behavior yourself?
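
For reference, a conservative source-record producer configuration of the kind referenced above looks roughly like this (the values here are illustrative assumptions; the linked Worker.java is the authoritative source for what the framework actually sets):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

// Illustrative sketch of a "conservative" producer setup: a single in-flight
// request preserves ordering, and aggressive retries plus acks=all keep records
// from being silently dropped.
public class ConservativeProducerConfigSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
{code}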


was (Author: chrisegerton):
The producers the framework uses to write data from source tasks to Kafka are 
[configured 
conservatively|https://github.com/apache/kafka/blob/9bc96d54f8953d190a1fb6478a0656f049ee3b32/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L557-L563]
 to prevent multiple concurrent in-flight requests, which might compromise the 
ordering of the records.

As of the fix for https://issues.apache.org/jira/browse/KAFKA-8586, the 
framework will cease processing records from a source task if it fails to send 
a record to Kafka.

The framework does use an entirely different producer to write source offsets 
to Kafka, but no offsets are written to Kafka unless the record they correspond 
has been ack'd by the broker and safely made it to Kafka.

[~q.xu] based on the source code for the worker, I don't think this analysis is 
correct. Have you observed this behavior yourself?

> [kafka-connect] Source connector does not guarantee at least once delivery
> --
>
> Key: KAFKA-9982
> URL: https://issues.apache.org/jira/browse/KAFKA-9982
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.0
>Reporter: Qinghui Xu
>Priority: Major
>
> In kafka-connect runtime, the WorkerSourceTask is responsible for sending 
> records to the destination topics and managing the source offset commit. 
> Committed offsets are then used later for recovery of tasks during rebalance 
> or restart.
> But there are two concerns when looking into the WorkerSourceTask 
> implementation:
>  * When producer fail to send records, there's no retry but just skipping 
> offset commit and then execute next loop (poll for new records)
>  * The offset commit and effectively sending records over network are in fact 
> asynchronous, which means the offset commit could happen before records are 
> received by brokers, and a rebalance/restart in this gap could lead to 
> message loss.
> The conclusion is thus that the source connector does not support at least 
> once semantics by default (without the plugin implementation making extra 
> effort itself). I consider this as a bug.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9659) Kafka Streams / Consumer configured for static membership fails on "fatal exception: group.instance.id gets fenced"

2020-05-13 Thread Levani Kokhreidze (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106528#comment-17106528
 ] 

Levani Kokhreidze commented on KAFKA-9659:
--

Hi [~guozhang] 

Just FYI: we've experienced this error as well. Some of our Kafka Streams jobs 
(with static membership) sometimes die whenever we restart one of the brokers. 
It affects broker versions 2.3.1 and 2.4 as well.

We can reliably reproduce the issue in our "staging" environment, so if any 
extra debug logs from clients/brokers would be useful, we'd be happy to help.

> Kafka Streams / Consumer configured for static membership fails on "fatal 
> exception: group.instance.id gets fenced"
> ---
>
> Key: KAFKA-9659
> URL: https://issues.apache.org/jira/browse/KAFKA-9659
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Rohan Desai
>Assignee: Guozhang Wang
>Priority: Major
> Attachments: ksql-1.logs
>
>
> I'm running a KSQL query, which underneath is built into a Kafka Streams 
> application. The application has been running without issue for a few days, 
> until today, when all the streams threads exited with: 
>  
>  
> {{[ERROR] 2020-03-05 00:57:58,776 
> [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2]
>  org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - 
> [Consumer instanceId=ksql-1-2, 
> clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer,
>  groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] 
> Received fatal exception: group.instance.id gets fenced}}
> {{[ERROR] 2020-03-05 00:57:58,776 
> [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2]
>  org.apache.kafka.clients.consumer.internals.AbstractCoordinator onFailure - 
> [Consumer instanceId=ksql-1-2, 
> clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer,
>  groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] 
> Caught fenced group.instance.id Optional[ksql-1-2] error in heartbeat thread}}
> {{[ERROR] 2020-03-05 00:57:58,776 
> [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2]
>  org.apache.kafka.streams.processor.internals.StreamThread run - 
> stream-thread 
> [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2]
>  Encountered the following unexpected Kafka exception during processing, this 
> usually indicate Streams internal errors:}}
>  \{{ org.apache.kafka.common.errors.FencedInstanceIdException: The broker 
> rejected this static consumer since another consumer with the same 
> group.instance.id has registered with a different member.id.}}{{[INFO] 
> 2020-03-05 00:57:58,776 
> [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2]
>  org.apache.kafka.streams.processor.internals.StreamThread setState - 
> stream-thread 
> [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2]
>  State transition from RUNNING to PENDING_SHUTDOWN}}
>  
> I've attached the KSQL and Kafka Streams logs to this ticket. Here's a 
> summary for one of the streams threads (instance id `ksql-1-2`):
>  
> Around 00:56:36 the coordinator fails over from b11 to b2:
>  
> {{[INFO] 2020-03-05 00:56:36,258 
> [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2]
>  org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - 
> [Consumer instanceId=ksql-1-2, 
> clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer,
>  groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Attempt to 
> heartbeat failed since coordinator 
> b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483636 rack: 
> null) is either not started or not valid.}}
>  {{ [INFO] 2020-03-05 00:56:36,258 
> [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2]
>  org.apache.kafka.clients.consumer.internals.AbstractCoordinator 
> markCoordinatorUnknown - [Consumer instanceId=ksql-1-2, 
> clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer,
>  groupId=_confl

[jira] [Commented] (KAFKA-9927) Add support for varint types to message generator

2020-05-13 Thread Tom Bentley (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106465#comment-17106465
 ] 

Tom Bentley commented on KAFKA-9927:


[~hachikuji] I've implemented this 
(https://github.com/tombentley/kafka/pull/new/KAFKA-9927-varint-in-messages). 
Does this change need a KIP, since using this variable-length encoding for 
fields would be new to the protocol? If not I'll open a PR; otherwise I'll put 
a KIP together. 

> Add support for varint types to message generator
> -
>
> Key: KAFKA-9927
> URL: https://issues.apache.org/jira/browse/KAFKA-9927
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Tom Bentley
>Priority: Major
>
> It would be nice to be able to use either a "varint32" or "varint64" type or 
> to add a flag to indicate variable length encoding.
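
For reference, a minimal sketch of ZigZag variable-length encoding (the scheme 
protocol buffers use for signed varints, and the kind of encoding a "varint32" 
field type would imply). Illustrative only, not the message generator's actual 
serialization code.

import java.nio.ByteBuffer;

public final class VarintSketch {

    // ZigZag-encode so that small negative numbers also encode in few bytes.
    static int zigZag(int value) {
        return (value << 1) ^ (value >> 31);
    }

    // Unsigned varint: 7 bits per byte, high bit set means "more bytes follow".
    static void writeUnsignedVarint(int value, ByteBuffer out) {
        while ((value & 0xFFFFFF80) != 0) {
            out.put((byte) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.put((byte) value);
    }

    static void writeVarint(int value, ByteBuffer out) {
        writeUnsignedVarint(zigZag(value), out);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(5);
        writeVarint(300, buf);  // fits in 2 bytes instead of a fixed 4
        System.out.println(buf.position() + " bytes used");
    }
}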



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9987) Add new cooperative assignor optimized for constant-subscription group

2020-05-13 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-9987:
---
Summary: Add new cooperative assignor optimized for constant-subscription 
group  (was: Add new cooperative assignor optimized for constant-subscription. 
group)

> Add new cooperative assignor optimized for constant-subscription group
> --
>
> Key: KAFKA-9987
> URL: https://issues.apache.org/jira/browse/KAFKA-9987
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> In KIP-429 we added the new CooperativeStickyAssignor, which leverages the 
> underlying sticky assignment algorithm of the existing StickyAssignor (moved 
> to AbstractStickyAssignor). 
>  
> The algorithm is fairly complex, as it tries to optimize stickiness while 
> satisfying perfect balance _in the case where individual consumers may be 
> subscribed to an arbitrary subset of the topics._ While it does a pretty good 
> job at what it promises to do, it doesn't scale well with large numbers of 
> consumers and partitions. 
>  
> If we can make the assumption that all consumers are subscribed to the same 
> set of topics, we can simplify the algorithm greatly and do a 
> sticky-but-balanced assignment in a single pass. It would be nice to have an 
> additional cooperative assignor OOTB that performs efficiently for users who 
> know their group will satisfy this constraint.
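
A sketch of the simplified, linear-time sticky-but-balanced assignment the 
ticket describes, under the assumption that every consumer shares the same 
subscription (so every partition is assignable to every consumer). Each 
consumer first keeps previously owned partitions up to its quota, then the 
leftovers go to consumers still below quota. Names and structure are 
illustrative only, not the proposed assignor.

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public final class ConstantSubscriptionSketch {

    public static Map<String, List<Integer>> assign(
            List<String> consumers,
            List<Integer> partitions,
            Map<String, Set<Integer>> previouslyOwned) {

        int minQuota = partitions.size() / consumers.size();
        int extras = partitions.size() % consumers.size();

        Map<String, List<Integer>> assignment = new HashMap<>();
        Set<Integer> unassigned = new LinkedHashSet<>(partitions);

        // Stickiness: each consumer keeps owned partitions up to its quota.
        for (int i = 0; i < consumers.size(); i++) {
            String consumer = consumers.get(i);
            int quota = minQuota + (i < extras ? 1 : 0);
            List<Integer> kept = new ArrayList<>();
            Set<Integer> owned =
                previouslyOwned.getOrDefault(consumer, Collections.emptySet());
            for (Integer partition : owned) {
                if (kept.size() < quota && unassigned.remove(partition)) {
                    kept.add(partition);
                }
            }
            assignment.put(consumer, kept);
        }

        // Balance: hand the leftovers to consumers still below quota. Quotas
        // sum to the partition count, so every partition ends up assigned.
        Iterator<Integer> leftovers = unassigned.iterator();
        for (int i = 0; i < consumers.size() && leftovers.hasNext(); i++) {
            int quota = minQuota + (i < extras ? 1 : 0);
            List<Integer> target = assignment.get(consumers.get(i));
            while (target.size() < quota && leftovers.hasNext()) {
                target.add(leftovers.next());
            }
        }
        return assignment;
    }
}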



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

