[GitHub] [kafka] lkokhreidze commented on pull request #8558: KAFKA-8611 / KIP-221 documentation
lkokhreidze commented on pull request #8558:
URL: https://github.com/apache/kafka/pull/8558#issuecomment-628419029

Hi @mjsax, I've rebased the branch. Do you mean the comment by Guozhang in the voting thread? If not, I missed it and can't find anything new in the DISCUSS thread. Can you point me to where it was asked?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-9676) Add test coverage for new ActiveTaskCreator and StandbyTaskCreator
[ https://issues.apache.org/jira/browse/KAFKA-9676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Boyang Chen resolved KAFKA-9676.
Resolution: Fixed

The current unit test coverage is pretty good now, closing the ticket.

> Add test coverage for new ActiveTaskCreator and StandbyTaskCreator
>
> Key: KAFKA-9676
> URL: https://issues.apache.org/jira/browse/KAFKA-9676
> Project: Kafka
> Issue Type: Test
> Components: streams
> Reporter: Boyang Chen
> Priority: Major
> Labels: help-wanted, newbie
>
> The newly separated ActiveTaskCreator and StandbyTaskCreator have no unit
> test coverage. We should add corresponding tests.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task
[ https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106950#comment-17106950 ]

Boyang Chen commented on KAFKA-9989:

I didn't find that in the log [~ableegoldman]

> StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor
> gets assigned task
>
> Key: KAFKA-9989
> URL: https://issues.apache.org/jira/browse/KAFKA-9989
> Project: Kafka
> Issue Type: Bug
> Components: streams, system tests
> Reporter: Boyang Chen
> Assignee: Boyang Chen
> Priority: Major
>
> System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:
> "Never saw output 'processed [0-9]* records' on ubuntu@worker6"
> which if we take a closer look at, the rebalance happens but has no task
> assignment. We should fix this problem by making the rebalance result as part
> of the check, and skip the record processing validation when the assignment
> is empty.
[GitHub] [kafka] abbccdda commented on a change in pull request #8395: Added doc for KIP-535 and updated it for KIP-562
abbccdda commented on a change in pull request #8395:
URL: https://github.com/apache/kafka/pull/8395#discussion_r404455662

## File path: docs/upgrade.html
@@ -39,7 +39,8 @@ Notable changes in 2
 examples folder (https://github.com/apache/kafka/tree/2.5/examples). Check out KIP-447 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics) for the full details.
-Deprecated KafkaStreams.store(String, QueryableStoreType) and replaced it with KafkaStreams.store(StoreQueryParameters).
+Provided support to query stale stores(for high availability) and the stores belonging to a specific partition by deprecating KafkaStreams.store(String, QueryableStoreType) and replacing it with KafkaStreams.store(StoreQueryParameters).

Review comment:
![image](https://user-images.githubusercontent.com/5845561/78615480-809bb580-7826-11ea-9f59-2e7c3cf1a901.png)
Let's add a space between `stores(` as `stores (`, and add a period after `KafkaStreams.allLocalStorePartitionLags()`
[GitHub] [kafka] zhaohaidao opened a new pull request #8665: KAFKA-9984 Should fail the subscription when pattern is empty
zhaohaidao opened a new pull request #8665:
URL: https://github.com/apache/kafka/pull/8665

Ticket: KAFKA-9984

It indicates a configuration error when a consumer subscribes to an empty pattern:
```
[Consumer ... ] Subscribed to pattern: ''
```
The `consumer.subscribe(pattern)` call should fail with an illegal-argument error in this case.
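The check described in this pull request could look roughly like the sketch below. It is only an illustration under stated assumptions: `SubscriptionPatternCheck` and `requireNonEmpty` are hypothetical names, not part of `KafkaConsumer`, and the exception message is made up for the example.

```java
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not the actual KafkaConsumer code.
final class SubscriptionPatternCheck {
    private SubscriptionPatternCheck() { }

    // Rejects a null pattern and a pattern compiled from the empty string,
    // which would otherwise be logged as "Subscribed to pattern: ''".
    static Pattern requireNonEmpty(final Pattern pattern) {
        if (pattern == null || pattern.pattern().isEmpty()) {
            throw new IllegalArgumentException(
                "Topic pattern to subscribe to cannot be " + (pattern == null ? "null" : "empty"));
        }
        return pattern;
    }
}
```

A caller would run the check before subscribing, for example `requireNonEmpty(Pattern.compile(configuredRegex))`, so that a missing or blank regex in the configuration fails fast instead of producing a silent no-op subscription.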
[GitHub] [kafka] showuon commented on pull request #8623: MINOR: Update the documentations
showuon commented on pull request #8623: URL: https://github.com/apache/kafka/pull/8623#issuecomment-628403219 Hi @kkonstantine , could you please review this small PR? Thanks.
[GitHub] [kafka] vvcephei commented on a change in pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics
vvcephei commented on a change in pull request #8662:
URL: https://github.com/apache/kafka/pull/8662#discussion_r424712060

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
@@ -105,6 +109,7 @@ public static String getTaskProducerClientId(final String threadClientId, final
                 endOffsets = future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
             }
         } catch (final TimeoutException | RuntimeException | InterruptedException | ExecutionException e) {
+            LOG.warn("listOffsets request failed due to ", e);

Review comment:
```suggestion
            LOG.warn("listOffsets request failed.", e);
```
Thanks! (minor suggestion to make the log message more typical)

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -169,6 +173,9 @@ public void makeReady(final Map topics) {
                 log.error(timeoutAndRetryError);
                 throw new StreamsException(timeoutAndRetryError);
             }
+            log.debug("Completed validating internal topics and created {}", newlyCreatedTopics);

Review comment:
It's not what you signed up for, but I'm wondering if we should at least submit a Jira to give some of these AdminClient methods a "full consistency" mode. In other words, since the command returns a future anyway, it would be nice to be able to tell it not to return until it can guarantee the topic will appear to be fully created on all brokers.

I'm mildly concerned that we're just kicking the can down the road a little ways with this PR. I.e., we let the assignment happen, but then some other metadata (or data) operation for that topic will just fail shortly thereafter.

More generally, we jump through a lot of hoops in our own tests to try and make sure that the topics are really, actually created (or deleted) before proceeding with the test, and I'm sure that our users also suffer from the same problem in their testing and production code.
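The "hoops" mentioned in the review comment usually amount to polling until a condition holds. The sketch below shows that pattern in isolation; it is an assumption-laden illustration, not an AdminClient feature. `TopicReadiness` and `waitUntil` are hypothetical names, and the condition passed in would be whatever visibility check the caller already has (for example, a metadata lookup for the topic).

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical polling helper; illustrates the workaround, not an existing Kafka API.
final class TopicReadiness {
    private TopicReadiness() { }

    // Re-evaluates the condition every pollInterval until it is true or the timeout elapses.
    // Returns true if the condition was observed, false on timeout or interruption.
    static boolean waitUntil(final BooleanSupplier condition,
                             final Duration timeout,
                             final Duration pollInterval) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }
}
```

A "full consistency" mode as proposed in the comment would move this loop behind the returned future, so that callers would no longer need a helper like this.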
[GitHub] [kafka] chia7712 commented on a change in pull request #8663: KAFKA-9985: Sink connector may exhaust broker when writing in DLQ
chia7712 commented on a change in pull request #8663:
URL: https://github.com/apache/kafka/pull/8663#discussion_r424860254

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
@@ -96,6 +101,25 @@ public static void validate(Map props) {
             throw new ConfigException("Must configure one of " + SinkTask.TOPICS_CONFIG + " or " + SinkTask.TOPICS_REGEX_CONFIG);
         }
+
+        if (hasDlqTopicConfig) {
+            String dlqTopic = props.get(DLQ_TOPIC_NAME_CONFIG).trim();
+            if (hasTopicsConfig) {
+                List topics = parseTopicsList(props);
+                if (topics.contains(dlqTopic)) {
+                    throw new ConfigException(DLQ_TOPIC_NAME_CONFIG + " has a topic name which is already in " +

Review comment:
Should we include the topic name in this exception's message? For example, ```has a topic name (xxx) which```
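The reviewer's suggestion can be sketched as follows. This is a dependency-free illustration, not the code in the pull request: `DlqTopicCheck` is a hypothetical class, `IllegalArgumentException` stands in for Connect's `ConfigException`, and the comma-splitting is a simplification of the real list parsing. The two config keys are the Connect names `topics` and `errors.deadletterqueue.topic.name`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for the validation discussed above.
final class DlqTopicCheck {
    static final String TOPICS_CONFIG = "topics";
    static final String DLQ_TOPIC_NAME_CONFIG = "errors.deadletterqueue.topic.name";

    private DlqTopicCheck() { }

    // Fails if the dead letter queue topic is also one of the topics the sink consumes,
    // and names the offending topic in the message.
    static void validate(final Map<String, String> props) {
        final String dlq = props.get(DLQ_TOPIC_NAME_CONFIG);
        if (dlq == null || dlq.trim().isEmpty()) {
            return; // no DLQ configured, nothing to check
        }
        final String dlqTopic = dlq.trim();
        final List<String> topics = Arrays.stream(props.getOrDefault(TOPICS_CONFIG, "").split(","))
            .map(String::trim)
            .filter(topic -> !topic.isEmpty())
            .collect(Collectors.toList());
        if (topics.contains(dlqTopic)) {
            throw new IllegalArgumentException(DLQ_TOPIC_NAME_CONFIG + " has a topic name (" + dlqTopic
                + ") which is already in " + TOPICS_CONFIG);
        }
    }
}
```

Including the name makes the failure actionable when `topics` is long or assembled from several config sources.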
[jira] [Commented] (KAFKA-9981) Running a dedicated mm2 cluster with more than one nodes,When the configuration is updated the task is not aware and will lose the update operation.
[ https://issues.apache.org/jira/browse/KAFKA-9981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106865#comment-17106865 ]

Andre Price commented on KAFKA-9981:

Not sure if related but I think I'm seeing issues where task configs don't seem to be getting updated consistently when new topics/partitions are found.

> Running a dedicated mm2 cluster with more than one nodes,When the
> configuration is updated the task is not aware and will lose the update
> operation.
>
> Key: KAFKA-9981
> URL: https://issues.apache.org/jira/browse/KAFKA-9981
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Affects Versions: 2.4.0, 2.5.0, 2.4.1
> Reporter: victor
> Priority: Major
>
> DistributedHerder.reconfigureConnector applies a config update as follows:
> {code:java}
> if (changed) {
>     List<Map<String, String>> rawTaskProps = reverseTransform(connName, configState, taskProps);
>     if (isLeader()) {
>         configBackingStore.putTaskConfigs(connName, rawTaskProps);
>         cb.onCompletion(null, null);
>     } else {
>         // We cannot forward the request on the same thread because this reconfiguration can happen as a result of connector
>         // addition or removal. If we blocked waiting for the response from leader, we may be kicked out of the worker group.
>         forwardRequestExecutor.submit(new Runnable() {
>             @Override
>             public void run() {
>                 try {
>                     String leaderUrl = leaderUrl();
>                     if (leaderUrl == null || leaderUrl.trim().isEmpty()) {
>                         cb.onCompletion(new ConnectException("Request to leader to " +
>                                 "reconfigure connector tasks failed " +
>                                 "because the URL of the leader's REST interface is empty!"), null);
>                         return;
>                     }
>                     String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + connName + "/tasks");
>                     log.trace("Forwarding task configurations for connector {} to leader", connName);
>                     RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm);
>                     cb.onCompletion(null, null);
>                 } catch (ConnectException e) {
>                     log.error("Request to leader to reconfigure connector tasks failed", e);
>                     cb.onCompletion(e, null);
>                 }
>             }
>         });
>     }
> }
> {code}
> The KafkaConfigBackingStore task checks for configuration updates, such as a
> topic whitelist update. If the KafkaConfigBackingStore task is not running on
> the leader node, an HTTP request is sent to notify the leader of the
> configuration update. However, a dedicated mm2 cluster does not have the HTTP
> server turned on, so the request fails to be sent and the update operation is
> lost.
[GitHub] [kafka] chia7712 commented on a change in pull request #8663: KAFKA-9985: Sink connector may exhaust broker when writing in DLQ
chia7712 commented on a change in pull request #8663:
URL: https://github.com/apache/kafka/pull/8663#discussion_r424859533

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
@@ -108,6 +132,20 @@ public static boolean hasTopicsRegexConfig(Map props) {
         return topicsRegexStr != null && !topicsRegexStr.trim().isEmpty();
     }

+    public static boolean hasDlqTopicConfig(Map props) {
+        String dqlTopicStr = props.get(DLQ_TOPIC_NAME_CONFIG);
+        return dqlTopicStr != null && !dqlTopicStr.trim().isEmpty();
+    }
+
+    public static List parseTopicsList(Map props) {
+        List topics = (List) ConfigDef.parseType(TOPICS_CONFIG, props.get(TOPICS_CONFIG), Type.LIST);
+        return topics
+                .stream()
+                .filter(topic -> !topic.isEmpty())
+                .distinct()

Review comment:
How about returning a Set instead of List?
```
return topics
    .stream()
    .filter(topic -> !topic.isEmpty())
    .collect(Collectors.toSet());
```
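One detail worth weighing with the suggestion above: `Collectors.toSet()` makes no guarantee about iteration order, whereas `distinct()` on a list keeps the configured order. The sketch below is a variation, not the reviewer's code; it collects into a `LinkedHashSet` so duplicates are dropped and order is kept. `TopicsListSketch` and `distinctTopics` are hypothetical names.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical variation on the review suggestion; not the code in the pull request.
final class TopicsListSketch {
    private TopicsListSketch() { }

    // Drops empty entries and duplicates while preserving the order of first occurrence.
    static Set<String> distinctTopics(final List<String> topics) {
        return topics.stream()
            .filter(topic -> !topic.isEmpty())
            .collect(Collectors.toCollection(LinkedHashSet::new));
    }
}
```

Whether order matters here is an assumption; if callers only ever test membership, plain `Collectors.toSet()` as suggested is sufficient.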
[jira] [Commented] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task
[ https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106864#comment-17106864 ]

Sophie Blee-Goldman commented on KAFKA-9989:

Yeah that looks like the FallbackPriorTaskAssignor to me. Do you see something like
{code:java}
Failed to fetch end offsets for changelogs, will return previous assignment to clients and " + "trigger another rebalance to retry.
{code}
in the logs? Depending on when this test was run this specific log message may or may not have existed, we've made a lot of changes here lately..

> StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor
> gets assigned task
>
> Key: KAFKA-9989
> URL: https://issues.apache.org/jira/browse/KAFKA-9989
> Project: Kafka
> Issue Type: Bug
> Components: streams, system tests
> Reporter: Boyang Chen
> Assignee: Boyang Chen
> Priority: Major
>
> System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:
> "Never saw output 'processed [0-9]* records' on ubuntu@worker6"
> which if we take a closer look at, the rebalance happens but has no task
> assignment. We should fix this problem by making the rebalance result as part
> of the check, and skip the record processing validation when the assignment
> is empty.
[jira] [Assigned] (KAFKA-9984) Should fail the subscription when pattern is empty
[ https://issues.apache.org/jira/browse/KAFKA-9984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

HaiyuanZhao reassigned KAFKA-9984:

Assignee: HaiyuanZhao

> Should fail the subscription when pattern is empty
>
> Key: KAFKA-9984
> URL: https://issues.apache.org/jira/browse/KAFKA-9984
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Reporter: Boyang Chen
> Assignee: HaiyuanZhao
> Priority: Major
>
> We have seen a case where the consumer subscribes to an empty string pattern:
> ```
> [Consumer ... ] Subscribed to pattern: ''
> ```
> which doesn't make any sense and usually indicates a configuration error. The
> `consumer.subscribe(pattern)` call should fail with illegal argument for this
> case.
[jira] [Commented] (KAFKA-9984) Should fail the subscription when pattern is empty
[ https://issues.apache.org/jira/browse/KAFKA-9984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106857#comment-17106857 ]

Boyang Chen commented on KAFKA-9984:

[~guozhang] Thanks!

> Should fail the subscription when pattern is empty
>
> Key: KAFKA-9984
> URL: https://issues.apache.org/jira/browse/KAFKA-9984
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Reporter: Boyang Chen
> Priority: Major
>
> We have seen a case where the consumer subscribes to an empty string pattern:
> ```
> [Consumer ... ] Subscribed to pattern: ''
> ```
> which doesn't make any sense and usually indicates a configuration error. The
> `consumer.subscribe(pattern)` call should fail with illegal argument for this
> case.
[jira] [Commented] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task
[ https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106853#comment-17106853 ]

Boyang Chen commented on KAFKA-9989:

Looks like it is determined by the assignor (the most recently added consumer gets 0 partitions assigned):

[2020-05-07 07:53:15,487] DEBUG stream-thread [StreamsUpgradeTest-0626d80d-03ed-4cf9-95a2-d778a791394f-StreamThread-1-consumer] Assigning tasks [0_0, 0_1, 0_2, 0_3, 0_4] to clients {0626d80d-03ed-4cf9-95a2-d778a791394f=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([0_1, 0_3]) prevStandbyTasks: ([]) prevAssignedTasks: ([0_1, 0_3]) prevOwnedPartitionsByConsumerId: ([data-1, data-3]) changelogOffsetTotalsByTask: ([]) capacity: 1], 3405b9f3-7f28-454a-9a00-906f8c1b520f=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([]) prevOwnedPartitionsByConsumerId: ([]) changelogOffsetTotalsByTask: ([]) capacity: 1], 8611ff87-6782-4c27-b93e-c145b0bacccd=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([0_0, 0_2, 0_4]) prevStandbyTasks: ([]) prevAssignedTasks: ([0_0, 0_2, 0_4]) prevOwnedPartitionsByConsumerId: ([data-0, data-2, data-4]) changelogOffsetTotalsByTask: ([]) capacity: 1]} with number of replicas 0 (org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor)

[2020-05-07 07:53:15,487] DEBUG [AdminClient clientId=StreamsUpgradeTest-0626d80d-03ed-4cf9-95a2-d778a791394f-admin] Sending MetadataRequestData(topics=[], allowAutoTopicCreation=false, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to worker2:9092 (id: 1 rack: null). correlationId=7 (org.apache.kafka.clients.admin.KafkaAdminClient)

[2020-05-07 07:53:15,487] INFO stream-thread [StreamsUpgradeTest-0626d80d-03ed-4cf9-95a2-d778a791394f-StreamThread-1-consumer] Assigned tasks to clients as 0626d80d-03ed-4cf9-95a2-d778a791394f=[activeTasks: ([0_1, 0_3]) standbyTasks: ([]) assignedTasks: ([0_1, 0_3]) prevActiveTasks: ([0_1, 0_3]) prevStandbyTasks: ([]) prevAssignedTasks: ([0_1, 0_3]) prevOwnedPartitionsByConsumerId: ([data-1, data-3]) changelogOffsetTotalsByTask: ([]) capacity: 1] 3405b9f3-7f28-454a-9a00-906f8c1b520f=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([]) prevOwnedPartitionsByConsumerId: ([]) changelogOffsetTotalsByTask: ([]) capacity: 1] 8611ff87-6782-4c27-b93e-c145b0bacccd=[activeTasks: ([0_0, 0_2, 0_4]) standbyTasks: ([]) assignedTasks: ([0_0, 0_2, 0_4]) prevActiveTasks: ([0_0, 0_2, 0_4]) prevStandbyTasks: ([]) prevAssignedTasks: ([0_0, 0_2, 0_4]) prevOwnedPartitionsByConsumerId: ([data-0, data-2, data-4]) changelogOffsetTotalsByTask: ([]) capacity: 1]. (org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor)

> StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor
> gets assigned task
>
> Key: KAFKA-9989
> URL: https://issues.apache.org/jira/browse/KAFKA-9989
> Project: Kafka
> Issue Type: Bug
> Components: streams, system tests
> Reporter: Boyang Chen
> Assignee: Boyang Chen
> Priority: Major
>
> System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:
> "Never saw output 'processed [0-9]* records' on ubuntu@worker6"
> which if we take a closer look at, the rebalance happens but has no task
> assignment. We should fix this problem by making the rebalance result as part
> of the check, and skip the record processing validation when the assignment
> is empty.
[jira] [Commented] (KAFKA-9617) Replica Fetcher can mark partition as failed when max.message.bytes is changed
[ https://issues.apache.org/jira/browse/KAFKA-9617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106843#comment-17106843 ]

Chia-Ping Tsai commented on KAFKA-9617:

[https://github.com/apache/kafka/pull/8659]

> Replica Fetcher can mark partition as failed when max.message.bytes is changed
>
> Key: KAFKA-9617
> URL: https://issues.apache.org/jira/browse/KAFKA-9617
> Project: Kafka
> Issue Type: Bug
> Reporter: Stanislav Kozlovski
> Assignee: Chia-Ping Tsai
> Priority: Major
> Labels: newbie
>
> There exists a race condition when changing the dynamic max.message.bytes
> config for a topic. A follower replica can replicate a message that is over
> that size after it processes the config change. When this happens, the
> replica fetcher catches the unexpected exception, marks the partition as
> failed and stops replicating it.
> {code:java}
> 06:38:46.596 Processing override for entityPath: topics/partition-1 with config: Map(max.message.bytes -> 512)
> 06:38:46.597 [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Unexpected error occurred while processing data for partition partition-1 at offset 20964
> org.apache.kafka.common.errors.RecordTooLargeException: The record batch size in the append to partition-1 is 3349 bytes which exceeds the maximum configured value of 512.
> {code}
[jira] [Commented] (KAFKA-9987) Improve sticky partition assignor algorithm
[ https://issues.apache.org/jira/browse/KAFKA-9987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106832#comment-17106832 ]

Sophie Blee-Goldman commented on KAFKA-9987:

Pseudo-code sketch of the algorithm:

C_f := (P/N)_floor, the floor capacity
C_c := (P/N)_ceil, the ceiling capacity

members := the sorted set of all consumers
partitions := the set of all partitions
unassigned_partitions := the set of partitions not yet assigned, initialized to be all partitions
unfilled_members := the set of consumers not yet at capacity, initialized to empty
max_capacity_members := the set of members with exactly C_c partitions assigned, initialized to empty
member.owned_partitions := the set of previously owned partitions encoded in the Subscription

// Reassign as many previously owned partitions as possible
for member : members
    remove any partitions that are no longer in the subscription from its owned partitions
    remove all owned_partitions if the generation is old
    if member.owned_partitions.size < C_f
        assign all owned partitions to member and remove from unassigned_partitions
        add member to unfilled_members
    else if member.owned_partitions.size == C_f
        assign first C_f owned_partitions to member and remove from unassigned_partitions
    else
        assign first C_c owned_partitions to member and remove from unassigned_partitions
        add member to max_capacity_members

sort unassigned_partitions in partition order, i.e. t0_p0, t1_p0, t2_p0, t0_p1, t1_p1 (for data parallelism)
sort unfilled_members by memberId (for determinism)

// Fill remaining members up to C_f
for member : unfilled_members
    compute the remaining capacity as C = C_f - num_assigned_partitions
    pop the first C partitions from unassigned_partitions and assign to member

// Steal partitions from members with max_capacity if necessary
if we run out of partitions before getting to the end of unfilled members:
    for member : unfilled_members
        poll for first member in max_capacity_members and remove one partition
        assign this partition to the unfilled member

// Distribute remaining partitions, one per consumer, to fill some up to C_c if necessary
if we run out of unfilled_members before assigning all partitions:
    for partition : unassigned_partitions
        assign to next member in members that is not in max_capacity_members (then add member to max_capacity_members)

> Improve sticky partition assignor algorithm
>
> Key: KAFKA-9987
> URL: https://issues.apache.org/jira/browse/KAFKA-9987
> Project: Kafka
> Issue Type: Improvement
> Components: clients
> Reporter: Sophie Blee-Goldman
> Assignee: Sophie Blee-Goldman
> Priority: Major
>
> In
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
> we added the new CooperativeStickyAssignor, which leverages the underlying
> sticky assignment algorithm of the existing StickyAssignor (moved to
> AbstractStickyAssignor). The algorithm is fairly complex as it tries to
> optimize stickiness while satisfying perfect balance _in the case individual
> consumers may be subscribed to different subsets of the topics._ While it
> does a pretty good job at what it promises to do, it doesn't scale well with
> large numbers of consumers and partitions.
> To give a concrete example, users have reported that it takes 2.5 minutes for
> the assignment to complete with just 2100 consumers reading from 2100
> partitions. Since partitions revoked during the first of two cooperative
> rebalances will remain unassigned until the end of the second rebalance, it's
> important for the rebalance to be as fast as possible. And since one of the
> primary improvements of the cooperative rebalancing protocol is better
> scaling experience, the only OOTB cooperative assignor should not itself
> scale poorly.
> If we can constrain the problem a bit, we can simplify the algorithm greatly.
> In many cases the individual consumers won't be subscribed to some random
> subset of the total subscription, they will all be subscribed to the same set
> of topics and rely on the assignor to balance the partition workload.
> We can detect this case by checking the group's individual subscriptions and
> call on a more efficient assignment algorithm.
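The pseudo-code in the comment above can be sketched in Java roughly as follows. This is an illustration under several assumptions, not the assignor that was eventually implemented: `ConstrainedStickySketch` is a hypothetical class, partitions and members are plain strings, every member is assumed to subscribe to the same topics, the group is assumed to be non-empty, the `partitions` list is assumed to be already in the desired partition order, and the generation check from the pseudo-code is omitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch of the constrained sticky assignment; names are illustrative.
final class ConstrainedStickySketch {
    private ConstrainedStickySketch() { }

    // owned: memberId -> previously owned partitions (may contain stale or duplicate claims).
    static Map<String, List<String>> assign(final List<String> partitions,
                                            final Map<String, List<String>> owned) {
        final int n = owned.size();
        final int floor = partitions.size() / n;            // C_f
        final int ceil = (partitions.size() + n - 1) / n;   // C_c

        final Set<String> unassigned = new LinkedHashSet<>(partitions);
        final Map<String, List<String>> assignment = new TreeMap<>(); // sorted by memberId
        final List<String> unfilled = new ArrayList<>();
        final Deque<String> maxCapacity = new ArrayDeque<>();

        // Reassign as many previously owned partitions as possible, at most C_c per member.
        for (final Map.Entry<String, List<String>> entry : new TreeMap<>(owned).entrySet()) {
            final List<String> kept = new ArrayList<>();
            for (final String partition : entry.getValue()) {
                if (kept.size() < ceil && unassigned.remove(partition)) {
                    kept.add(partition);
                }
            }
            assignment.put(entry.getKey(), kept);
            if (kept.size() < floor) {
                unfilled.add(entry.getKey());
            } else if (kept.size() > floor) {
                maxCapacity.add(entry.getKey());
            }
        }

        // Fill remaining members up to C_f, stealing from members at C_c once partitions run out.
        final Iterator<String> free = unassigned.iterator();
        for (final String member : unfilled) {
            final List<String> target = assignment.get(member);
            while (target.size() < floor) {
                if (free.hasNext()) {
                    target.add(free.next());
                    free.remove();
                } else {
                    final List<String> donor = assignment.get(maxCapacity.poll());
                    target.add(donor.remove(donor.size() - 1));
                }
            }
        }

        // Distribute any leftover partitions, one per member that is not already at C_c.
        final Set<String> atMax = new HashSet<>(maxCapacity);
        final Iterator<String> members = assignment.keySet().iterator();
        while (free.hasNext()) {
            final String member = members.next();
            if (!atMax.contains(member)) {
                assignment.get(member).add(free.next());
                free.remove();
            }
        }
        return assignment;
    }
}
```

Each member and partition is visited a bounded number of times, which is the point of constraining the problem: the work is roughly linear in the number of partitions plus members, in contrast to the general algorithm's reported 2.5 minutes for 2100 consumers and 2100 partitions.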
[GitHub] [kafka] abbccdda closed pull request #7172: (DO NOT MERGE) Kip 447 condensed POC
abbccdda closed pull request #7172: URL: https://github.com/apache/kafka/pull/7172
[GitHub] [kafka] abbccdda closed pull request #7676: DEBUG: add partition empty check
abbccdda closed pull request #7676: URL: https://github.com/apache/kafka/pull/7676
[GitHub] [kafka] abbccdda closed pull request #7179: MINOR: improve StreamsBrokerDownResilienceTest debuggability
abbccdda closed pull request #7179: URL: https://github.com/apache/kafka/pull/7179
[GitHub] [kafka] ableegoldman commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance
ableegoldman commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424830762

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ConstrainedPrioritySet.java
@@ -16,77 +16,58 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;

+import org.apache.kafka.streams.processor.TaskId;
+
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.function.BiFunction;
-import org.apache.kafka.streams.processor.TaskId;
+import java.util.function.Function;

 /**
  * Wraps a priority queue of clients and returns the next valid candidate(s) based on the current task assignment
  */
-class ValidClientsByTaskLoadQueue {
+class ConstrainedPrioritySet {

     private final PriorityQueue clientsByTaskLoad;
-    private final BiFunction validClientCriteria;
+    private final BiFunction constraint;
     private final Set uniqueClients = new HashSet<>();

-    ValidClientsByTaskLoadQueue(final Map clientStates,
-                                final BiFunction validClientCriteria) {
-        this.validClientCriteria = validClientCriteria;
-
-        clientsByTaskLoad = new PriorityQueue<>(
-            (client, other) -> {
-                final double clientTaskLoad = clientStates.get(client).taskLoad();
-                final double otherTaskLoad = clientStates.get(other).taskLoad();
-                if (clientTaskLoad < otherTaskLoad) {
-                    return -1;
-                } else if (clientTaskLoad > otherTaskLoad) {
-                    return 1;
-                } else {
-                    return client.compareTo(other);
-                }
-            });
+    ConstrainedPrioritySet(final BiFunction constraint,
+                           final Function weight) {
+        this.constraint = constraint;
+        clientsByTaskLoad = new PriorityQueue<>(Comparator.comparing(weight).thenComparing(clientId -> clientId));
     }

     /**
      * @return the next least loaded client that satisfies the given criteria, or null if none do
      */
-    UUID poll(final TaskId task) {
-        final List validClient = poll(task, 1);
-        return validClient.isEmpty() ? null : validClient.get(0);
-    }
-
-    /**
-     * @return the next N <= {@code numClientsPerTask} clients in the underlying priority queue that are valid candidates for the given task
-     */
-    List poll(final TaskId task, final int numClients) {
-        final List nextLeastLoadedValidClients = new LinkedList<>();
+    UUID poll(final TaskId task, final Function extraConstraint) {

Review comment:
> we know that we cannot consider C1 again for the second poll

Yep, that's what I was getting at above. I'm totally on board with reducing the number of assumptions, especially as this class becomes more generally used. I was just intrigued by what you said initially and thought "This actually results in better balancing characteristics when assigning standbys" meant that you had actually seen a difference in the tests.

Thanks for continuing to improve this class!
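To make the shape of the refactored class easier to follow, here is a self-contained sketch of a priority set with a per-task constraint, ordered by weight with the client id as tie-breaker as in the diff above. It is an approximation for illustration only: `ConstrainedPrioritySetSketch` is a hypothetical name, clients are `String` and tasks are `Integer` instead of `UUID` and `TaskId`, the `extraConstraint` parameter is left out, and the `offer` behaviour is an assumption since that method is not shown in the diff.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical simplification of the class under review; not the Kafka Streams source.
final class ConstrainedPrioritySetSketch {
    private final PriorityQueue<String> clientsByWeight;
    private final BiFunction<String, Integer, Boolean> constraint;
    private final Set<String> uniqueClients = new HashSet<>();

    ConstrainedPrioritySetSketch(final BiFunction<String, Integer, Boolean> constraint,
                                 final Function<String, Double> weight) {
        this.constraint = constraint;
        // Lowest weight first; ties are broken by client id so the order is deterministic.
        final Comparator<String> byWeightThenId =
            Comparator.comparing(weight).thenComparing(clientId -> clientId);
        this.clientsByWeight = new PriorityQueue<>(byWeightThenId);
    }

    // Returns the least loaded client that satisfies the constraint for this task, or null if none do.
    String poll(final int task) {
        final List<String> rejected = new ArrayList<>();
        String chosen = null;
        while (!clientsByWeight.isEmpty()) {
            final String candidate = clientsByWeight.poll();
            if (constraint.apply(candidate, task)) {
                chosen = candidate;
                break;
            }
            rejected.add(candidate);
        }
        clientsByWeight.addAll(rejected); // rejected clients remain candidates for other tasks
        if (chosen != null) {
            uniqueClients.remove(chosen);
        }
        return chosen;
    }

    // Adds a client, re-inserting it if already present so that a changed weight is re-evaluated.
    void offer(final String client) {
        if (!uniqueClients.add(client)) {
            clientsByWeight.remove(client);
        }
        clientsByWeight.offer(client);
    }
}
```

Because a polled client is removed from the set, a caller that wants to keep considering it for later tasks has to `offer` it again after updating its load, which is what makes the weight function see the new value.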
[GitHub] [kafka] d8tltanc commented on pull request #8528: System tests should use --bootstrap-server rather than --zookeeper when testing new Kafka versions
d8tltanc commented on pull request #8528: URL: https://github.com/apache/kafka/pull/8528#issuecomment-628340161 Latest dev branch builds here (REMOVE_SYSTEM_4): https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3943/
[GitHub] [kafka] vvcephei commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance
vvcephei commented on a change in pull request #8588: URL: https://github.com/apache/kafka/pull/8588#discussion_r424825641 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ConstrainedPrioritySet.java ## @@ -16,77 +16,58 @@ */ package org.apache.kafka.streams.processor.internals.assignment; +import org.apache.kafka.streams.processor.TaskId; + import java.util.Collection; +import java.util.Comparator; import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; import java.util.PriorityQueue; import java.util.Set; import java.util.UUID; import java.util.function.BiFunction; -import org.apache.kafka.streams.processor.TaskId; +import java.util.function.Function; /** * Wraps a priority queue of clients and returns the next valid candidate(s) based on the current task assignment */ -class ValidClientsByTaskLoadQueue { +class ConstrainedPrioritySet { private final PriorityQueue clientsByTaskLoad; -private final BiFunction validClientCriteria; +private final BiFunction constraint; private final Set uniqueClients = new HashSet<>(); -ValidClientsByTaskLoadQueue(final Map clientStates, -final BiFunction validClientCriteria) { -this.validClientCriteria = validClientCriteria; - -clientsByTaskLoad = new PriorityQueue<>( -(client, other) -> { -final double clientTaskLoad = clientStates.get(client).taskLoad(); -final double otherTaskLoad = clientStates.get(other).taskLoad(); -if (clientTaskLoad < otherTaskLoad) { -return -1; -} else if (clientTaskLoad > otherTaskLoad) { -return 1; -} else { -return client.compareTo(other); -} -}); +ConstrainedPrioritySet(final BiFunction constraint, + final Function weight) { +this.constraint = constraint; +clientsByTaskLoad = new PriorityQueue<>(Comparator.comparing(weight).thenComparing(clientId -> clientId)); } /** * @return the next least loaded client that satisfies the given criteria, or null if none do */ -UUID poll(final TaskId task) { -final List 
validClient = poll(task, 1); -return validClient.isEmpty() ? null : validClient.get(0); -} - -/** - * @return the next N <= {@code numClientsPerTask} clients in the underlying priority queue that are valid candidates for the given task - */ -List poll(final TaskId task, final int numClients) { -final List nextLeastLoadedValidClients = new LinkedList<>(); +UUID poll(final TaskId task, final Function extraConstraint) { Review comment: I was operating more on intuition here. To be honest, I had a suspicion you would call this out, so I probably should have just saved time and taken the time to prove it. Forgetting about the constraint for a minute, I think that what I had in mind for balance is something like, suppose you have two clients "C1" and "C2"... C1 has one task and C2 has two. You poll and get C1 and add a task. Now, they both have two. If you add it back and poll again, you might prefer to get C1 back again. Maybe because the "weight" function takes into account more than just the task load, or maybe just because of the total order we impose based on clientId, in which `C1 < C2`. But if you just poll two clients to begin with, then C1 doesn't get a chance to be included for the second poll, you just automatically get C1 and C2. In retrospect, this might be moot in practice, because the only time we actually polled for multiple clients was when assigning standbys, and specifically when we were assigning multiple replicas of the same task, in which case, we know that we _cannot_ consider C1 again for the second poll. From a computer-sciencey perspective, it doesn't seem like the data structure should be able to make this assumption, though, since it can't know that polling a client also invalidates it for a subsequent poll with the same last-mile predicate. So, even in retrospect, I'm tempted to leave it this way (big surprise there), although I'd acknowledge that the outcome is actually not different in the way that we would use the method. 
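To make the C1/C2 argument in the review comment above concrete, here is a minimal sketch. It is only an illustration under stated assumptions: `PollOrderSketch` is a hypothetical, simplified stand-in (String client ids, an integer weight, a single predicate) and is not the `ConstrainedPrioritySet` from the PR. It compares "poll, assign, offer back, poll again" with "poll two clients up front".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.function.Predicate;
import java.util.function.ToIntFunction;

// Hypothetical, simplified stand-in for the class under review (String ids instead of UUIDs).
// It is NOT the actual ConstrainedPrioritySet from the PR.
class PollOrderSketch {
    private final PriorityQueue<String> clientsByWeight;

    PollOrderSketch(final ToIntFunction<String> weight) {
        // Order by weight first, then by client id, so ties are broken by a total order.
        clientsByWeight = new PriorityQueue<>((client, other) -> {
            final int byWeight = Integer.compare(weight.applyAsInt(client), weight.applyAsInt(other));
            return byWeight != 0 ? byWeight : client.compareTo(other);
        });
    }

    void offer(final String client) {
        clientsByWeight.offer(client);
    }

    // Returns the lowest-weight client that passes the constraint, or null if none does.
    // Clients that fail the constraint are put back so later polls can still see them.
    String poll(final Predicate<String> constraint) {
        final List<String> rejected = new ArrayList<>();
        String result = null;
        while (!clientsByWeight.isEmpty()) {
            final String candidate = clientsByWeight.poll();
            if (constraint.test(candidate)) {
                result = candidate;
                break;
            }
            rejected.add(candidate);
        }
        clientsByWeight.addAll(rejected);
        return result;
    }

    private static Map<String, Integer> initialLoad() {
        final Map<String, Integer> load = new HashMap<>();
        load.put("C1", 1); // C1 starts with one task
        load.put("C2", 2); // C2 starts with two tasks
        return load;
    }

    // Poll one client, give it a task, offer it back, then poll again.
    static List<String> pollOneAtATime() {
        final Map<String, Integer> load = initialLoad();
        final PollOrderSketch set = new PollOrderSketch(load::get);
        set.offer("C1");
        set.offer("C2");
        final List<String> picked = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            final String client = set.poll(c -> true);
            load.merge(client, 1, Integer::sum); // weight changes only while the client is outside the queue
            picked.add(client);
            set.offer(client);
        }
        return picked;
    }

    // Poll two clients up front, without offering the first one back in between.
    static List<String> pollTwoAtOnce() {
        final Map<String, Integer> load = initialLoad();
        final PollOrderSketch set = new PollOrderSketch(load::get);
        set.offer("C1");
        set.offer("C2");
        final List<String> picked = new ArrayList<>();
        picked.add(set.poll(c -> true));
        picked.add(set.poll(c -> true));
        return picked;
    }

    public static void main(final String[] args) {
        System.out.println("one at a time: " + pollOneAtATime());
        System.out.println("two at once:   " + pollTwoAtOnce());
    }
}
```

In this sketch, polling one at a time lets C1 be picked twice, because after the first assignment the two loads tie and the id order prefers C1. Polling two up front always yields C1 and then C2. As the comment notes, when the second poll carries a predicate that excludes the client just chosen (the standby-replica case), both approaches give the same result.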
[jira] [Created] (KAFKA-9991) Flaky Test KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled
Sophie Blee-Goldman created KAFKA-9991: -- Summary: Flaky Test KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled Key: KAFKA-9991 URL: https://issues.apache.org/jira/browse/KAFKA-9991 Project: Kafka Issue Type: Bug Components: streams Reporter: Sophie Blee-Goldman [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6280/testReport/junit/org.apache.kafka.streams.integration/KTableSourceTopicRestartIntegrationTest/shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled/] h3. Stacktrace java.lang.AssertionError: Condition not met within timeout 3. Table did not read all values at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368) at org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.assertNumberValuesRead(KTableSourceTopicRestartIntegrationTest.java:205) at org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled(KTableSourceTopicRestartIntegrationTest.java:159) at org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled(KTableSourceTopicRestartIntegrationTest.java:143) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-9688) kafka-topic.sh should show KIP-455 adding and removing replicas
[ https://issues.apache.org/jira/browse/KAFKA-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-9688. - Fix Version/s: 2.5.0 Reviewer: Colin McCabe Resolution: Fixed > kafka-topic.sh should show KIP-455 adding and removing replicas > --- > > Key: KAFKA-9688 > URL: https://issues.apache.org/jira/browse/KAFKA-9688 > Project: Kafka > Issue Type: Improvement > Components: tools >Reporter: Colin McCabe >Assignee: Cheng Tan >Priority: Major > Fix For: 2.5.0 > > > kafka-topic.sh should show KIP-455 adding and removing replicas, as described > in KIP-455. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vvcephei commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance
vvcephei commented on a change in pull request #8588: URL: https://github.com/apache/kafka/pull/8588#discussion_r424822166 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java ## @@ -236,16 +233,17 @@ public void staticAssignmentShouldConvergeWithTheFirstAssignment() { 0, 1000L); -final Harness harness = Harness.initializeCluster(1, 1, 1); +final Harness harness = Harness.initializeCluster(1, 1, 1, () -> 1); testForConvergence(harness, configs, 1); verifyValidAssignment(0, harness); +verifyBalancedAssignment(harness); } @Test public void assignmentShouldConvergeAfterAddingNode() { -final int numStatelessTasks = 15; -final int numStatefulTasks = 13; +final int numStatelessTasks = 7; Review comment: Right, but I think there's a "3" or a "5" in there somewhere, maybe in the other test. Anyway, my _intent_ was to make them all prime so I wouldn't have to think too hard about whether they were all coprime. But, in reality, I managed to screw up both.
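As a side note on the coprimality point in the comment above: distinct primes are always pairwise coprime, which is why picking primes avoids having to check. A minimal sketch of such a check follows. `CoprimeSketch` and its methods are hypothetical helpers for illustration and are not part of the test class. The values 15 and 13 are the old constants from the diff, while the extra 5 and 3 stand in for the "3" or "5" that the comment only guesses at.

```java
// Hypothetical helper, not part of TaskAssignorConvergenceTest.
class CoprimeSketch {
    // Euclid's algorithm for the greatest common divisor of two non-negative ints.
    static int gcd(int a, int b) {
        while (b != 0) {
            final int remainder = a % b;
            a = b;
            b = remainder;
        }
        return a;
    }

    // True if every pair of values has a greatest common divisor of 1.
    static boolean pairwiseCoprime(final int... values) {
        for (int i = 0; i < values.length; i++) {
            for (int j = i + 1; j < values.length; j++) {
                if (gcd(values[i], values[j]) != 1) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(final String[] args) {
        System.out.println(pairwiseCoprime(15, 13));    // 15 and 13 share no factor
        System.out.println(pairwiseCoprime(15, 13, 5)); // 15 and 5 share the factor 5
        System.out.println(pairwiseCoprime(7, 13, 3));  // distinct primes
    }
}
```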
[GitHub] [kafka] vvcephei commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance
vvcephei commented on a change in pull request #8588: URL: https://github.com/apache/kafka/pull/8588#discussion_r424821641 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java ## @@ -35,262 +44,161 @@ import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_2; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_3; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClientStatesMap; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasProperty; import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignTaskMovements; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertFalse; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedSet; -import java.util.UUID; -import java.util.stream.Collectors; -import org.apache.kafka.streams.processor.TaskId; -import org.junit.Test; +import static org.hamcrest.Matchers.is; public class TaskMovementTest { -private final ClientState client1 = new ClientState(1); -private final ClientState client2 = new ClientState(1); -private final ClientState client3 = new ClientState(1); - -private final Map clientStates = getClientStatesMap(client1, client2, client3); - -private final Map> emptyWarmupAssignment = mkMap( -mkEntry(UUID_1, EMPTY_TASK_LIST), -mkEntry(UUID_2, EMPTY_TASK_LIST), -mkEntry(UUID_3, EMPTY_TASK_LIST) -); - @Test public void shouldAssignTasksToClientsAndReturnFalseWhenAllClientsCaughtUp() { final int maxWarmupReplicas = Integer.MAX_VALUE; final Set allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2); -final Map> balancedAssignment = 
mkMap( -mkEntry(UUID_1, asList(TASK_0_0, TASK_1_0)), -mkEntry(UUID_2, asList(TASK_0_1, TASK_1_1)), -mkEntry(UUID_3, asList(TASK_0_2, TASK_1_2)) -); - final Map> tasksToCaughtUpClients = new HashMap<>(); for (final TaskId task : allTasks) { tasksToCaughtUpClients.put(task, mkSortedSet(UUID_1, UUID_2, UUID_3)); } - -assertFalse( + +final ClientState client1 = getClientStateWithActiveAssignment(asList(TASK_0_0, TASK_1_0)); +final ClientState client2 = getClientStateWithActiveAssignment(asList(TASK_0_1, TASK_1_1)); +final ClientState client3 = getClientStateWithActiveAssignment(asList(TASK_0_2, TASK_1_2)); + +assertThat( assignTaskMovements( -balancedAssignment, tasksToCaughtUpClients, -clientStates, -getMapWithNumStandbys(allTasks, 1), -maxWarmupReplicas) +getClientStatesMap(client1, client2, client3), +maxWarmupReplicas), +is(false) ); - -verifyClientStateAssignments(balancedAssignment, emptyWarmupAssignment); } @Test public void shouldAssignAllTasksToClientsAndReturnFalseIfNoClientsAreCaughtUp() { -final int maxWarmupReplicas = 2; -final Set allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2); +final int maxWarmupReplicas = Integer.MAX_VALUE; -final Map> balancedAssignment = mkMap( -mkEntry(UUID_1, asList(TASK_0_0, TASK_1_0)), -mkEntry(UUID_2, asList(TASK_0_1, TASK_1_1)), -mkEntry(UUID_3, asList(TASK_0_2, TASK_1_2)) -); +final ClientState client1 = getClientStateWithActiveAssignment(asList(TASK_0_0, TASK_1_0)); +final ClientState client2 = getClientStateWithActiveAssignment(asList(TASK_0_1, TASK_1_1)); +final ClientState client3 = getClientStateWithActiveAssignment(asList(TASK_0_2, TASK_1_2)); -assertFalse( +assertThat( assignTaskMovements( -balancedAssignment, emptyMap(), -clientStates, -getMapWithNumStandbys(allTasks, 1), -maxWarmupReplicas) +getClientStatesMap(client1, client2, client3), +maxWarmupReplicas), +is(false) ); -verifyClientStateAssignments(balancedAssignment, emptyWarmupAssignment); } @Test public void 
shouldMoveTasksToCaughtUpClientsAndAssignWarmupReplicasInTheirPlace() { final int maxWarmupReplicas = Integer.MAX_VALUE; -final Set allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2); +final ClientState client1 = getClientStateWithActiveAssignment(singletonList(TASK_0_
[GitHub] [kafka] vvcephei commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance
vvcephei commented on a change in pull request #8588: URL: https://github.com/apache/kafka/pull/8588#discussion_r424820836 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java ## @@ -53,75 +67,94 @@ private static boolean taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(final Task /** * @return whether any warmup replicas were assigned */ -static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment, - final Map> tasksToCaughtUpClients, +static boolean assignTaskMovements(final Map> tasksToCaughtUpClients, final Map clientStates, - final Map tasksToRemainingStandbys, final int maxWarmupReplicas) { -boolean warmupReplicasAssigned = false; +final BiFunction caughtUpPredicate = +(client, task) -> taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients); -final ValidClientsByTaskLoadQueue clientsByTaskLoad = new ValidClientsByTaskLoadQueue( -clientStates, -(client, task) -> taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients) +final ConstrainedPrioritySet clientsByTaskLoad = new ConstrainedPrioritySet( +caughtUpPredicate, +client -> clientStates.get(client).taskLoad() ); -final SortedSet taskMovements = new TreeSet<>( -(movement, other) -> { -final int numCaughtUpClients = movement.caughtUpClients.size(); -final int otherNumCaughtUpClients = other.caughtUpClients.size(); -if (numCaughtUpClients != otherNumCaughtUpClients) { -return Integer.compare(numCaughtUpClients, otherNumCaughtUpClients); -} else { -return movement.task.compareTo(other.task); -} -} +final Queue taskMovements = new PriorityQueue<>( + Comparator.comparing(TaskMovement::numCaughtUpClients).thenComparing(TaskMovement::task) ); -for (final Map.Entry> assignmentEntry : statefulActiveTaskAssignment.entrySet()) { -final UUID client = assignmentEntry.getKey(); -final ClientState state = clientStates.get(client); -for (final TaskId task : assignmentEntry.getValue()) { -if 
(taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients)) { -state.assignActive(task); -} else { -final TaskMovement taskMovement = new TaskMovement(task, client, tasksToCaughtUpClients.get(task)); -taskMovements.add(taskMovement); +for (final Map.Entry clientStateEntry : clientStates.entrySet()) { +final UUID client = clientStateEntry.getKey(); +final ClientState state = clientStateEntry.getValue(); +for (final TaskId task : state.activeTasks()) { +// if the desired client is not caught up, and there is another client that _is_ caught up, then +// we schedule a movement, so we can move the active task to the caught-up client. We'll try to +// assign a warm-up to the desired client so that we can move it later on. +if (!taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients)) { +taskMovements.add(new TaskMovement(task, client, tasksToCaughtUpClients.get(task))); } } clientsByTaskLoad.offer(client); } +final boolean movementsNeeded = !taskMovements.isEmpty(); + final AtomicInteger remainingWarmupReplicas = new AtomicInteger(maxWarmupReplicas); for (final TaskMovement movement : taskMovements) { -final UUID sourceClient = clientsByTaskLoad.poll(movement.task); -if (sourceClient == null) { -throw new IllegalStateException("Tried to move task to caught-up client but none exist"); -} - -final ClientState sourceClientState = clientStates.get(sourceClient); -sourceClientState.assignActive(movement.task); -clientsByTaskLoad.offer(sourceClient); +final UUID standbySourceClient = clientsByTaskLoad.poll( Review comment: Agreed, it would just be good luck right now, but I figured we might as well capitalize on the luck. I'm planning to follow up pretty soon with the standby stickiness. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contac
[GitHub] [kafka] vvcephei commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance
vvcephei commented on a change in pull request #8588: URL: https://github.com/apache/kafka/pull/8588#discussion_r424819838 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java ## @@ -53,75 +67,94 @@ private static boolean taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(final Task /** * @return whether any warmup replicas were assigned */ -static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment, - final Map> tasksToCaughtUpClients, +static boolean assignTaskMovements(final Map> tasksToCaughtUpClients, final Map clientStates, - final Map tasksToRemainingStandbys, final int maxWarmupReplicas) { -boolean warmupReplicasAssigned = false; +final BiFunction caughtUpPredicate = +(client, task) -> taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients); -final ValidClientsByTaskLoadQueue clientsByTaskLoad = new ValidClientsByTaskLoadQueue( -clientStates, -(client, task) -> taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients) +final ConstrainedPrioritySet clientsByTaskLoad = new ConstrainedPrioritySet( Review comment: sure, that's a good idea. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics
ableegoldman commented on pull request #8662: URL: https://github.com/apache/kafka/pull/8662#issuecomment-628329507 Nooo `EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]` still had one failure on the Java 8 build 😭 But it failed on a different line which seems more in line with real flakiness. FWIW I ran this locally 40 times without failures (technically 80 in total for both true/false variations) ... I think it's worth still merging this PR and we can continue investigating it from there. ``` java.lang.AssertionError: Did not receive all 20 records from topic multiPartitionOutputTopic within 6 ms Expected: is a value equal to or greater than <20> but: <5> was less than <20> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530) at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973) at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961) at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427) ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #6792: KAFKA 8311 : Augment the error message to let user change `default.api.timeout.ms`
mjsax commented on pull request #6792: URL: https://github.com/apache/kafka/pull/6792#issuecomment-628323776 @clearpal7 @abbccdda Just cleaning up my old backlog. Do you still want to get this in? PR would need a rebase.
[GitHub] [kafka] mjsax commented on pull request #6617: MINOR: Allow scripts to be symlinked
mjsax commented on pull request #6617: URL: https://github.com/apache/kafka/pull/6617#issuecomment-628323063 Closing this PR due to inactivity.
[GitHub] [kafka] mjsax closed pull request #6617: MINOR: Allow scripts to be symlinked
mjsax closed pull request #6617: URL: https://github.com/apache/kafka/pull/6617
[GitHub] [kafka] mjsax merged pull request #6656: MINOR: Add option to rebuild source for system tests
mjsax merged pull request #6656: URL: https://github.com/apache/kafka/pull/6656
[jira] [Updated] (KAFKA-9987) Improve sticky partition assignor algorithm
[ https://issues.apache.org/jira/browse/KAFKA-9987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-9987: --- Description: In [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol] we added the new CooperativeStickyAssignor which leverages the underlying sticky assignment algorithm of the existing StickyAssignor (moved to AbstractStickyAssignor). The algorithm is fairly complex as it tries to optimize stickiness while satisfying perfect balance _in the case individual consumers may be subscribed to different subsets of the topics._ While it does a pretty good job at what it promises to do, it doesn't scale well with large numbers of consumers and partitions. To give a concrete example, users have reported that it takes 2.5 minutes for the assignment to complete with just 2100 consumers reading from 2100 partitions. Since partitions revoked during the first of two cooperative rebalances will remain unassigned until the end of the second rebalance, it's important for the rebalance to be as fast as possible. And since one of the primary improvements of the cooperative rebalancing protocol is better scaling experience, the only OOTB cooperative assignor should not itself scale poorly. If we can constrain the problem a bit, we can simplify the algorithm greatly. In many cases the individual consumers won't be subscribed to some random subset of the total subscription; they will all be subscribed to the same set of topics and rely on the assignor to balance the partition workload. We can detect this case by checking the group's individual subscriptions and calling a more efficient assignment algorithm. was: In KIP-429 we added the new CooperativeStickyAssignor which leverages on the underlying sticky assignment algorithm of the existing StickyAssignor (moved to AbstractStickyAssignor). 
The algorithm is fairly complex as it tries to optimize stickiness while satisfying perfect balance _in the case individual consumers may be subscribed to a random subset of the topics._ While it does a pretty good job at what it promises to do, it doesn't scale well with large numbers of consumers and partitions. If we can make the assumption that all consumers are subscribed to the same set of topics, we can simplify the algorithm greatly and do a sticky-but-balanced assignment in a single pass. It would be nice to have an additional cooperative assignor OOTB that performs efficiently for users who know their group will satisfy this constraint. > Improve sticky partition assignor algorithm > --- > > Key: KAFKA-9987 > URL: https://issues.apache.org/jira/browse/KAFKA-9987 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Major > > In > [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol] > we added the new CooperativeStickyAssignor which leverages on the underlying > sticky assignment algorithm of the existing StickyAssignor (moved to > AbstractStickyAssignor). The algorithm is fairly complex as it tries to > optimize stickiness while satisfying perfect balance _in the case individual > consumers may be subscribed to different subsets of the topics._ While it > does a pretty good job at what it promises to do, it doesn't scale well with > large numbers of consumers and partitions. > To give a concrete example, users have reported that it takes 2.5 minutes for > the assignment to complete with just 2100 consumers reading from 2100 > partitions. Since partitions revoked during the first of two cooperative > rebalances will remain unassigned until the end of the second rebalance, it's > important for the rebalance to be as fast as possible. 
And since one of the > primary improvements of the cooperative rebalancing protocol is better > scaling experience, the only OOTB cooperative assignor should not itself > scale poorly. > If we can constrain the problem a bit, we can simplify the algorithm greatly. > In many cases the individual consumers won't be subscribed to some random > subset of the total subscription; they will all be subscribed to the same set > of topics and rely on the assignor to balance the partition workload. > We can detect this case by checking the group's individual subscriptions and > calling a more efficient assignment algorithm.
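The detection step described in the ticket (checking whether every consumer in the group has the same subscription) could look roughly like the sketch below. This is only an illustration under assumptions: `SubscriptionCheckSketch`, `allSubscriptionsEqual`, and the plain `Map<String, List<String>>` of member id to topic names are hypothetical and do not reflect the actual assignor code or its types.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper; the real assignor works on its own subscription types.
class SubscriptionCheckSketch {
    // Returns true if every member subscribes to the same set of topics (order and duplicates ignored).
    // An empty group trivially satisfies the condition.
    static boolean allSubscriptionsEqual(final Map<String, List<String>> subscriptionsByMember) {
        Set<String> first = null;
        for (final List<String> topics : subscriptionsByMember.values()) {
            final Set<String> current = new HashSet<>(topics);
            if (first == null) {
                first = current;
            } else if (!first.equals(current)) {
                return false; // at least one member differs: fall back to the general algorithm
            }
        }
        return true;
    }

    public static void main(final String[] args) {
        final Map<String, List<String>> group = new HashMap<>();
        group.put("consumer-1", Arrays.asList("orders", "payments"));
        group.put("consumer-2", Arrays.asList("payments", "orders"));
        System.out.println(allSubscriptionsEqual(group));
        group.put("consumer-3", Arrays.asList("orders"));
        System.out.println(allSubscriptionsEqual(group));
    }
}
```

A caller could use the result to choose between a simpler single-pass assignment and the existing general algorithm. The check is linear in the total number of subscribed topics across members.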
[jira] [Updated] (KAFKA-9987) Improve sticky partition assignor algorithm
[ https://issues.apache.org/jira/browse/KAFKA-9987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-9987: --- Summary: Improve sticky partition assignor algorithm (was: Add new cooperative assignor optimized for constant-subscription group) > Improve sticky partition assignor algorithm > --- > > Key: KAFKA-9987 > URL: https://issues.apache.org/jira/browse/KAFKA-9987 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Major > > In KIP-429 we added the new CooperativeStickyAssignor which leverages on the > underlying sticky assignment algorithm of the existing StickyAssignor (moved > to AbstractStickyAssignor). > > The algorithm is fairly complex as it tries to optimize stickiness while > satisfying perfect balance _in the case individual consumers may be > subscribed to a random subset of the topics._ While it does a pretty good job > at what it promises to do, it doesn't scale well with large numbers of > consumers and partitions. > > If we can make the assumption that all consumers are subscribed to the same > set of topics, we can simplify the algorithm greatly and do a > sticky-but-balanced assignment in a single pass. It would be nice to have an > additional cooperative assignor OOTB that performs efficiently for users who > know their group will satisfy this constraint. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-9850) Move KStream#repartition operator validation during Topology build process
[ https://issues.apache.org/jira/browse/KAFKA-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-9850. Fix Version/s: 2.6.0 Resolution: Fixed > Move KStream#repartition operator validation during Topology build process > --- > > Key: KAFKA-9850 > URL: https://issues.apache.org/jira/browse/KAFKA-9850 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Levani Kokhreidze >Assignee: HaiyuanZhao >Priority: Major > Labels: help-wanted, newbie, newbie++ > Fix For: 2.6.0 > > > `KStream#repartition` operation performs most of its validation regarding > joining, co-partitioning, etc after starting Kafka Streams instance. Some > parts of this validation can be detected much earlier, specifically during > topology `build()`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax merged pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…
mjsax merged pull request #8550: URL: https://github.com/apache/kafka/pull/8550
[GitHub] [kafka] mjsax commented on pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…
mjsax commented on pull request #8550: URL: https://github.com/apache/kafka/pull/8550#issuecomment-628317830 Thanks for the PR @zhaohaidao! Merged to `trunk`.
[jira] [Resolved] (KAFKA-9465) Enclose consumer call with catching InvalidOffsetException
[ https://issues.apache.org/jira/browse/KAFKA-9465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-9465. Resolution: Not A Problem This was fixed implicitly via some code refactoring. > Enclose consumer call with catching InvalidOffsetException > -- > > Key: KAFKA-9465 > URL: https://issues.apache.org/jira/browse/KAFKA-9465 > Project: Kafka > Issue Type: Improvement >Reporter: Ted Yu >Assignee: Ted Yu >Priority: Minor > > In maybeUpdateStandbyTasks, the try block encloses restoreConsumer.poll and > record handling. > Since InvalidOffsetException is thrown by restoreConsumer.poll, we should > enclose this call in the try block. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax closed pull request #8001: KAFKA-9465: Enclose consumer call with catching InvalidOffsetException
mjsax closed pull request #8001: URL: https://github.com/apache/kafka/pull/8001
[GitHub] [kafka] mjsax commented on pull request #8001: KAFKA-9465: Enclose consumer call with catching InvalidOffsetException
mjsax commented on pull request #8001: URL: https://github.com/apache/kafka/pull/8001#issuecomment-628316756 Thanks @guozhangwang. I resolved the Jira ticket. Closing this PR.
[GitHub] [kafka] mjsax commented on a change in pull request #8049: MINOR: Added missing default serdes to the streams.scala.Serdes
mjsax commented on a change in pull request #8049: URL: https://github.com/apache/kafka/pull/8049#discussion_r424805332

## File path: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala ##
@@ -30,12 +32,15 @@ object Serdes {
   implicit def JavaLong: Serde[java.lang.Long] = JSerdes.Long()
   implicit def ByteArray: Serde[Array[Byte]] = JSerdes.ByteArray()
   implicit def Bytes: Serde[org.apache.kafka.common.utils.Bytes] = JSerdes.Bytes()
+  implicit def byteBufferSerde: Serde[ByteBuffer] = JSerdes.ByteBuffer()
+  implicit def shortSerde: Serde[Short] = JSerdes.Short().asInstanceOf[Serde[Short]]
   implicit def Float: Serde[Float] = JSerdes.Float().asInstanceOf[Serde[Float]]
   implicit def JavaFloat: Serde[java.lang.Float] = JSerdes.Float()
   implicit def Double: Serde[Double] = JSerdes.Double().asInstanceOf[Serde[Double]]
   implicit def JavaDouble: Serde[java.lang.Double] = JSerdes.Double()
   implicit def Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
   implicit def JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer()
+  implicit def uuidSerde: Serde[UUID] = JSerdes.UUID()

Review comment: I am fine with postponing this PR. cc @vvcephei
[GitHub] [kafka] mjsax commented on pull request #8637: KAFKA-9976: Reuse repartition node in all cases for KGroupedStream and KGroupedTable aggregates
mjsax commented on pull request #8637: URL: https://github.com/apache/kafka/pull/8637#issuecomment-628315503 @bbejeck Wondering if there are any backward incompatibility concerns? Can you explain why this change is safe? For #8504 there would have been an exception before, so it was already broken and no compatibility concern arises. Not 100% sure about this PR though.
[GitHub] [kafka] mjsax commented on pull request #8558: KAFKA-8611 / KIP-221 documentation
mjsax commented on pull request #8558: URL: https://github.com/apache/kafka/pull/8558#issuecomment-628314584 @lkokhreidze This PR shows a conflict. Can you rebase it? Btw: did you see the follow-up question on the mailing list?
[GitHub] [kafka] soondenana edited a comment on pull request #8068: MINOR: Update to Gradle 4.10.3
soondenana edited a comment on pull request #8068: URL: https://github.com/apache/kafka/pull/8068#issuecomment-628308978 Closing PR as this doesn't fix issue.
[GitHub] [kafka] soondenana commented on pull request #8068: MINOR: Update to Gradle 4.10.3
soondenana commented on pull request #8068: URL: https://github.com/apache/kafka/pull/8068#issuecomment-628308978 The issue has now been fixed by the release team by adding JVM flags for this. This PR is no longer needed. Closing.
[GitHub] [kafka] soondenana closed pull request #8068: MINOR: Update to Gradle 4.10.3
soondenana closed pull request #8068: URL: https://github.com/apache/kafka/pull/8068
[jira] [Commented] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task
[ https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106744#comment-17106744 ] Sophie Blee-Goldman commented on KAFKA-9989: Yeah it would be good to dive into why this is happening – are the partitions missing because of a cooperative rebalance and we need to wait for one more rebalance to get the assignment? Or did the admin client's listOffsets request fail causing us to fall back on the PriorTaskAssignor? > StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor > gets assigned task > - > > Key: KAFKA-9989 > URL: https://issues.apache.org/jira/browse/KAFKA-9989 > Project: Kafka > Issue Type: Bug > Components: streams, system tests >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > System test StreamsUpgradeTest.test_metadata_upgrade could fail due to: > "Never saw output 'processed [0-9]* records' on ubuntu@worker6" > which if we take a closer look at, the rebalance happens but has no task > assignment. We should fix this problem by making the rebalance result as part > of the check, and skip the record processing validation when the assignment > is empty. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang commented on a change in pull request #8661: KAFKA-9603: Do not turn on bulk loading for segmented stores on stand-by tasks
guozhangwang commented on a change in pull request #8661: URL: https://github.com/apache/kafka/pull/8661#discussion_r424796625

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java ##
@@ -248,17 +243,6 @@ void restoreAllInternal(final Collection> records) {
             final long segmentId = segments.segmentId(timestamp);
             final S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
             if (segment != null) {
-                // This handles the case that state store is moved to a new client and does not
-                // have the local RocksDB instance for the segment. In this case, toggleDBForBulkLoading
-                // will only close the database and open it again with bulk loading enabled.
-                if (!bulkLoadSegments.contains(segment)) {
-                    segment.toggleDbForBulkLoading(true);
-                    // If the store does not exist yet, the getOrCreateSegmentIfLive will call openDB that
-                    // makes the open flag for the newly created store.
-                    // if the store does exist already, then toggleDbForBulkLoading will make sure that
-                    // the store is already open here.
-                    bulkLoadSegments = new HashSet<>(segments.allSegments());
-                }

Review comment: Yup, that makes sense to me. I'm thinking about the world where standbys (and also restoring tasks) are executed on different threads. The concern about IQ is indeed valid with a large set of un-compacted L0 files. In the even larger scope, where we would have checkpoints, I'd believe that bulk-loading would not be very necessary since we would not have a huge number of records to catch up on any more :)
[GitHub] [kafka] rgroothuijsen opened a new pull request #8664: KAFKA-9716: Clarify meaning of compression rate metrics
rgroothuijsen opened a new pull request #8664: URL: https://github.com/apache/kafka/pull/8664 There is some confusion over the compression rate metrics, as the meaning of the value isn't clearly stated in the metric description. In this case, it was assumed that a higher compression rate value meant better compression. This PR clarifies the meaning of the value, to prevent misunderstandings. Alternative approaches that were considered were to either change the name of the metric or its implementation, but this would have a negative impact on those who are already making use of this metric. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-9716) Values of compression-rate and compression-rate-avg are misleading
[ https://issues.apache.org/jira/browse/KAFKA-9716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rens Groothuijsen reassigned KAFKA-9716:

Assignee: Rens Groothuijsen

> Values of compression-rate and compression-rate-avg are misleading
>
> Key: KAFKA-9716
> URL: https://issues.apache.org/jira/browse/KAFKA-9716
> Project: Kafka
> Issue Type: Bug
> Components: clients, compression
> Affects Versions: 2.4.1
> Reporter: Christian Kosmowski
> Assignee: Rens Groothuijsen
> Priority: Minor
>
> The values of the following metrics:
> compression-rate and compression-rate-avg and basically every other
> compression-rate (e.g. the per-topic compression rate)
> are confusing.
> They are calculated as follows:
> {code:java}
> if (numRecords == 0L) {
>     buffer().position(initialPosition);
>     builtRecords = MemoryRecords.EMPTY;
> } else {
>     if (magic > RecordBatch.MAGIC_VALUE_V1)
>         this.actualCompressionRatio = (float) writeDefaultBatchHeader() / this.uncompressedRecordsSizeInBytes;
>     else if (compressionType != CompressionType.NONE)
>         this.actualCompressionRatio = (float) writeLegacyCompressedWrapperHeader() / this.uncompressedRecordsSizeInBytes;
>     ByteBuffer buffer = buffer().duplicate();
>     buffer.flip();
>     buffer.position(initialPosition);
>     builtRecords = MemoryRecords.readableRecords(buffer.slice());
> }
> {code}
> Basically, the compressed size is divided by the uncompressed size, which leads
> to a value < 1 for high compression (good if you want compression) or > 1 for
> poor compression (bad if you want compression).
> From the name "compression rate" I would expect the exact opposite. Apart
> from the fact that the word "rate" usually refers to comparisons based on
> values of different units (miles per hour), the correct word "ratio" would
> refer to the uncompressed size divided by the compressed size. (The code
> uses the correct word, but the metric names do not.)
> So if the compressed data takes half the space of the uncompressed data, the
> correct value for compression ratio (or rate) would be 2 and not 0.5 as Kafka
> reports it. That is really confusing and I would AT LEAST expect that this
> behaviour would be documented somewhere, but it's not; all documentation
> sources just say "the compression rate".
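The arithmetic described in the ticket can be sketched as follows. This is a minimal illustration, not Kafka code: the class and method names (`CompressionRatioSketch`, `kafkaStyleRate`, `conventionalRatio`) and the byte counts are hypothetical; only the two divisions mirror what the ticket describes.

```java
// Hypothetical helper; names and sample sizes are assumptions for illustration only.
class CompressionRatioSketch {

    // What the ticket says the metric reports: compressed size / uncompressed size.
    // Smaller values mean better compression.
    static double kafkaStyleRate(long compressedBytes, long uncompressedBytes) {
        return (double) compressedBytes / uncompressedBytes;
    }

    // What the reporter expected from the name: uncompressed size / compressed size.
    // Larger values mean better compression.
    static double conventionalRatio(long compressedBytes, long uncompressedBytes) {
        return (double) uncompressedBytes / compressedBytes;
    }

    public static void main(String[] args) {
        long uncompressed = 1000L; // assumed sample batch size in bytes
        long compressed = 500L;    // assumed size after compression

        System.out.println(kafkaStyleRate(compressed, uncompressed));    // prints 0.5
        System.out.println(conventionalRatio(compressed, uncompressed)); // prints 2.0
    }
}
```

With data that compresses to half its size, the first convention yields 0.5 and the second yields 2, which is the mismatch the ticket complains about.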
[GitHub] [kafka] ijuma commented on pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem
ijuma commented on pull request #8517: URL: https://github.com/apache/kafka/pull/8517#issuecomment-628301206 ok to test
[jira] [Assigned] (KAFKA-9554) Define the SPI for Tiered Storage framework
[ https://issues.apache.org/jira/browse/KAFKA-9554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana reassigned KAFKA-9554:

Assignee: Satish Duggana

> Define the SPI for Tiered Storage framework
>
> Key: KAFKA-9554
> URL: https://issues.apache.org/jira/browse/KAFKA-9554
> Project: Kafka
> Issue Type: Sub-task
> Components: clients, core
> Reporter: Alexandre Dupriez
> Assignee: Satish Duggana
> Priority: Major
>
> The goal of this task is to define the SPI (service provider interfaces)
> which will be used by vendors to implement plug-ins to communicate with
> a specific storage system.
> Done means:
> * Package with interfaces and key objects available and published for review.
[jira] [Created] (KAFKA-9990) Supporting transactions in tiered storage
Satish Duggana created KAFKA-9990:

Summary: Supporting transactions in tiered storage
Key: KAFKA-9990
URL: https://issues.apache.org/jira/browse/KAFKA-9990
Project: Kafka
Issue Type: Sub-task
Reporter: Satish Duggana
Assignee: Satish Duggana
[jira] [Commented] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task
[ https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106731#comment-17106731 ] Matthias J. Sax commented on KAFKA-9989: Seems like an issue in rebalancing itself? Shouldn't we always get a balanced assignment? > StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor > gets assigned task > - > > Key: KAFKA-9989 > URL: https://issues.apache.org/jira/browse/KAFKA-9989 > Project: Kafka > Issue Type: Bug > Components: streams, system tests >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > System test StreamsUpgradeTest.test_metadata_upgrade could fail due to: > "Never saw output 'processed [0-9]* records' on ubuntu@worker6" > which if we take a closer look at, the rebalance happens but has no task > assignment. We should fix this problem by making the rebalance result as part > of the check, and skip the record processing validation when the assignment > is empty. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task
[ https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-9989: --- Component/s: system tests streams > StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor > gets assigned task > - > > Key: KAFKA-9989 > URL: https://issues.apache.org/jira/browse/KAFKA-9989 > Project: Kafka > Issue Type: Bug > Components: streams, system tests >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > System test StreamsUpgradeTest.test_metadata_upgrade could fail due to: > "Never saw output 'processed [0-9]* records' on ubuntu@worker6" > which if we take a closer look at, the rebalance happens but has no task > assignment. We should fix this problem by making the rebalance result as part > of the check, and skip the record processing validation when the assignment > is empty. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax commented on a change in pull request #6824: KAFKA-7523: Add ConnectedStoreProvider to Processor API
mjsax commented on a change in pull request #6824: URL: https://github.com/apache/kafka/pull/6824#discussion_r424758763

## File path: docs/streams/developer-guide/processor-api.html ##
@@ -396,33 +396,52 @@ Connecting Processors and State Stores
 // and the WordCountProcessor node as its upstream processor
 .addSink("Sink", "sink-topic", "Process");
+
+Here is a quick explanation of this example:
+
+A source processor node named "Source" is added to the topology using the addSource method, with one Kafka topic
+"source-topic" fed to it.
+A processor node named "Process" with the pre-defined WordCountProcessor logic is then added as the downstream
+processor of the "Source" node using the addProcessor method.
+A predefined persistent key-value state store is added and connected to the "Process" node, using
+countStoreBuilder.
+A sink processor node is then added to complete the topology using the addSink method, taking the "Process" node
+as its upstream processor and writing to a separate "sink-topic" Kafka topic (note that users can also use another overloaded variant of addSink
+to dynamically determine the Kafka topic to write to for each received record from the upstream processor).
+
+
+In some cases, it may be more convenient to add and connect a state store at the same time as you add the processor it is connected to to the topology.
+This can be done by implementing ConnectedStoreProvider#stores() on the ProcessorSupplier
+in place of calling Topology#addStateStore(), like this:
+
+Topology builder = new Topology();
+
+// add the source processor node that takes Kafka topic "source-topic" as input
+builder.addSource("Source", "source-topic")
+
+// add the WordCountProcessor node which takes the source processor as its upstream processor, along with its state store
+builder.addProcessor("Process", new ProcessorSupplier() { public Processor get() { return new WordCountProcessor(); } public Set stores() { return countStoreBuilder; } });
+
+// add the sink processor node that takes Kafka topic "sink-topic" as output
+// and the WordCountProcessor node as its upstream processor
+.addSink("Sink", "sink-topic", "Process");
+
+This allows for a processor to "own" state stores, effectively encapsulating its usage from the user wiring the topology.
+Multiple processors that share a state store may provide the same store with this technique, as long as the StoreBuilder is the same instance.
+In these topologies, the "Process" stream processor node is considered a downstream processor of the "Source" node, and an
+upstream processor of the "Sink" node. As a result, whenever the "Source" node forwards a newly fetched record from
+Kafka to its downstream "Process" node, the WordCountProcessor#process() method is triggered to process the record and
+update the associated state store. Whenever context#forward() is called in the
+WordCountProcessor#punctuate() method, the aggregate key-value pair will be sent via the "Sink" processor node to
+the Kafka topic "sink-topic". Note that in the WordCountProcessor implementation, you must refer to the
+same store name "Counts" when accessing the key-value store, otherwise an exception will be thrown at runtime,
+indicating that the state store cannot be found. If the state store is not associated with the processor
+in the Topology code, accessing it in the processor’s init() method will also throw an exception at
+runtime, indicating the state store is not accessible from this processor.
+Now that you have fully defined your processor topology in your application, you can proceed to
+running the Kafka Streams application.
+

Review comment: Meta comment: can you also extend `streams/upgrade-guide.html` -- there is a section "public API" changes for the 2.6 release.

## File path: docs/streams/developer-guide/dsl-api.html ##
@@ -3164,10 +3164,17 @@ Tip
 Even though we do not demonstrate it in this example, a stream processor can access any available state stores by
-calling ProcessorContext#getStateStore(). Only such state stores are available that (1)
[GitHub] [kafka] lbradstreet commented on pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem
lbradstreet commented on pull request #8517: URL: https://github.com/apache/kafka/pull/8517#issuecomment-628293868 @ijuma the updated PR should compile OK. I removed the change to make SystemTime package private as it seems to be used in other tooling now.
[GitHub] [kafka] ableegoldman commented on a change in pull request #8661: KAFKA-9603: Do not turn on bulk loading for segmented stores on stand-by tasks
ableegoldman commented on a change in pull request #8661: URL: https://github.com/apache/kafka/pull/8661#discussion_r424771783

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java ##
@@ -248,17 +243,6 @@ void restoreAllInternal(final Collection> records) {
             final long segmentId = segments.segmentId(timestamp);
             final S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
             if (segment != null) {
-                // This handles the case that state store is moved to a new client and does not
-                // have the local RocksDB instance for the segment. In this case, toggleDBForBulkLoading
-                // will only close the database and open it again with bulk loading enabled.
-                if (!bulkLoadSegments.contains(segment)) {
-                    segment.toggleDbForBulkLoading(true);
-                    // If the store does not exist yet, the getOrCreateSegmentIfLive will call openDB that
-                    // makes the open flag for the newly created store.
-                    // if the store does exist already, then toggleDbForBulkLoading will make sure that
-                    // the store is already open here.
-                    bulkLoadSegments = new HashSet<>(segments.allSegments());
-                }

Review comment: Here are the current bulk loading configs:

```
dbOptions.setMaxBackgroundFlushes(4);
columnFamilyOptions.setDisableAutoCompactions(true);
columnFamilyOptions.setLevel0FileNumCompactionTrigger(1 << 30);
columnFamilyOptions.setLevel0SlowdownWritesTrigger(1 << 30);
columnFamilyOptions.setLevel0StopWritesTrigger(1 << 30);
```

Setting aside the problems these are causing users [even for active tasks](https://issues.apache.org/jira/browse/KAFKA-9062), they basically mean "shove everything into the lowest file level and never attempt to limit these writes". This is useful if you're just trying to shove a lot of data into a store as fast as possible but don't necessarily need to use it immediately after, which is (debatably) the right thing for restoring tasks but definitely not appropriate for standbys*. We will attempt to restore a batch of records once per main thread loop, which means doing a lot of other stuff in between. There's no reason not to just use normal mode writing for standbys AFAICT -- also bulk loading will make IQ on standbys pretty annoying at best.

*In the larger scope, perhaps when we move standbys to a separate thread, I'd say we actually should be turning on bulk loading. BUT we need to issue a manual compaction every so often, and ideally not flush them during every commit (related to [KAFKA-9450](https://issues.apache.org/jira/browse/KAFKA-9450))
[jira] [Created] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task
Boyang Chen created KAFKA-9989: -- Summary: StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task Key: KAFKA-9989 URL: https://issues.apache.org/jira/browse/KAFKA-9989 Project: Kafka Issue Type: Bug Reporter: Boyang Chen Assignee: Boyang Chen System test StreamsUpgradeTest.test_metadata_upgrade could fail due to: "Never saw output 'processed [0-9]* records' on ubuntu@worker6" which if we take a closer look at, the rebalance happens but has no task assignment. We should fix this problem by making the rebalance result as part of the check, and skip the record processing validation when the assignment is empty. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] cmccabe merged pull request #8556: MINOR: Add a duplicate() method to Message classes
cmccabe merged pull request #8556: URL: https://github.com/apache/kafka/pull/8556
[GitHub] [kafka] guozhangwang commented on a change in pull request #8661: KAFKA-9603: Do not turn on bulk loading for segmented stores on stand-by tasks
guozhangwang commented on a change in pull request #8661: URL: https://github.com/apache/kafka/pull/8661#discussion_r424761219

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java ##
@@ -248,17 +243,6 @@ void restoreAllInternal(final Collection> records) {
             final long segmentId = segments.segmentId(timestamp);
             final S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
             if (segment != null) {
-                // This handles the case that state store is moved to a new client and does not
-                // have the local RocksDB instance for the segment. In this case, toggleDBForBulkLoading
-                // will only close the database and open it again with bulk loading enabled.
-                if (!bulkLoadSegments.contains(segment)) {
-                    segment.toggleDbForBulkLoading(true);
-                    // If the store does not exist yet, the getOrCreateSegmentIfLive will call openDB that
-                    // makes the open flag for the newly created store.
-                    // if the store does exist already, then toggleDbForBulkLoading will make sure that
-                    // the store is already open here.
-                    bulkLoadSegments = new HashSet<>(segments.allSegments());
-                }

Review comment: Actually, even for standby tasks it should also be beneficial to use bulk-loading, right (e.g. if the standby is far behind the active and has a large number of records to catch up on)? I'm thinking that in the long run, maybe we could optionally allow restore callbacks to be triggered for standbys as well: we can use some simple heuristic such that if the changelog log-end offset - standby task's store offset > a certain threshold, we trigger onRestoreStart(), and then we can go back from the "sprinting" mode to normal mode after we've got close enough to the log-end offset.

In the meantime, we can maybe hack a bit so that when `segment.toggleDbForBulkLoading` is called to enable bulk loading we set a flag, and when it is called to disable it we reset the flag; then during restoreAll we check the flag to decide whether to enable bulk loading for a newly created segment.
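The lag-based heuristic floated in the review comment could look roughly like the sketch below. It is only an illustration under stated assumptions: `BulkLoadHeuristic`, `lagThreshold`, and `update` are hypothetical names, nothing here is Kafka Streams API, and the threshold value is arbitrary.

```java
// Hypothetical sketch of a lag-based mode switch; not Kafka Streams code.
class BulkLoadHeuristic {
    private final long lagThreshold;     // assumed tunable, in number of records
    private boolean bulkLoading = false; // the "flag" consulted for newly created segments

    BulkLoadHeuristic(long lagThreshold) {
        this.lagThreshold = lagThreshold;
    }

    // Recomputes the mode from the changelog end offset and the store's current offset.
    // Returns true while the store should stay in bulk-loading ("sprinting") mode.
    boolean update(long changelogEndOffset, long storeOffset) {
        long lag = changelogEndOffset - storeOffset;
        bulkLoading = lag > lagThreshold;
        return bulkLoading;
    }

    // A restore path could check this before opening a new segment.
    boolean isBulkLoading() {
        return bulkLoading;
    }

    public static void main(String[] args) {
        BulkLoadHeuristic heuristic = new BulkLoadHeuristic(10_000L);
        System.out.println(heuristic.update(1_000_000L, 0L));       // far behind: prints true
        System.out.println(heuristic.update(1_000_000L, 995_000L)); // nearly caught up: prints false
    }
}
```

A single threshold can flip back and forth when the lag hovers around it; a real implementation would probably want separate enter and exit thresholds, but that is a design choice the thread does not settle.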
[GitHub] [kafka] mjsax commented on pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics
mjsax commented on pull request #8662: URL: https://github.com/apache/kafka/pull/8662#issuecomment-628272802 Retest this please.
[GitHub] [kafka] mjsax commented on pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics
mjsax commented on pull request #8662: URL: https://github.com/apache/kafka/pull/8662#issuecomment-628272606 Retest this please.
[GitHub] [kafka] ableegoldman commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance
ableegoldman commented on a change in pull request #8588: URL: https://github.com/apache/kafka/pull/8588#discussion_r424758127

## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java ##
@@ -236,16 +233,17 @@ public void staticAssignmentShouldConvergeWithTheFirstAssignment() {
             0, 1000L);
-        final Harness harness = Harness.initializeCluster(1, 1, 1);
+        final Harness harness = Harness.initializeCluster(1, 1, 1, () -> 1);
         testForConvergence(harness, configs, 1);
         verifyValidAssignment(0, harness);
+        verifyBalancedAssignment(harness);
     }

     @Test
     public void assignmentShouldConvergeAfterAddingNode() {
-        final int numStatelessTasks = 15;
-        final int numStatefulTasks = 13;
+        final int numStatelessTasks = 7;

Review comment: Well, if you have a set of N prime numbers and one number which isn't, aren't they all still coprime? :P
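Whether the quip holds depends on which notion of "coprime" is meant. The sketch below checks both readings; the class name `CoprimeCheck` and the sample numbers are hypothetical and are not the task counts used in the test.

```java
import java.util.List;

// Hypothetical illustration of set-wise versus pairwise coprimality.
class CoprimeCheck {

    // Euclid's algorithm; gcd(0, x) is x, which lets 0 act as the starting value of a fold.
    static long gcd(long a, long b) {
        while (b != 0) {
            long t = a % b;
            a = b;
            b = t;
        }
        return Math.abs(a);
    }

    // Set-wise coprime: the gcd of the whole set is 1.
    static boolean setwiseCoprime(List<Long> values) {
        long g = 0;
        for (long v : values) {
            g = gcd(g, v);
        }
        return g == 1;
    }

    // Pairwise coprime: every pair of values has gcd 1.
    static boolean pairwiseCoprime(List<Long> values) {
        for (int i = 0; i < values.size(); i++) {
            for (int j = i + 1; j < values.size(); j++) {
                if (gcd(values.get(i), values.get(j)) != 1) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Long> sharesFactor = List.of(7L, 11L, 13L, 14L); // 14 shares the factor 7
        List<Long> noSharedFactor = List.of(7L, 11L, 13L, 15L);

        System.out.println(setwiseCoprime(sharesFactor));    // prints true
        System.out.println(pairwiseCoprime(sharesFactor));   // prints false
        System.out.println(pairwiseCoprime(noSharedFactor)); // prints true
    }
}
```

With at least two distinct primes in the set, the set as a whole is always coprime; it is pairwise coprime only if the composite number shares no factor with any of the primes.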
[jira] [Commented] (KAFKA-9659) Kafka Streams / Consumer configured for static membership fails on "fatal exception: group.instance.id gets fenced"
[ https://issues.apache.org/jira/browse/KAFKA-9659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106699#comment-17106699 ] Guozhang Wang commented on KAFKA-9659: -- Hi [~lkokhreidze] Could you try out the latest trunk for streams client (you do not need to upgrade broker versions) and see if it resolves the issue? I've pushed a couple of commits trying to fix this issue but unfortunately they are not in any released version yet. > Kafka Streams / Consumer configured for static membership fails on "fatal > exception: group.instance.id gets fenced" > --- > > Key: KAFKA-9659 > URL: https://issues.apache.org/jira/browse/KAFKA-9659 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: Rohan Desai >Assignee: Guozhang Wang >Priority: Major > Attachments: ksql-1.logs > > > I'm running a KSQL query, which underneath is built into a Kafka Streams > application. The application has been running without issue for a few days, > until today, when all the streams threads exited with: > > > {{[ERROR] 2020-03-05 00:57:58,776 > [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] > org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - > [Consumer instanceId=ksql-1-2, > clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer, > groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] > Received fatal exception: group.instance.id gets fenced}} > {{[ERROR] 2020-03-05 00:57:58,776 > [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] > org.apache.kafka.clients.consumer.internals.AbstractCoordinator onFailure - > [Consumer instanceId=ksql-1-2, > 
clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer, > groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] > Caught fenced group.instance.id Optional[ksql-1-2] error in heartbeat thread}} > {{[ERROR] 2020-03-05 00:57:58,776 > [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] > org.apache.kafka.streams.processor.internals.StreamThread run - > stream-thread > [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] > Encountered the following unexpected Kafka exception during processing, this > usually indicate Streams internal errors:}} > \{{ org.apache.kafka.common.errors.FencedInstanceIdException: The broker > rejected this static consumer since another consumer with the same > group.instance.id has registered with a different member.id.}}{{[INFO] > 2020-03-05 00:57:58,776 > [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] > org.apache.kafka.streams.processor.internals.StreamThread setState - > stream-thread > [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] > State transition from RUNNING to PENDING_SHUTDOWN}} > > I've attached the KSQL and Kafka Streams logs to this ticket. 
Here's a > summary for one of the streams threads (instance id `ksql-1-2`): > > Around 00:56:36 the coordinator fails over from b11 to b2: > > {{[INFO] 2020-03-05 00:56:36,258 > [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] > org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - > [Consumer instanceId=ksql-1-2, > clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer, > groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Attempt to > heartbeat failed since coordinator > b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483636 rack: > null) is either not started or not valid.}} > {{ [INFO] 2020-03-05 00:56:36,258 > [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] > org.apache.kafka.clients.consumer.internals.AbstractCoordinator > markCoordinatorUnknown - [Consumer instanceId=ksql-1-2, > clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer, > groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Group > coordinator b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud
[GitHub] [kafka] mjsax commented on pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics
mjsax commented on pull request #8662: URL: https://github.com/apache/kafka/pull/8662#issuecomment-628265887 Retest this please
[jira] [Resolved] (KAFKA-9966) Flaky Test EosBetaUpgradeIntegrationTest#shouldUpgradeFromEosAlphaToEosBeta
[ https://issues.apache.org/jira/browse/KAFKA-9966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-9966. Fix Version/s: 2.6.0 Resolution: Fixed > Flaky Test EosBetaUpgradeIntegrationTest#shouldUpgradeFromEosAlphaToEosBeta > --- > > Key: KAFKA-9966 > URL: https://issues.apache.org/jira/browse/KAFKA-9966 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: Matthias J. Sax >Assignee: Sophie Blee-Goldman >Priority: Critical > Labels: flaky-test > Fix For: 2.6.0 > > > [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/285/testReport/junit/org.apache.kafka.streams.integration/EosBetaUpgradeIntegrationTest/shouldUpgradeFromEosAlphaToEosBeta_true_/] > {quote}java.lang.AssertionError: Condition not met within timeout 6. > Clients did not startup and stabilize on time. Observed transitions: client-1 > transitions: [KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)] > client-2 transitions: [KeyValue(CREATED, REBALANCING), KeyValue(REBALANCING, > RUNNING), KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)] at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378) at > org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.waitForStateTransition(EosBetaUpgradeIntegrationTest.java:924) > at > org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:741){quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9966) Flaky Test EosBetaUpgradeIntegrationTest#shouldUpgradeFromEosAlphaToEosBeta
[ https://issues.apache.org/jira/browse/KAFKA-9966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106689#comment-17106689 ] Matthias J. Sax commented on KAFKA-9966: Also related: [https://github.com/apache/kafka/pull/8662] > Flaky Test EosBetaUpgradeIntegrationTest#shouldUpgradeFromEosAlphaToEosBeta > --- > > Key: KAFKA-9966 > URL: https://issues.apache.org/jira/browse/KAFKA-9966 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: Matthias J. Sax >Assignee: Sophie Blee-Goldman >Priority: Critical > Labels: flaky-test > Fix For: 2.6.0 > > > [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/285/testReport/junit/org.apache.kafka.streams.integration/EosBetaUpgradeIntegrationTest/shouldUpgradeFromEosAlphaToEosBeta_true_/] > {quote}java.lang.AssertionError: Condition not met within timeout 6. > Clients did not startup and stabilize on time. Observed transitions: client-1 > transitions: [KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)] > client-2 transitions: [KeyValue(CREATED, REBALANCING), KeyValue(REBALANCING, > RUNNING), KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)] at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378) at > org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.waitForStateTransition(EosBetaUpgradeIntegrationTest.java:924) > at > org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:741){quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-9966) Flaky Test EosBetaUpgradeIntegrationTest#shouldUpgradeFromEosAlphaToEosBeta
[ https://issues.apache.org/jira/browse/KAFKA-9966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-9966: -- Assignee: Sophie Blee-Goldman (was: Matthias J. Sax) > Flaky Test EosBetaUpgradeIntegrationTest#shouldUpgradeFromEosAlphaToEosBeta > --- > > Key: KAFKA-9966 > URL: https://issues.apache.org/jira/browse/KAFKA-9966 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: Matthias J. Sax >Assignee: Sophie Blee-Goldman >Priority: Critical > Labels: flaky-test > > [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/285/testReport/junit/org.apache.kafka.streams.integration/EosBetaUpgradeIntegrationTest/shouldUpgradeFromEosAlphaToEosBeta_true_/] > {quote}java.lang.AssertionError: Condition not met within timeout 6. > Clients did not startup and stabilize on time. Observed transitions: client-1 > transitions: [KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)] > client-2 transitions: [KeyValue(CREATED, REBALANCING), KeyValue(REBALANCING, > RUNNING), KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)] at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378) at > org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.waitForStateTransition(EosBetaUpgradeIntegrationTest.java:924) > at > org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:741){quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on a change in pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics
ableegoldman commented on a change in pull request #8662: URL: https://github.com/apache/kafka/pull/8662#discussion_r424751772

## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
## @@ -1862,6 +1866,35 @@ public void shouldThrowIllegalStateExceptionIfAnyTopicsMissingFromChangelogEndOf
         assertThrows(IllegalStateException.class, () -> partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)));
     }
+    @Test
+    public void shouldSkipListOffsetsRequestForNewlyCreatedChangelogTopics() {
+        adminClient = EasyMock.createMock(AdminClient.class);
+        final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class);
+        final KafkaFutureImpl> allFuture = new KafkaFutureImpl<>();
+        allFuture.complete(emptyMap());
+
+        expect(adminClient.listOffsets(emptyMap())).andStubReturn(result);
+        expect(result.all()).andReturn(allFuture);
+
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor1");
+
+        subscriptions.put("consumer10",
+                          new Subscription(
+                              singletonList("topic1"),
+                              defaultSubscriptionInfo.encode()
+                          ));
+
+        EasyMock.replay(result);

Review comment: It gets replayed during configuration (at the end of `configureDefault` below)
[GitHub] [kafka] mjsax merged pull request #8648: KAFKA-9966: add internal assignment listener to stabilize eos-beta upgrade test
mjsax merged pull request #8648: URL: https://github.com/apache/kafka/pull/8648
[GitHub] [kafka] mjsax commented on pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics
mjsax commented on pull request #8662: URL: https://github.com/apache/kafka/pull/8662#issuecomment-628263949 Retest this please
[GitHub] [kafka] mjsax commented on a change in pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics
mjsax commented on a change in pull request #8662: URL: https://github.com/apache/kafka/pull/8662#discussion_r424750958

## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
## @@ -1862,6 +1866,35 @@ public void shouldThrowIllegalStateExceptionIfAnyTopicsMissingFromChangelogEndOf
         assertThrows(IllegalStateException.class, () -> partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)));
     }
+    @Test
+    public void shouldSkipListOffsetsRequestForNewlyCreatedChangelogTopics() {
+        adminClient = EasyMock.createMock(AdminClient.class);
+        final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class);
+        final KafkaFutureImpl> allFuture = new KafkaFutureImpl<>();
+        allFuture.complete(emptyMap());
+
+        expect(adminClient.listOffsets(emptyMap())).andStubReturn(result);
+        expect(result.all()).andReturn(allFuture);
+
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor1");
+
+        subscriptions.put("consumer10",
+                          new Subscription(
+                              singletonList("topic1"),
+                              defaultSubscriptionInfo.encode()
+                          ));
+
+        EasyMock.replay(result);

Review comment: Don't we need to replay `adminClient`, too?
[GitHub] [kafka] ableegoldman commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance
ableegoldman commented on a change in pull request #8588: URL: https://github.com/apache/kafka/pull/8588#discussion_r424729806 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ConstrainedPrioritySet.java ## @@ -16,77 +16,58 @@ */ package org.apache.kafka.streams.processor.internals.assignment; +import org.apache.kafka.streams.processor.TaskId; + import java.util.Collection; +import java.util.Comparator; import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; import java.util.PriorityQueue; import java.util.Set; import java.util.UUID; import java.util.function.BiFunction; -import org.apache.kafka.streams.processor.TaskId; +import java.util.function.Function; /** * Wraps a priority queue of clients and returns the next valid candidate(s) based on the current task assignment */ -class ValidClientsByTaskLoadQueue { +class ConstrainedPrioritySet { private final PriorityQueue clientsByTaskLoad; -private final BiFunction validClientCriteria; +private final BiFunction constraint; private final Set uniqueClients = new HashSet<>(); -ValidClientsByTaskLoadQueue(final Map clientStates, -final BiFunction validClientCriteria) { -this.validClientCriteria = validClientCriteria; - -clientsByTaskLoad = new PriorityQueue<>( -(client, other) -> { -final double clientTaskLoad = clientStates.get(client).taskLoad(); -final double otherTaskLoad = clientStates.get(other).taskLoad(); -if (clientTaskLoad < otherTaskLoad) { -return -1; -} else if (clientTaskLoad > otherTaskLoad) { -return 1; -} else { -return client.compareTo(other); -} -}); +ConstrainedPrioritySet(final BiFunction constraint, + final Function weight) { +this.constraint = constraint; +clientsByTaskLoad = new PriorityQueue<>(Comparator.comparing(weight).thenComparing(clientId -> clientId)); } /** * @return the next least loaded client that satisfies the given criteria, or null if none do */ -UUID poll(final TaskId task) { -final List 
validClient = poll(task, 1); -return validClient.isEmpty() ? null : validClient.get(0); -} - -/** - * @return the next N <= {@code numClientsPerTask} clients in the underlying priority queue that are valid candidates for the given task - */ -List poll(final TaskId task, final int numClients) { -final List nextLeastLoadedValidClients = new LinkedList<>(); +UUID poll(final TaskId task, final Function extraConstraint) { Review comment: I'm not sure I see how the returned clients could ever be different using "poll N clients" vs "poll N times". Only the clients which are getting a new task assigned will have their weight changed while in the middle of an N poll, and once we assign this task to that client it no longer meets the criteria so we don't care about it anyway right? The reason for the "poll N clients" method was to save on some of the poll-and-reoffer of clients that don't meet the criteria, but I don't think that's really worth worrying over. I'm fine with whatever code is easiest to read -- just want to understand why this affects the balance? 
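For readers following the "poll N clients" vs "poll N times" question in the review comment above, here is a minimal, self-contained sketch of a constrained poll. It is not the code from the PR: the class name `ConstrainedPollSketch`, the `offer` method, and the use of a plain `String` in place of `TaskId` are assumptions made for illustration, and the generic types are guesses because the quoted diff lost them.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical stand-in for the class under review; task ids are plain Strings here.
class ConstrainedPollSketch {
    private final PriorityQueue<UUID> clientsByWeight;
    private final BiFunction<UUID, String, Boolean> constraint;
    private final Set<UUID> uniqueClients = new HashSet<>();

    ConstrainedPollSketch(final BiFunction<UUID, String, Boolean> constraint,
                          final Function<UUID, Double> weight) {
        this.constraint = constraint;
        // Order by weight and break ties by client id so the ordering is deterministic.
        this.clientsByWeight = new PriorityQueue<>(
            Comparator.comparing(weight).thenComparing(clientId -> clientId));
    }

    // Adds a client unless it is already queued.
    void offer(final UUID client) {
        if (uniqueClients.add(client)) {
            clientsByWeight.offer(client);
        }
    }

    // Returns the lowest-weight client that satisfies the constraint for the task, or null if none does.
    UUID poll(final String task) {
        final List<UUID> rejected = new ArrayList<>();
        UUID result = null;
        while (!clientsByWeight.isEmpty()) {
            final UUID candidate = clientsByWeight.poll();
            if (constraint.apply(candidate, task)) {
                result = candidate;
                uniqueClients.remove(candidate);
                break;
            }
            rejected.add(candidate);
        }
        // Clients that were skipped go back into the queue for later polls.
        clientsByWeight.addAll(rejected);
        return result;
    }
}
```

In this sketch, calling `poll` N times pays the poll-and-reoffer cost for rejected clients on every call, which is the overhead the comment mentions. The sketch also assumes a client's weight does not change while it sits in the queue, since `PriorityQueue` does not reorder existing elements.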
## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java ## @@ -35,262 +44,161 @@ import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_2; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_3; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClientStatesMap; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasProperty; import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignTaskMovements; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertFalse; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedSet; -import java.util.UUID; -import java.util.stream.Collectors; -import org.apache.kafka.streams.processor.TaskId; -import org.junit.Test; +import static org.hamcrest.Matchers.is; public class TaskMovementTest { -private final ClientState client1 = new ClientState(1); -private final ClientState client2 = new ClientState(1); -private final ClientState client3 = new ClientState(1); - -private final Map clientStates = getClientStatesMap(client1, c
[GitHub] [kafka] junrao commented on pull request #8479: KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception
junrao commented on pull request #8479: URL: https://github.com/apache/kafka/pull/8479#issuecomment-628258485

@andrewchoi5 : Thanks for finding this issue. We fixed https://issues.apache.org/jira/browse/KAFKA-9932 recently. So, the `fetchLogConfig` from ZK method is called rarely during makeFollowers() now. If we do hit the ZK exception, there are a few options:

(1) As Jason mentioned, we could keep retrying from ZK until successful. This probably needs to be done in a separate thread to avoid blocking the request handler thread. So, it can be a bit involved.
(2) Don't update the leader epoch (we can choose to update the leader epoch after the local log is obtained successfully) and log an error.

Since this issue should be rare now, maybe we can do (2) for now?
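Option (2) above can be pictured with a small, self-contained sketch. This is only an illustration under assumptions: `FollowerTransitionSketch`, `makeFollower`, and the `Supplier` standing in for the log lookup are hypothetical names, not Kafka's actual broker code (which is written in Scala), and the error is printed to stderr instead of going through a logger.

```java
import java.util.function.Supplier;

// Hypothetical, simplified stand-in for a partition's follower transition.
class FollowerTransitionSketch {
    private int leaderEpoch = -1;
    private String localLog; // stands in for the local log handle

    // Returns true if the transition was applied.
    // The leader epoch is advanced only after the local log was obtained successfully.
    boolean makeFollower(final int newLeaderEpoch, final Supplier<String> logLoader) {
        final String log;
        try {
            log = logLoader.get(); // may fail, e.g. while fetching the log config from ZooKeeper
        } catch (final RuntimeException e) {
            System.err.println("ERROR could not obtain local log, keeping leader epoch "
                + leaderEpoch + ": " + e.getMessage());
            return false;
        }
        this.localLog = log;
        this.leaderEpoch = newLeaderEpoch;
        return true;
    }

    int leaderEpoch() {
        return leaderEpoch;
    }

    boolean hasLocalLog() {
        return localLog != null;
    }
}
```

Because the epoch stays at its old value after a failed lookup, a later request carrying the same epoch would not be rejected as stale in this sketch, so the transition can be attempted again.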
[GitHub] [kafka] lbradstreet commented on pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem
lbradstreet commented on pull request #8517: URL: https://github.com/apache/kafka/pull/8517#issuecomment-628257888 @ijuma thanks, I probably should have merged in before asking.
[GitHub] [kafka] ijuma commented on pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem
ijuma commented on pull request #8517: URL: https://github.com/apache/kafka/pull/8517#issuecomment-628251072

@lbradstreet Compile errors:

> 12:38:04 /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java:29: error: SystemTime is not public in org.apache.kafka.common.utils; cannot be accessed from outside package
> 12:38:04 import org.apache.kafka.common.utils.SystemTime;
> 12:38:04 ^
> 12:38:04 /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/tools/src/main/java/org/apache/kafka/trogdor/workload/SustainedConnectionWorker.java:33: error: SystemTime is not public in org.apache.kafka.common.utils; cannot be accessed from outside package
> 12:38:04 import org.apache.kafka.common.utils.SystemTime;
> 12:38:04 ^
> 12:38:04 /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/tools/src/main/java/org/apache/kafka/trogdor/workload/SustainedConnectionWorker.java:63: error: SystemTime is not public in org.apache.kafka.common.utils; cannot be accessed from outside package
> 12:38:04 private static final SystemTime SYSTEM_TIME = new SystemTime();
> 12:38:04 ^
> 12:38:04 /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java:98: error: SystemTime is not public in org.apache.kafka.common.utils; cannot be accessed from outside package
> 12:38:04 time = new SystemTime();
> 12:38:04 ^
> 12:38:04 /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/tools/src/main/java/org/apache/kafka/trogdor/workload/SustainedConnectionWorker.java:63: error: SystemTime is not public in org.apache.kafka.common.utils; cannot be accessed from outside package
> 12:38:04 private static final SystemTime SYSTEM_TIME = new SystemTime();
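The PR in this thread is titled "use monotonic clock for replica fetcher DelayedItem". As a rough illustration of that idea only, the sketch below tracks a deadline with `System.nanoTime()`. The class `MonotonicDelayedItem` is a hypothetical name; it is not the DelayedItem from the PR and does not use Kafka's own time abstraction.

```java
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a delayed item whose deadline is tracked with the monotonic clock.
class MonotonicDelayedItem implements Delayed {
    private final long dueNanos;

    MonotonicDelayedItem(final long delayMs) {
        // System.nanoTime() is not affected by wall-clock adjustments,
        // unlike System.currentTimeMillis().
        this.dueNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs);
    }

    // Remaining delay in the requested unit, never negative.
    @Override
    public long getDelay(final TimeUnit unit) {
        final long remainingNanos = Math.max(dueNanos - System.nanoTime(), 0L);
        return unit.convert(remainingNanos, TimeUnit.NANOSECONDS);
    }

    // Items that are due sooner sort first.
    @Override
    public int compareTo(final Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}
```

With a wall-clock deadline, a backwards clock adjustment could make an item appear to be delayed for much longer than intended. The monotonic reading avoids that, at the cost of being meaningful only within a single JVM.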
[jira] [Updated] (KAFKA-9988) Connect incorrectly reports task has failed when task takes too long to shutdown
[ https://issues.apache.org/jira/browse/KAFKA-9988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sanjana Kaundinya updated KAFKA-9988: - Affects Version/s: 2.5.1 2.4.2 2.3.2 2.2.3 2.3.0 2.4.0 2.3.1 2.5.0 2.4.1 > Connect incorrectly reports task has failed when task takes too long to > shutdown > > > Key: KAFKA-9988 > URL: https://issues.apache.org/jira/browse/KAFKA-9988 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.2.3, 2.5.0, 2.3.2, 2.4.1, 2.4.2, > 2.5.1 >Reporter: Sanjana Kaundinya >Priority: Major > > If the OffsetStorageReader is closed while the task is trying to shutdown, > and the task is trying to access the offsets from the OffsetStorageReader, > then we see the following in the logs. > {code:java} > [2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=connector-18} Task threw > an uncaught and unrecoverable exception > (org.apache.kafka.connect.runtime.WorkerTask) > org.apache.kafka.connect.errors.ConnectException: Failed to fetch offsets. > at > org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:114) > at > org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63) > at > org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:205) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175) > at > org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.kafka.connect.errors.ConnectException: Offset reader > closed while attempting to read offsets. 
This is likely because the task was > been scheduled to stop but has taken longer than the graceful shutdown period > to do so. > at > org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:103) > ... 14 more > [2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=connector-18} Task is > being killed and will not recover until manually restarted > (org.apache.kafka.connect.runtime.WorkerTask) > {code} > This is a bit misleading, because the task is already on its way of being > shutdown, and doesn't actually need manual intervention to be restarted. We > can see that as later on in the logs we see that it throws another > unrecoverable exception. > {code:java} > [2020-05-05 05:40:39,361] ERROR WorkerSourceTask{id=connector-18} Task threw > an uncaught and unrecoverable exception > (org.apache.kafka.connect.runtime.WorkerTask) > {code} > If we know a task is on its way of shutting down, we should not throw a > ConnectException and instead log a warning so that we don't log false > negatives. -- This message was sent by Atlassian Jira (v8.3.4#803005)
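The behaviour proposed at the end of the ticket (log a warning instead of failing when the task is already stopping) could look roughly like the sketch below. Everything in it is assumed for illustration: `StoppingAwareOffsetRead`, its `stop` and `readOffsets` methods, and the `Supplier` standing in for the offset reader are hypothetical, and this is not how the Connect runtime's WorkerSourceTask or OffsetStorageReaderImpl is actually structured.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical sketch of a task that tolerates offset-read failures once it is stopping.
class StoppingAwareOffsetRead {
    private final AtomicBoolean stopping = new AtomicBoolean(false);

    // Marks the task as shutting down.
    void stop() {
        stopping.set(true);
    }

    // Returns the offsets, or null if the read failed only because the task is already stopping.
    String readOffsets(final Supplier<String> reader) {
        try {
            return reader.get();
        } catch (final RuntimeException e) {
            if (stopping.get()) {
                // Expected during shutdown: warn instead of reporting an unrecoverable failure.
                System.err.println("WARN offset read aborted because the task is stopping: "
                    + e.getMessage());
                return null;
            }
            throw e; // a genuine failure while the task is still running
        }
    }
}
```

A caller would treat a `null` result as "stop quietly", while an exception thrown before `stop()` was called still surfaces as a real task failure.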
[GitHub] [kafka] vvcephei commented on pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance
vvcephei commented on pull request #8588: URL: https://github.com/apache/kafka/pull/8588#issuecomment-628236334

It looks like the test failures were unrelated (because they were all different), some known flaky:

Java 8:
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]

Java 11:
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled

Java 14:
kafka.api.TransactionsBounceTest.testWithGroupId
kafka.api.TransactionsTest.testBumpTransactionalEpoch
[jira] [Updated] (KAFKA-9988) Log incorrectly reports task has failed when task takes too long to shutdown
[ https://issues.apache.org/jira/browse/KAFKA-9988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sanjana Kaundinya updated KAFKA-9988: - Description: If the OffsetStorageReader is closed while the task is trying to shutdown, and the task is trying to access the offsets from the OffsetStorageReader, then we see the following in the logs. {code:java} [2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=connector-18} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask) org.apache.kafka.connect.errors.ConnectException: Failed to fetch offsets. at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:114) at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63) at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:205) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.kafka.connect.errors.ConnectException: Offset reader closed while attempting to read offsets. This is likely because the task was been scheduled to stop but has taken longer than the graceful shutdown period to do so. at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:103) ... 
14 more [2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=connector-18} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) {code} This is a bit misleading, because the task is already on its way of being shutdown, and doesn't actually need manual intervention to be restarted. We can see that as later on in the logs we see that it throws another unrecoverable exception. {code:java} [2020-05-05 05:40:39,361] ERROR WorkerSourceTask{id=connector-18} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask) {code} If we know a task is on its way of shutting down, we should not throw a ConnectException and instead log a warning so that we don't log false negatives. was: If the OffsetStorageReader is closed while the task is trying to shutdown, and the task is trying to access the offsets from the OffsetStorageReader, then we see the following in the logs. {code:java} [2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=replicator-18} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask) org.apache.kafka.connect.errors.ConnectException: Failed to fetch offsets. 
at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:114) at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63) at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:205) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.kafka.connect.errors.ConnectException: Offset reader closed while attempting to read offsets. This is likely because the task was been scheduled to stop but has taken longer than the graceful shutdown period to do so. at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:103) ... 14 more [2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=replicator-18} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) {code} This is a bit misleading, because the task is already on its way of being shutdown, and doesn't actually need manual intervention to be restarted. We can see that as later on in the logs we see that it throws another unrecoverable exception. {code:java} [2020-05-05 05:40:39,361] ERROR WorkerSourceTask{id=replicator-18} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask) {code} If w
[jira] [Updated] (KAFKA-9988) Connect incorrectly reports task has failed when task takes too long to shutdown
[ https://issues.apache.org/jira/browse/KAFKA-9988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sanjana Kaundinya updated KAFKA-9988: - Summary: Connect incorrectly reports task has failed when task takes too long to shutdown (was: Log incorrectly reports task has failed when task takes too long to shutdown) > Connect incorrectly reports task has failed when task takes too long to > shutdown > > > Key: KAFKA-9988 > URL: https://issues.apache.org/jira/browse/KAFKA-9988 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Sanjana Kaundinya >Priority: Major > > If the OffsetStorageReader is closed while the task is trying to shutdown, > and the task is trying to access the offsets from the OffsetStorageReader, > then we see the following in the logs. > {code:java} > [2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=connector-18} Task threw > an uncaught and unrecoverable exception > (org.apache.kafka.connect.runtime.WorkerTask) > org.apache.kafka.connect.errors.ConnectException: Failed to fetch offsets. > at > org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:114) > at > org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63) > at > org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:205) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175) > at > org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.kafka.connect.errors.ConnectException: Offset reader > closed while attempting to read offsets. 
This is likely because the task was > been scheduled to stop but has taken longer than the graceful shutdown period > to do so. > at > org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:103) > ... 14 more > [2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=connector-18} Task is > being killed and will not recover until manually restarted > (org.apache.kafka.connect.runtime.WorkerTask) > {code} > This is a bit misleading, because the task is already on its way of being > shutdown, and doesn't actually need manual intervention to be restarted. We > can see that as later on in the logs we see that it throws another > unrecoverable exception. > {code:java} > [2020-05-05 05:40:39,361] ERROR WorkerSourceTask{id=connector-18} Task threw > an uncaught and unrecoverable exception > (org.apache.kafka.connect.runtime.WorkerTask) > {code} > If we know a task is on its way of shutting down, we should not throw a > ConnectException and instead log a warning so that we don't log false > negatives. -- This message was sent by Atlassian Jira (v8.3.4#803005)
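The proposal at the end of this report (log a warning rather than fail the task when the offset reader is closed during shutdown) can be sketched as below. This is only an illustration under assumptions: `ShutdownAwareOffsetRead`, `Task`, `Outcome`, and the two flags are hypothetical names, not the Connect runtime's `WorkerSourceTask` or `OffsetStorageReaderImpl`, and the actual fix may look quite different.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

final class ShutdownAwareOffsetRead {
    private static final Logger LOG = Logger.getLogger("ShutdownAwareOffsetRead");

    /** Possible results of an offset read in this sketch. */
    enum Outcome { OK, STOPPED_QUIETLY }

    /** Hypothetical stand-in for a task that remembers whether it was asked to stop. */
    static final class Task {
        final AtomicBoolean stopping = new AtomicBoolean(false);
        final AtomicBoolean readerClosed = new AtomicBoolean(false);

        Outcome readOffsets() {
            if (readerClosed.get()) {
                if (stopping.get()) {
                    // Expected during shutdown: warn, but do not report the task as failed.
                    LOG.warning("Offset reader closed while the task was shutting down");
                    return Outcome.STOPPED_QUIETLY;
                }
                // The reader was closed although nobody asked the task to stop: a real error.
                throw new IllegalStateException("Offset reader closed unexpectedly");
            }
            return Outcome.OK;
        }
    }

    public static void main(String[] args) {
        Task task = new Task();
        task.stopping.set(true);
        task.readerClosed.set(true);
        System.out.println(task.readOffsets());
    }
}
```

The point of the sketch is only the branch on the stopping flag: the same closed-reader condition is treated as an error when the task is supposed to be running and as a warning when it is already on its way out.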
[jira] [Created] (KAFKA-9988) Log incorrectly reports task has failed when task takes too long to shutdown
Sanjana Kaundinya created KAFKA-9988: Summary: Log incorrectly reports task has failed when task takes too long to shutdown Key: KAFKA-9988 URL: https://issues.apache.org/jira/browse/KAFKA-9988 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Sanjana Kaundinya If the OffsetStorageReader is closed while the task is trying to shutdown, and the task is trying to access the offsets from the OffsetStorageReader, then we see the following in the logs. {code:java} [2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=replicator-18} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask) org.apache.kafka.connect.errors.ConnectException: Failed to fetch offsets. at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:114) at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63) at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:205) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.kafka.connect.errors.ConnectException: Offset reader closed while attempting to read offsets. This is likely because the task was been scheduled to stop but has taken longer than the graceful shutdown period to do so. at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:103) ... 
14 more [2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=replicator-18} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) {code} This is a bit misleading, because the task is already on its way of being shutdown, and doesn't actually need manual intervention to be restarted. We can see that as later on in the logs we see that it throws another unrecoverable exception. {code:java} [2020-05-05 05:40:39,361] ERROR WorkerSourceTask{id=replicator-18} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask) {code} If we know a task is on its way of shutting down, we should not throw a ConnectException and instead log a warning so that we don't log false negatives. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vvcephei commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance
vvcephei commented on a change in pull request #8588: URL: https://github.com/apache/kafka/pull/8588#discussion_r424718290 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java ## @@ -75,4 +94,303 @@ static UUID uuidForInt(final int n) { return new UUID(0, n); } + +static void assertValidAssignment(final int numStandbyReplicas, + final Set statefulTasks, + final Set statelessTasks, + final Map assignedStates, + final StringBuilder failureContext) { +assertValidAssignment( +numStandbyReplicas, +0, +statefulTasks, +statelessTasks, +assignedStates, +failureContext +); +} + +static void assertValidAssignment(final int numStandbyReplicas, + final int maxWarmupReplicas, + final Set statefulTasks, + final Set statelessTasks, + final Map assignedStates, + final StringBuilder failureContext) { +final Map> assignments = new TreeMap<>(); +for (final TaskId taskId : statefulTasks) { +assignments.put(taskId, new TreeSet<>()); +} +for (final TaskId taskId : statelessTasks) { +assignments.put(taskId, new TreeSet<>()); +} +for (final Map.Entry entry : assignedStates.entrySet()) { +validateAndAddActiveAssignments(statefulTasks, statelessTasks, failureContext, assignments, entry); +validateAndAddStandbyAssignments(statefulTasks, statelessTasks, failureContext, assignments, entry); +} + +final AtomicInteger remainingWarmups = new AtomicInteger(maxWarmupReplicas); + +final TreeMap> misassigned = +assignments +.entrySet() +.stream() +.filter(entry -> { +final int expectedActives = 1; +final boolean isStateless = statelessTasks.contains(entry.getKey()); +final int expectedStandbys = isStateless ? 
0 : numStandbyReplicas; +// We'll never assign even the expected number of standbys if they don't actually fit in the cluster +final int expectedAssignments = Math.min( +assignedStates.size(), +expectedActives + expectedStandbys +); +final int actualAssignments = entry.getValue().size(); +if (actualAssignments == expectedAssignments) { +return false; // not misassigned +} else { +if (actualAssignments == expectedAssignments + 1 && remainingWarmups.get() > 0) { +remainingWarmups.getAndDecrement(); +return false; // it's a warmup, so it's fine +} else { +return true; // misassigned +} +} +}) +.collect(entriesToMap(TreeMap::new)); + +if (!misassigned.isEmpty()) { Review comment: L131-158 is just gathering the information about whether each task is correctly assigned or not, based on its type and the standby configs (and maybe the warmup config). It doesn't make any assertions. So this check is actually the assertion, that no tasks are incorrectly assigned. Doing it this way is nicer, since when it fails, it tells you _all_ the incorrectly assigned tasks, not just the first one. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
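The pattern described in this review comment (first gather every incorrectly assigned task, then make a single assertion so that a failure lists all of them) can be shown in isolation. The sketch below is a simplification under assumptions: `CollectThenAssert` and its methods are hypothetical, task IDs are plain strings, and the standby and warmup accounting from the real `AssignmentTestUtils` is reduced to one expected count.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class CollectThenAssert {
    /** Returns every task whose actual assignment count differs from the expected count. */
    static Map<String, Integer> misassigned(Map<String, Integer> actualCounts, int expected) {
        return actualCounts.entrySet().stream()
            .filter(entry -> entry.getValue() != expected)
            .collect(Collectors.toMap(
                Map.Entry::getKey,
                Map.Entry::getValue,
                (first, second) -> first, // keys are unique, so this merge is never used
                TreeMap::new));           // sorted output keeps the failure message stable
    }

    /** The single assertion: fails once, naming all misassigned tasks. */
    static void assertValid(Map<String, Integer> actualCounts, int expected) {
        Map<String, Integer> bad = misassigned(actualCounts, expected);
        if (!bad.isEmpty()) {
            throw new AssertionError("Misassigned tasks: " + bad);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new TreeMap<>();
        counts.put("0_0", 1);
        counts.put("0_1", 2);
        counts.put("0_2", 0);
        System.out.println(misassigned(counts, 1));
    }
}
```

Compared with asserting inside the loop, this costs one extra map but reports the full set of problems in one test run.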
[GitHub] [kafka] apovzner commented on pull request #8650: MINOR: Added unit tests for ConnectionQuotas
apovzner commented on pull request #8650: URL: https://github.com/apache/kafka/pull/8650#issuecomment-628234920 @dajac Thanks for your very thorough review. I addressed all your comments.
[GitHub] [kafka] apovzner commented on a change in pull request #8650: MINOR: Added unit tests for ConnectionQuotas
apovzner commented on a change in pull request #8650: URL: https://github.com/apache/kafka/pull/8650#discussion_r424707495 ## File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala ## @@ -0,0 +1,307 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import java.net.InetAddress +import java.util.concurrent.{Executors, TimeUnit} +import java.util.Properties + +import com.yammer.metrics.core.Meter +import kafka.metrics.KafkaMetricsGroup +import kafka.network.Processor.ListenerMetricTag +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import org.apache.kafka.common.network._ +import org.apache.kafka.common.utils.MockTime +import org.junit.Assert._ +import org.junit._ +import org.scalatest.Assertions.intercept + +import scala.jdk.CollectionConverters._ +import scala.collection.{Map, mutable} +import scala.concurrent.TimeoutException + +class ConnectionQuotasTest { + private val time = new MockTime + private val listeners = Map( +"EXTERNAL" -> ListenerDesc(new ListenerName("EXTERNAL"), InetAddress.getByName("192.168.1.1")), +"ADMIN" -> ListenerDesc(new ListenerName("ADMIN"), InetAddress.getByName("192.168.1.2")), +"REPLICATION" -> ListenerDesc(new ListenerName("REPLICATION"), InetAddress.getByName("192.168.1.3"))) + private val blockedPercentMeters = mutable.Map[String, Meter]() + private val knownHost = InetAddress.getByName("192.168.10.0") + private val unknownHost = InetAddress.getByName("192.168.2.0") + + case class ListenerDesc(listenerName: ListenerName, defaultClientIp: InetAddress) { +override def toString: String = { + s"(listener=${listenerName.value}, client=${defaultClientIp.getHostAddress})" +} + } + + def brokerPropsWithDefaultConnectionLimits: Properties = { +val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0) +props.put(KafkaConfig.ListenersProp, "EXTERNAL://localhost:0,REPLICATION://localhost:1,ADMIN://localhost:2") +// ConnectionQuotas does not limit inter-broker listener even when broker-wide connection limit is reached +props.put(KafkaConfig.InterBrokerListenerNameProp, "REPLICATION") +props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "EXTERNAL:PLAINTEXT,REPLICATION:PLAINTEXT,ADMIN:PLAINTEXT") +props + } + + @Before + 
def setUp(): Unit = { +// Clean-up any metrics left around by previous tests +TestUtils.clearYammerMetrics() + +listeners.keys.foreach { name => +blockedPercentMeters.put(name, KafkaMetricsGroup.newMeter( + s"${name}BlockedPercent", "blocked time", TimeUnit.NANOSECONDS, Map(ListenerMetricTag -> name))) +} + } + + @After + def tearDown(): Unit = { +TestUtils.clearYammerMetrics() +blockedPercentMeters.clear() + } + + @Test + def testFailWhenNoListeners(): Unit = { +val config = KafkaConfig.fromProps(brokerPropsWithDefaultConnectionLimits) +val connectionQuotas = new ConnectionQuotas(config, time) + +val executor = Executors.newSingleThreadExecutor +try { + // inc() on a separate thread in case it blocks + val externalListener = listeners("EXTERNAL") + executor.submit((() => + intercept[RuntimeException](connectionQuotas.inc(externalListener.listenerName, externalListener.defaultClientIp, blockedPercentMeters("EXTERNAL": Runnable + ).get(5, TimeUnit.SECONDS) +} finally { + executor.shutdownNow() +} + } + + @Test + def testNoConnectionLimitsByDefault(): Unit = { +val config = KafkaConfig.fromProps(brokerPropsWithDefaultConnectionLimits) +val connectionQuotas = new ConnectionQuotas(config, time) +addListenersAndVerify(config, connectionQuotas) + +val executor = Executors.newFixedThreadPool(listeners.size) +try { + // verify there is no limit by accepting 1 connections as fast as possible + val numConnections = 1 + val futures = listeners.values.map( listener => +executor.submit((() => acceptConnections(connectionQuotas, listener, numConnections)): Runnable) ) + futures.foreach(_.get(10, TimeUnit.SECONDS)) + listeners.values.foreach { listener => +assertEquals(s"${l
[GitHub] [kafka] apovzner commented on a change in pull request #8650: MINOR: Added unit tests for ConnectionQuotas
apovzner commented on a change in pull request #8650: URL: https://github.com/apache/kafka/pull/8650#discussion_r424706138 ## File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala ## @@ -0,0 +1,307 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import java.net.InetAddress +import java.util.concurrent.{Executors, TimeUnit} +import java.util.Properties + +import com.yammer.metrics.core.Meter +import kafka.metrics.KafkaMetricsGroup +import kafka.network.Processor.ListenerMetricTag +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import org.apache.kafka.common.network._ +import org.apache.kafka.common.utils.MockTime +import org.junit.Assert._ +import org.junit._ +import org.scalatest.Assertions.intercept + +import scala.jdk.CollectionConverters._ +import scala.collection.{Map, mutable} +import scala.concurrent.TimeoutException + +class ConnectionQuotasTest { + private val time = new MockTime + private val listeners = Map( +"EXTERNAL" -> ListenerDesc(new ListenerName("EXTERNAL"), InetAddress.getByName("192.168.1.1")), +"ADMIN" -> ListenerDesc(new ListenerName("ADMIN"), InetAddress.getByName("192.168.1.2")), +"REPLICATION" -> ListenerDesc(new ListenerName("REPLICATION"), InetAddress.getByName("192.168.1.3"))) + private val blockedPercentMeters = mutable.Map[String, Meter]() + private val knownHost = InetAddress.getByName("192.168.10.0") + private val unknownHost = InetAddress.getByName("192.168.2.0") + + case class ListenerDesc(listenerName: ListenerName, defaultClientIp: InetAddress) { +override def toString: String = { + s"(listener=${listenerName.value}, client=${defaultClientIp.getHostAddress})" +} + } + + def brokerPropsWithDefaultConnectionLimits: Properties = { +val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0) +props.put(KafkaConfig.ListenersProp, "EXTERNAL://localhost:0,REPLICATION://localhost:1,ADMIN://localhost:2") +// ConnectionQuotas does not limit inter-broker listener even when broker-wide connection limit is reached +props.put(KafkaConfig.InterBrokerListenerNameProp, "REPLICATION") +props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "EXTERNAL:PLAINTEXT,REPLICATION:PLAINTEXT,ADMIN:PLAINTEXT") +props + } + + @Before + 
def setUp(): Unit = { +// Clean-up any metrics left around by previous tests +TestUtils.clearYammerMetrics() + +listeners.keys.foreach { name => +blockedPercentMeters.put(name, KafkaMetricsGroup.newMeter( + s"${name}BlockedPercent", "blocked time", TimeUnit.NANOSECONDS, Map(ListenerMetricTag -> name))) +} + } + + @After + def tearDown(): Unit = { +TestUtils.clearYammerMetrics() +blockedPercentMeters.clear() + } + + @Test + def testFailWhenNoListeners(): Unit = { +val config = KafkaConfig.fromProps(brokerPropsWithDefaultConnectionLimits) +val connectionQuotas = new ConnectionQuotas(config, time) + +val executor = Executors.newSingleThreadExecutor +try { + // inc() on a separate thread in case it blocks + val externalListener = listeners("EXTERNAL") + executor.submit((() => + intercept[RuntimeException](connectionQuotas.inc(externalListener.listenerName, externalListener.defaultClientIp, blockedPercentMeters("EXTERNAL": Runnable + ).get(5, TimeUnit.SECONDS) +} finally { + executor.shutdownNow() +} + } + + @Test + def testNoConnectionLimitsByDefault(): Unit = { +val config = KafkaConfig.fromProps(brokerPropsWithDefaultConnectionLimits) +val connectionQuotas = new ConnectionQuotas(config, time) +addListenersAndVerify(config, connectionQuotas) + +val executor = Executors.newFixedThreadPool(listeners.size) +try { + // verify there is no limit by accepting 1 connections as fast as possible + val numConnections = 1 + val futures = listeners.values.map( listener => +executor.submit((() => acceptConnections(connectionQuotas, listener, numConnections)): Runnable) ) + futures.foreach(_.get(10, TimeUnit.SECONDS)) + listeners.values.foreach { listener => +assertEquals(s"${l
[GitHub] [kafka] hachikuji commented on pull request #8479: KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception
hachikuji commented on pull request #8479: URL: https://github.com/apache/kafka/pull/8479#issuecomment-628204174

@andrewchoi5 Just want to make sure I understand the problem. The scenario is that we lose the zk session while handling a LeaderAndIsr request. Current LeaderAndIsr handling works like this:

1. Check epoch of each partition. If it is less than or equal to current epoch, ignore the update.
2. Update the epoch for each partition.
3. Make followers and leaders, which involves loading topic configs from zk.

So the problem occurs when we hit an error loading the topic configs in step 3. This potentially causes us to miss the needed state changes from that request and also prevents us from being able to retry them because the epoch has already been updated. Is that right?

I guess the fix here is only a partial fix. We would still be left with the one failed partition, right?

Off the top of my head, I am wondering whether we should be retrying the request at a lower level. For example, maybe `getEntityConfigs` could catch the `ZooKeeperClientExpiredException` and retry. I assume there is a good reason we do not catch this exception in `retryRequestUntilConnected` already. Perhaps it is unsafe to assume that the requests are still valid after a session expiration. However, just for reading configurations, I do not see a problem retrying after a session expiration. cc @junrao
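The idea floated at the end of this comment (retry a read-only lookup after a session expiration instead of failing the whole request) might look roughly like the following. It is a sketch under assumptions: `RetryOnSessionExpiry`, `SessionExpiredException`, and `readWithRetry` are hypothetical stand-ins, not Kafka's `KafkaZkClient` API, and whether such a retry is actually safe is exactly the open question raised in the comment.

```java
import java.util.function.Supplier;

final class RetryOnSessionExpiry {
    /** Hypothetical stand-in for a session-expiration error raised by a ZooKeeper client. */
    static final class SessionExpiredException extends RuntimeException {
        SessionExpiredException(String message) {
            super(message);
        }
    }

    /**
     * Runs a read-only operation, retrying only when the session expired.
     * Any other exception propagates immediately.
     */
    static <T> T readWithRetry(Supplier<T> read, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        SessionExpiredException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return read.get();
            } catch (SessionExpiredException e) {
                last = e; // reads have no side effects, so repeating them is harmless here
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String config = readWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new SessionExpiredException("session expired");
            }
            return "topic-config";
        }, 5);
        System.out.println(config + " after " + calls[0] + " attempts");
    }
}
```

A real implementation would presumably also wait for the session to be re-established between attempts; that is left out to keep the sketch short.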
[GitHub] [kafka] ijuma commented on pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem
ijuma commented on pull request #8517: URL: https://github.com/apache/kafka/pull/8517#issuecomment-628200880 ok to test
[GitHub] [kafka] lbradstreet commented on pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem
lbradstreet commented on pull request #8517: URL: https://github.com/apache/kafka/pull/8517#issuecomment-628192600 @ijuma can this be merged?
[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store
[ https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106558#comment-17106558 ] Matthias J. Sax commented on KAFKA-3184: I am personally not sure how useful "local checkpointing" is at all. Note that for persistent stores, the idea is to allow holding state that is larger than main memory. It's not really related to fault-tolerance or similar (it only has the nice side effect that rolling restarts are quick – for this case, maybe dumping the in-memory store to disk during shutdown might be helpful; but this would not be regular checkpointing). Also, to avoid overly long state recovery times, users can configure standbys. For scale-out, the same issue arises for in-memory and persistent stores and it's addressed via KIP-441. So we should not conflate orthogonal issues. Having a remote checkpoint mechanism would be a nice-to-have feature, but it raises a lot of complex issues we need to address. (1) Where do we actually put the store? Kafka Streams is a library and should not have other dependencies by default; thus, remote checkpointing must be an opt-in feature only. (2) How can we do incremental checkpointing? (If it's not incremental, it's too heavyweight and we don't need to build it at all.) (3) How do we "compact" the checkpoint increments in the remote location? This is the hardest issue we need to solve. There was also the idea to actually re-use standbys for quick recovery: instead of checkpointing to remote storage, one just configures standbys. On recovery, instead of reading the changelog, we copy RocksDB sst files directly from the standby to the active (or to be more precise, the standby would become the active anyway...). This approach avoids the dependency on an external system and also solves the "how to compact" issue, as RocksDB does it for us. Of course, it's more expensive (in dollars) to keep the state in the app compared to pushing it to cheap external storage.
Just some food for thought. > Add Checkpoint for In-memory State Store > > > Key: KAFKA-3184 > URL: https://issues.apache.org/jira/browse/KAFKA-3184 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Nikolay Izhikov >Priority: Major > Labels: user-experience > > Currently Kafka Streams does not make a checkpoint of the persistent state > store upon committing, which would be expensive since it is "stopping the > world" and write on disks: for example, RocksDB would require you to copy the > file directory to make a copy naively. > However, for in-memory stores checkpointing maybe doable in an asynchronous > manner hence it can be done quickly. And the benefit of having intermediate > checkpoint is to avoid restoring from scratch if standby tasks are not > present.
[jira] [Resolved] (KAFKA-9897) Flaky Test StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores
[ https://issues.apache.org/jira/browse/KAFKA-9897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-9897. -- Resolution: Fixed > Flaky Test StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores > - > > Key: KAFKA-9897 > URL: https://issues.apache.org/jira/browse/KAFKA-9897 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 2.6.0 >Reporter: Matthias J. Sax >Priority: Critical > Labels: flaky-test > > [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/22/testReport/junit/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/shouldQuerySpecificActivePartitionStores/] > {quote}org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get > state store source-table because the stream thread is PARTITIONS_ASSIGNED, > not RUNNING at > org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:85) > at > org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:61) > at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1183) at > org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:178){quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9984) Should fail the subscription when pattern is empty
[ https://issues.apache.org/jira/browse/KAFKA-9984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106549#comment-17106549 ] Guozhang Wang commented on KAFKA-9984: -- Sounds good to me :) > Should fail the subscription when pattern is empty > -- > > Key: KAFKA-9984 > URL: https://issues.apache.org/jira/browse/KAFKA-9984 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Boyang Chen >Priority: Major > > We have seen a case where the consumer subscribes to an empty string pattern: > ``` > [Consumer ... ] Subscribed to pattern: '' > ``` > which doesn't make any sense and usually indicate a configuration error. The > `consumer.subscribe(pattern)` call should fail with illegal argument for this > case. -- This message was sent by Atlassian Jira (v8.3.4#803005)
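The behaviour requested in this ticket (reject an empty subscription pattern with an illegal-argument error) amounts to a small precondition check. The sketch below assumes a hypothetical helper, `PatternSubscriptionCheck.requireNonEmpty`; it is not the consumer's actual `subscribe(Pattern)` code, and the error wording is invented for illustration.

```java
import java.util.regex.Pattern;

final class PatternSubscriptionCheck {
    /** Returns the pattern unchanged, or throws if it is null or compiled from an empty string. */
    static Pattern requireNonEmpty(Pattern pattern) {
        if (pattern == null || pattern.pattern().isEmpty()) {
            throw new IllegalArgumentException(
                "Topic pattern to subscribe to cannot be " + (pattern == null ? "null" : "empty"));
        }
        return pattern;
    }

    public static void main(String[] args) {
        System.out.println(requireNonEmpty(Pattern.compile("orders-.*")).pattern());
        try {
            requireNonEmpty(Pattern.compile(""));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Note that `Pattern.compile("")` itself succeeds, which is why the empty case has to be checked explicitly rather than left to the regex compiler.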
[jira] [Commented] (KAFKA-9897) Flaky Test StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores
[ https://issues.apache.org/jira/browse/KAFKA-9897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106548#comment-17106548 ] Matthias J. Sax commented on KAFKA-9897: Sure. We can always reopen again if necessary. What was the PR / Jira ticket number? > Flaky Test StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores > - > > Key: KAFKA-9897 > URL: https://issues.apache.org/jira/browse/KAFKA-9897 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 2.6.0 >Reporter: Matthias J. Sax >Priority: Critical > Labels: flaky-test > > [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/22/testReport/junit/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/shouldQuerySpecificActivePartitionStores/] > {quote}org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get > state store source-table because the stream thread is PARTITIONS_ASSIGNED, > not RUNNING at > org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:85) > at > org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:61) > at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1183) at > org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:178){quote}
[jira] [Updated] (KAFKA-9984) Should fail the subscription when pattern is empty
[ https://issues.apache.org/jira/browse/KAFKA-9984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-9984: --- Component/s: (was: streams) > Should fail the subscription when pattern is empty > -- > > Key: KAFKA-9984 > URL: https://issues.apache.org/jira/browse/KAFKA-9984 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Boyang Chen >Priority: Major > > We have seen a case where the consumer subscribes to an empty string pattern: > ``` > [Consumer ... ] Subscribed to pattern: '' > ``` > which doesn't make any sense and usually indicate a configuration error. The > `consumer.subscribe(pattern)` call should fail with illegal argument for this > case. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-9982) [kafka-connect] Source connector does not guarantee at least once delivery
[ https://issues.apache.org/jira/browse/KAFKA-9982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17105942#comment-17105942 ] Chris Egerton edited comment on KAFKA-9982 at 5/13/20, 6:10 PM: The producers the framework uses to write data from source tasks to Kafka are [configured conservatively|https://github.com/apache/kafka/blob/9bc96d54f8953d190a1fb6478a0656f049ee3b32/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L557-L563] to prevent multiple concurrent in-flight requests, which might compromise the ordering of the records. As of the fix for https://issues.apache.org/jira/browse/KAFKA-8586, the framework will cease processing records from a source task if it fails to send a record to Kafka. The framework does use an entirely different producer to write source offsets to Kafka, but no offsets are written to Kafka unless the record they correspond to has been ack'd by the broker and safely made it to Kafka. [~q.xu] based on the source code for the worker, I don't think this analysis is correct. Have you observed this behavior yourself? was (Author: chrisegerton): The producers the framework uses to write data from source tasks to Kafka are [configured conservatively|https://github.com/apache/kafka/blob/9bc96d54f8953d190a1fb6478a0656f049ee3b32/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L557-L563] to prevent multiple concurrent in-flight requests, which might compromise the ordering of the records. As of the fix for https://issues.apache.org/jira/browse/KAFKA-8586, the framework will cease processing records from a source task if it fails to send a record to Kafka. The framework does use an entirely different producer to write source offsets to Kafka, but no offsets are written to Kafka unless the record they correspond has been ack'd by the broker and safely made it to Kafka. [~q.xu] based on the source code for the worker, I don't think this analysis is correct. 
Have you observed this behavior yourself? > [kafka-connect] Source connector does not guarantee at least once delivery > -- > > Key: KAFKA-9982 > URL: https://issues.apache.org/jira/browse/KAFKA-9982 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.5.0 >Reporter: Qinghui Xu >Priority: Major > > In kafka-connect runtime, the WorkerSourceTask is responsible for sending > records to the destination topics and managing the source offset commit. > Committed offsets are then used later for recovery of tasks during rebalance > or restart. > But there are two concerns when looking into the WorkerSourceTask > implementation: > * When producer fail to send records, there's no retry but just skipping > offset commit and then execute next loop (poll for new records) > * The offset commit and effectively sending records over network are in fact > asynchronous, which means the offset commit could happen before records are > received by brokers, and a rebalance/restart in this gap could lead to > message loss. > The conclusion is thus that the source connector does not support at least > once semantics by default (without the plugin implementation making extra > effort itself). I consider this as a bug. -- This message was sent by Atlassian Jira (v8.3.4#803005)
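For readers unfamiliar with what "configured conservatively" can mean for a producer, the sketch below builds a property set along those lines using standard producer configuration keys. It is an assumption-laden illustration: the comment only states that the settings prevent multiple concurrent in-flight requests, so the `acks` and `retries` values here are a guess at a typical conservative setup, and the linked `Worker.java` remains the authoritative source. `ConservativeProducerProps` is a hypothetical class name.

```java
import java.util.Properties;

final class ConservativeProducerProps {
    /** Builds producer properties that favour ordering and durability over throughput. */
    static Properties build() {
        Properties props = new Properties();
        // One in-flight request per connection, so a retried batch cannot overtake a later one.
        props.put("max.in.flight.requests.per.connection", "1");
        // Assumed for illustration: wait for all in-sync replicas to acknowledge each write.
        props.put("acks", "all");
        // Assumed for illustration: keep retrying rather than dropping a record.
        props.put("retries", Integer.toString(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With a single in-flight request, ordering is preserved at the cost of throughput, which matches the trade-off the comment describes.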
[jira] [Commented] (KAFKA-9659) Kafka Streams / Consumer configured for static membership fails on "fatal exception: group.instance.id gets fenced"
[ https://issues.apache.org/jira/browse/KAFKA-9659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106528#comment-17106528 ] Levani Kokhreidze commented on KAFKA-9659: -- Hi [~guozhang], just FYI: we've experienced this error as well. Some of our Kafka Streams jobs (with static membership) sometimes die when we restart one of the brokers. It affects broker versions 2.3.1 and 2.4 as well. We can reliably reproduce the issue in our "staging" environment, so if extra debug logs from the clients or brokers would be useful, we'd be happy to help. > Kafka Streams / Consumer configured for static membership fails on "fatal > exception: group.instance.id gets fenced" > --- > > Key: KAFKA-9659 > URL: https://issues.apache.org/jira/browse/KAFKA-9659 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: Rohan Desai >Assignee: Guozhang Wang >Priority: Major > Attachments: ksql-1.logs > > > I'm running a KSQL query, which underneath is built into a Kafka Streams > application.
> The application has been running without issue for a few days, until today, when all the streams threads exited with:
>
> {{[ERROR] 2020-03-05 00:57:58,776 [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] Received fatal exception: group.instance.id gets fenced}}
> {{[ERROR] 2020-03-05 00:57:58,776 [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator onFailure - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] Caught fenced group.instance.id Optional[ksql-1-2] error in heartbeat thread}}
> {{[ERROR] 2020-03-05 00:57:58,776 [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] org.apache.kafka.streams.processor.internals.StreamThread run - stream-thread [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:}}
> \{{ org.apache.kafka.common.errors.FencedInstanceIdException: The broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id.}}
> {{[INFO] 2020-03-05 00:57:58,776 [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] org.apache.kafka.streams.processor.internals.StreamThread setState - stream-thread [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] State transition from RUNNING to PENDING_SHUTDOWN}}
>
> I've attached the KSQL and Kafka Streams logs to this ticket. Here's a summary for one of the streams threads (instance id `ksql-1-2`):
>
> Around 00:56:36 the coordinator fails over from b11 to b2:
>
> {{[INFO] 2020-03-05 00:56:36,258 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Attempt to heartbeat failed since coordinator b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483636 rack: null) is either not started or not valid.}}
> {{ [INFO] 2020-03-05 00:56:36,258 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator markCoordinatorUnknown - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer, groupId=_confl
[jira] [Commented] (KAFKA-9927) Add support for varint types to message generator
[ https://issues.apache.org/jira/browse/KAFKA-9927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106465#comment-17106465 ]

Tom Bentley commented on KAFKA-9927:
------------------------------------

[~hachikuji] I've implemented this (https://github.com/tombentley/kafka/pull/new/KAFKA-9927-varint-in-messages). Does this change need a KIP, since using this variable-length encoding for fields would be new to the protocol? If not, I'll open a PR; otherwise I'll put a KIP together.

> Add support for varint types to message generator
> -------------------------------------------------
>
>                 Key: KAFKA-9927
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9927
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jason Gustafson
>            Assignee: Tom Bentley
>            Priority: Major
>
> It would be nice to be able to use either a "varint32" or "varint64" type or
> to add a flag to indicate variable length encoding.
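For readers unfamiliar with variable-length integers, the following is a minimal sketch of the common base-128 scheme with zigzag mapping for signed values. The ticket does not say which encoding the generator would use, so treat this as an assumption; the class VarintSketch and its method names are illustrative and are not the message generator's API.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Illustrative only: a 32-bit varint codec written against the JDK alone.
final class VarintSketch {
    // Zigzag maps small negative numbers to small unsigned ones: 0,-1,1,-2 -> 0,1,2,3.
    static int zigZagEncode(int v) { return (v << 1) ^ (v >> 31); }
    static int zigZagDecode(int v) { return (v >>> 1) ^ -(v & 1); }

    // Emits 7 payload bits per byte, low bits first; the high bit marks "more bytes follow".
    static byte[] writeUnsignedVarint(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream(5);
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
        return out.toByteArray();
    }

    static int readUnsignedVarint(ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        while (true) {
            byte b = buf.get();
            value |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return value;
            }
            shift += 7;
            if (shift > 28) {
                throw new IllegalArgumentException("varint is longer than 5 bytes");
            }
        }
    }

    static byte[] writeVarint(int value) { return writeUnsignedVarint(zigZagEncode(value)); }
    static int readVarint(ByteBuffer buf) { return zigZagDecode(readUnsignedVarint(buf)); }

    public static void main(String[] args) {
        for (int v : new int[] {0, -1, 300, Integer.MIN_VALUE}) {
            byte[] encoded = writeVarint(v);
            int decoded = readVarint(ByteBuffer.wrap(encoded));
            System.out.println(v + " -> " + encoded.length + " byte(s) -> " + decoded);
        }
    }
}
```

The trade-off is that small magnitudes take one byte while the largest 32-bit values take five, which is why a field-level opt-in (a dedicated type or a flag, as the description suggests) is a sensible shape for the feature.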
[jira] [Updated] (KAFKA-9987) Add new cooperative assignor optimized for constant-subscription group
[ https://issues.apache.org/jira/browse/KAFKA-9987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sophie Blee-Goldman updated KAFKA-9987:
---------------------------------------
    Summary: Add new cooperative assignor optimized for constant-subscription group  (was: Add new cooperative assignor optimized for constant-subscription. group)

> Add new cooperative assignor optimized for constant-subscription group
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-9987
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9987
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: Sophie Blee-Goldman
>            Assignee: Sophie Blee-Goldman
>            Priority: Major
>
> In KIP-429 we added the new CooperativeStickyAssignor, which leverages the
> underlying sticky assignment algorithm of the existing StickyAssignor (moved
> to AbstractStickyAssignor).
>
> The algorithm is fairly complex as it tries to optimize stickiness while
> satisfying perfect balance _in the case where individual consumers may be
> subscribed to a random subset of the topics._ While it does a pretty good job
> at what it promises to do, it doesn't scale well with large numbers of
> consumers and partitions.
>
> If we can make the assumption that all consumers are subscribed to the same
> set of topics, we can simplify the algorithm greatly and do a
> sticky-but-balanced assignment in a single pass. It would be nice to have an
> additional cooperative assignor OOTB that performs efficiently for users who
> know their group will satisfy this constraint.
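As a rough illustration of why the constant-subscription assumption simplifies things, here is a linear-time sketch of a sticky-but-balanced assignment. It is not the assignor proposed in the ticket: the class name, the single-topic model (partitions numbered 0..n-1), and the method signature are all assumptions made for the example, and it leaves out the cooperative protocol's rule that a partition moving between members is revoked first and reassigned in a later rebalance.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; every member is assumed to subscribe to the same single topic.
final class ConstantSubscriptionAssignorSketch {
    static Map<String, List<Integer>> assign(List<String> members, int numPartitions,
                                             Map<String, List<Integer>> previouslyOwned) {
        if (members.isEmpty()) {
            throw new IllegalArgumentException("group has no members");
        }
        int minQuota = numPartitions / members.size();
        int extraSlots = numPartitions % members.size(); // members allowed minQuota + 1
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        boolean[] taken = new boolean[numPartitions];

        // Stickiness: each member keeps what it already owns, up to its quota.
        for (String member : members) {
            List<Integer> kept = new ArrayList<>();
            int cap = extraSlots > 0 ? minQuota + 1 : minQuota;
            for (int p : previouslyOwned.getOrDefault(member, List.of())) {
                if (kept.size() >= cap) break;
                if (p < 0 || p >= numPartitions || taken[p]) continue;
                kept.add(p);
                taken[p] = true;
            }
            if (kept.size() > minQuota) extraSlots--;
            assignment.put(member, kept);
        }

        // Balance: hand out everything that nobody kept.
        Deque<Integer> unassigned = new ArrayDeque<>();
        for (int p = 0; p < numPartitions; p++) {
            if (!taken[p]) unassigned.add(p);
        }
        for (List<Integer> owned : assignment.values()) {
            while (owned.size() < minQuota) owned.add(unassigned.poll());
        }
        for (List<Integer> owned : assignment.values()) {
            if (unassigned.isEmpty()) break;
            if (owned.size() == minQuota) owned.add(unassigned.poll());
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> previous = Map.of(
                "a", List.of(0, 1, 2, 3, 4),
                "b", List.of(5, 6));
        // Member "c" has just joined; "a" gives up only what exceeds its quota.
        System.out.println(assign(List.of("a", "b", "c"), 7, previous));
    }
}
```

Because every member may own any partition, the quota for each member is known up front (floor or ceiling of partitions divided by members), so no search over per-consumer subscriptions is needed.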