Re: [PR] KAFKA-12886: Enable request forwarding by default [kafka]
github-actions[bot] commented on PR #14698: URL: https://github.com/apache/kafka/pull/14698#issuecomment-1953432841 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15776: Use the FETCH request timeout as the delay timeout for DelayedRemoteFetch [kafka]
github-actions[bot] commented on PR #14778: URL: https://github.com/apache/kafka/pull/14778#issuecomment-1953432771 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] IGNORE: flaky testing [kafka]
github-actions[bot] commented on PR #14780: URL: https://github.com/apache/kafka/pull/14780#issuecomment-1953432752 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-15801: WIP Add hostname and port in NetworkClient logging and increase connection issues logging severity. [kafka]
github-actions[bot] commented on PR #14804: URL: https://github.com/apache/kafka/pull/14804#issuecomment-1953432734 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] MINOR: Fix MetricsTest.testBrokerTopicMetricsBytesInOut assertion [kafka]
showuon merged PR #14744: URL: https://github.com/apache/kafka/pull/14744
Re: [PR] MINOR: Fix MetricsTest.testBrokerTopicMetricsBytesInOut assertion [kafka]
showuon commented on PR #14744: URL: https://github.com/apache/kafka/pull/14744#issuecomment-1953424263 Failed tests are unrelated.
Re: [PR] KAFKA-15349: ducker-ak should fail fast when gradlew systemTestLibs fails [kafka]
showuon merged PR #15391: URL: https://github.com/apache/kafka/pull/15391
[jira] (KAFKA-16102) about DynamicListenerConfig, the dynamic modification of the listener's port or IP does not take effect.
[ https://issues.apache.org/jira/browse/KAFKA-16102 ] Jialun Peng deleted comment on KAFKA-16102: - was (Author: JIRAUSER303739): Hi, can you take a look at this issue? [~cmccabe_impala_fa3f] > about DynamicListenerConfig, the dynamic modification of the listener's port > or IP does not take effect. > > > Key: KAFKA-16102 > URL: https://issues.apache.org/jira/browse/KAFKA-16102 > Project: Kafka > Issue Type: Bug > Components: config >Affects Versions: 3.6.0 > Environment: Must be present in any environment. >Reporter: Jialun Peng >Assignee: Jialun Peng >Priority: Minor > Labels: easyfix > Fix For: 3.8.0 > > Original Estimate: 96h > Remaining Estimate: 96h > > When I dynamically modify the parameters related to Kafka listeners, such as > changing the IP or port value of a listener, the dynamic parameters under the > corresponding path in ZooKeeper are updated. However, in reality, the > modification of the IP or port for the corresponding listener does not take > effect. This phenomenon consistently occurs. And there is a slight > improvement as the error "Security protocol cannot be updated for existing > listener" will be eliminated. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics
[ https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818599#comment-17818599 ]

A. Sophie Blee-Goldman commented on KAFKA-14597:

Hey [~atuljainiitk] I just came across this ticket for the first time, but for the most part I think your analysis makes sense. Are you still interested in picking this up? Specifically, reverting the linked change and getting the true system time for terminal nodes?

It's been a while since that change was made/the KIP was implemented, so I don't remember all the context, but fetching the current time at terminal nodes sounds reasonable to me. Clearly the metric is otherwise useless, so we should either update it to be correct (and monitor for any potential performance impact) or just remove it entirely. Fixing it is obviously preferable, at least unless we know for sure that it does hurt performance. I'm optimistic though.

cc also [~talestonini] [~cadonna]

> [Streams] record-e2e-latency-max is not reporting correct metrics
>
> Key: KAFKA-14597
> URL: https://issues.apache.org/jira/browse/KAFKA-14597
> Project: Kafka
> Issue Type: Bug
> Components: metrics, streams
> Reporter: Atul Jain
> Assignee: Tales Tonini
> Priority: Major
> Attachments: image-2023-03-21-15-07-24-352.png, image-2023-03-21-19-01-54-713.png, image-2023-03-21-19-03-07-525.png, image-2023-03-21-19-03-28-625.png, process-latency-max.jpg, record-e2e-latency-max.jpg
>
> I was following this KIP documentation ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-613%3A+Add+end-to-end+latency+metrics+to+Streams]) and the kafka streams documentation ([https://kafka.apache.org/documentation/#kafka_streams_monitoring:~:text=node%2Did%3D(%5B%2D.%5Cw%5D%2B)-,record%2De2e%2Dlatency%2Dmax,-The%20maximum%20end]).
> Based on this documentation, *record-e2e-latency-max* should monitor the full end-to-end latency, which includes both *consumption latencies* and *processing delays*.
> However, based on my observations, record-e2e-latency-max seems to measure only the consumption latencies. Processing delays can be measured using *process-latency-max*. I am checking all this using a simple topology consisting of source, processors and sink (code added). I have added some sleep time (of 3 seconds) in one of the processors to ensure some delay in the processing logic. These delays are not accounted for in record-e2e-latency-max but are accounted for in process-latency-max.
> process-latency-max was observed to be 3002 ms, which accounts for the sleep time of 3 seconds. However, record-e2e-latency-max observed in jconsole is 422 ms, which does not account for the 3 seconds of sleep time.
>
> Code describing my topology:
> {code:java}
> static Topology buildTopology(String inputTopic, String outputTopic) {
>     log.info("Input topic: " + inputTopic + " and output topic: " + outputTopic);
>     Serde<String> stringSerde = Serdes.String();
>     StreamsBuilder builder = new StreamsBuilder();
>     builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde))
>         .peek((k,v) -> log.info("Observed event: key" + k + " value: " + v))
>         .mapValues(s -> {
>             try {
>                 System.out.println("sleeping for 3 seconds");
>                 Thread.sleep(3000);
>             }
>             catch (InterruptedException e) {
>                 e.printStackTrace();
>             }
>             return s.toUpperCase();
>         })
>         .peek((k,v) -> log.info("Transformed event: key" + k + " value: " + v))
>         .to(outputTopic, Produced.with(stringSerde, stringSerde));
>     return builder.build();
> }
> {code}
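The discrepancy reported in KAFKA-14597 can be illustrated with a small sketch. It assumes, in line with the comment on the ticket, that the end-to-end latency is computed as "observation time minus record timestamp", and that the observation time at the terminal node is a timestamp taken before processing rather than the true system time. The class and method names (`E2eLatencySketch`, `latencyMs`) and the sample timestamps are hypothetical and are not Kafka Streams APIs.

```java
// Hypothetical illustration; none of these names are Kafka Streams APIs.
final class E2eLatencySketch {

    // End-to-end latency as "observation time minus record timestamp".
    static long latencyMs(long recordTimestampMs, long observedAtMs) {
        return observedAtMs - recordTimestampMs;
    }

    public static void main(String[] args) {
        long recordTimestampMs = 1_000L;  // when the record was produced (made-up value)
        long cachedTimeMs = 1_422L;       // assumed: time captured when the record was picked up
        long processingDelayMs = 3_000L;  // e.g. the Thread.sleep(3000) in mapValues
        long trueTimeAtSinkMs = cachedTimeMs + processingDelayMs;

        // Using the cached time hides the processing delay.
        System.out.println("cached time: " + latencyMs(recordTimestampMs, cachedTimeMs) + " ms");
        // Using the current time at the terminal node includes it.
        System.out.println("true time:   " + latencyMs(recordTimestampMs, trueTimeAtSinkMs) + " ms");
    }
}
```

With these made-up numbers the first value matches the 422 ms seen in jconsole, while the second also contains the 3 seconds of sleep, which is what the metric is documented to report.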
[jira] [Commented] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group
[ https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818594#comment-17818594 ]

A. Sophie Blee-Goldman commented on KAFKA-16277:

Hey [~credpath-seek] – first up, while it's been a while since I've looked at the sticky assignor code, I'm not too surprised that this might be the case. The obvious emphasis (per the name) was put on "stickiness" and partition-number balance, with good data parallelism, i.e. topic-level balance, being best-effort at most.

That said, I suspect the assignor could be making a better effort. Presumably what is happening is that during the phase where it attempts to re-assign previously-owned partitions back to their former owner, we make a pass over a sorted list of previously-owned partitions that is grouped by topic. The assignor will then assign partitions from this list one-by-one to their previous owner until it hits the expected total number of partitions. So in the scenario you describe, it's basically looping over (t1p0, t1p1, t1p2, t1p3...t1pN, t2p0, t2p1, t2p2...t2pN) and assigning the first N partitions to the first consumer, which would be everything from topic 1, then just dumping the remaining partitions – all of which belong to topic 2 – onto the new consumer.

The fix should be fairly simple – we just need to group this sorted list by partition, rather than by topic (i.e. t1p0, t2p0, t1p1, t2p1...t1pN, t2pN). Would you be interested in submitting a patch for this?

As for what you can do right now: even if a fix for this was merged, you'd have to wait for the next release. However, the assignment is completely customizable, so in theory you could just copy/paste all the code from the patched assignor into a custom ConsumerPartitionAssignor implementation and then plug that in instead of the "cooperative-sticky" assignment strategy.

Otherwise, the workaround you suggest is a reasonable backup – the obvious downside is that the two threads will have an unbalanced load between them, but at least the overall node-level workload will be more even.

> CooperativeStickyAssignor does not spread topics evenly among consumer group
>
> Key: KAFKA-16277
> URL: https://issues.apache.org/jira/browse/KAFKA-16277
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: Cameron Redpath
> Priority: Major
> Attachments: image-2024-02-19-13-00-28-306.png
>
> Consider the following scenario:
> `topic-1`: 12 partitions
> `topic-2`: 12 partitions
>
> Of note, `topic-1` gets approximately 10 times more messages through it than `topic-2`.
>
> Both of these topics are consumed by a single application, single consumer group, which scales under load. Each member of the consumer group subscribes to both topics. The `partition.assignment.strategy` being used is `org.apache.kafka.clients.consumer.CooperativeStickyAssignor`. The application may start with one consumer. It consumes all partitions from both topics.
>
> The problem begins when the application scales up to two consumers. What is seen is that all partitions from `topic-1` go to one consumer, and all partitions from `topic-2` go to the other consumer. In the case with one topic receiving more messages than the other, this results in a very imbalanced group where one consumer is receiving 10x the traffic of the other due to partition assignment.
>
> This is the issue being seen in our cluster at the moment. See this graph of the number of messages being processed by each consumer as the group scales from one to four consumers:
> !image-2024-02-19-13-00-28-306.png|width=537,height=612!
> Things to note from this graphic:
> * With two consumers, the partitions for a topic all go to a single consumer each
> * With three consumers, the partitions for a topic are split between two consumers each
> * With four consumers, the partitions for a topic are split between three consumers each
> * The total number of messages being processed by each consumer in the group is very imbalanced throughout the entire period
>
> With regard to the number of _partitions_ being assigned to each consumer, the group is balanced. However, the assignment appears to be biased so that partitions from the same topic go to the same consumer. In our scenario, this leads to very undesirable partition assignment.
>
> I question if the behaviour of the assignor should be revised, so that each topic has its partitions maximally spread across all available members of the consumer group. In the above scenario, this would result in much more even distribution of load. The behaviour would then be:
> * With two
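The ordering change proposed in the comment on KAFKA-16277 can be sketched in isolation. The example below is not the assignor's actual code: `OwnedPartitionOrderSketch`, its nested `TopicPartition` class and all method names are hypothetical. It only compares the two sort orders and counts how many partitions of one topic fall into the first N entries, which is the share the previous owner would keep under the behaviour presumed in the comment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration of the two orderings; this is not the assignor's code.
final class OwnedPartitionOrderSketch {

    // Minimal stand-in for a topic-partition pair, e.g. ("t1", 0).
    static final class TopicPartition {
        private final String topic;
        private final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        String topic() { return topic; }
        int partition() { return partition; }
    }

    // All partitions of the given topics, partitionsPerTopic each.
    static List<TopicPartition> allPartitions(List<String> topics, int partitionsPerTopic) {
        List<TopicPartition> result = new ArrayList<>();
        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                result.add(new TopicPartition(topic, p));
            }
        }
        return result;
    }

    // Grouped by topic: t1p0, t1p1, ..., t2p0, t2p1, ...
    static List<TopicPartition> byTopic(List<TopicPartition> partitions) {
        List<TopicPartition> sorted = new ArrayList<>(partitions);
        sorted.sort(Comparator.comparing(TopicPartition::topic)
                .thenComparingInt(TopicPartition::partition));
        return sorted;
    }

    // Grouped by partition number: t1p0, t2p0, t1p1, t2p1, ...
    static List<TopicPartition> byPartition(List<TopicPartition> partitions) {
        List<TopicPartition> sorted = new ArrayList<>(partitions);
        sorted.sort(Comparator.comparingInt(TopicPartition::partition)
                .thenComparing(TopicPartition::topic));
        return sorted;
    }

    // How many partitions of 'topic' are among the first 'quota' entries,
    // i.e. what the previous owner keeps when the list is consumed in order.
    static long keptForTopic(List<TopicPartition> sorted, int quota, String topic) {
        return sorted.stream().limit(quota).filter(tp -> tp.topic().equals(topic)).count();
    }

    public static void main(String[] args) {
        List<TopicPartition> owned = allPartitions(Arrays.asList("t1", "t2"), 12);
        int quota = owned.size() / 2; // two consumers share 24 partitions
        System.out.println("grouped by topic, t1 kept: " + keptForTopic(byTopic(owned), quota, "t1"));
        System.out.println("grouped by partition, t1 kept: " + keptForTopic(byPartition(owned), quota, "t1"));
    }
}
```

With two topics of 12 partitions each, the topic-grouped order leaves all 12 partitions of t1 with the first consumer, while the partition-grouped order leaves it 6 from each topic.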
[jira] [Created] (KAFKA-16280) Expose method to determine Metric Measurability
Apoorv Mittal created KAFKA-16280:

Summary: Expose method to determine Metric Measurability
Key: KAFKA-16280
URL: https://issues.apache.org/jira/browse/KAFKA-16280
Project: Kafka
Issue Type: Bug
Components: metrics
Affects Versions: 3.8.0
Reporter: Apoorv Mittal
Assignee: Apoorv Mittal
Fix For: 3.8.0

The Jira is to track the development of KIP-1019: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1019%3A+Expose+method+to+determine+Metric+Measurability]
[jira] [Created] (KAFKA-16279) Avoid leaking abstractions of `StateStore`
Matthias J. Sax created KAFKA-16279:

Summary: Avoid leaking abstractions of `StateStore`
Key: KAFKA-16279
URL: https://issues.apache.org/jira/browse/KAFKA-16279
Project: Kafka
Issue Type: Improvement
Components: streams
Reporter: Matthias J. Sax

The `StateStore` interface is user facing and contains a few life-cycle management methods (like `init()` and `close()`) – those methods are exposed for users to develop custom state stores.

However, we also use `StateStore` as the base interface for store handles in the PAPI, and thus life-cycle management methods are leaking into the PAPI (maybe also others – we would need a dedicated discussion about which ones we consider useful for PAPI users and which not).

We should consider changing what we expose in the PAPI (atm, we only document via JavaDocs that e.g. `close()` should never be called; but that is of course not ideal, and it would be better if `close()` et al. were not exposed to PAPI users to begin with).
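One way to picture the separation discussed in KAFKA-16279 is a narrow handle type that delegates data access but offers no life-cycle methods. This is only a sketch under assumptions: `ManagedStore`, `StoreHandle`, `InMemoryStore` and `handleFor` are hypothetical names invented for the illustration, and they are neither the Kafka Streams interfaces nor a proposed design from the ticket.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; these interfaces are NOT the Kafka Streams API.
final class StoreHandleSketch {

    // What a custom-store implementer sees: data access plus life-cycle methods.
    interface ManagedStore {
        void init();
        void close();
        boolean isOpen();
        void put(String key, String value);
        String get(String key);
    }

    // What processor code would see: data access only, no init()/close().
    interface StoreHandle {
        void put(String key, String value);
        String get(String key);
    }

    // Minimal in-memory store used only for the demonstration.
    static final class InMemoryStore implements ManagedStore {
        private final Map<String, String> data = new HashMap<>();
        private boolean open;

        @Override public void init() { open = true; }
        @Override public void close() { open = false; }
        @Override public boolean isOpen() { return open; }
        @Override public void put(String key, String value) { data.put(key, value); }
        @Override public String get(String key) { return data.get(key); }
    }

    // Wraps a managed store so that callers cannot reach the life-cycle methods.
    static StoreHandle handleFor(ManagedStore store) {
        return new StoreHandle() {
            @Override public void put(String key, String value) { store.put(key, value); }
            @Override public String get(String key) { return store.get(key); }
        };
    }

    public static void main(String[] args) {
        ManagedStore store = new InMemoryStore();
        store.init();                           // the runtime owns the life cycle
        StoreHandle handle = handleFor(store);  // user code only gets the handle
        handle.put("k", "v");
        System.out.println(handle.get("k") + " open=" + store.isOpen());
    }
}
```

Because `StoreHandle` declares no `close()`, the "never call this" rule is enforced by the compiler instead of by JavaDocs.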
Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]
lucasbru commented on code in PR #15357:
URL: https://github.com/apache/kafka/pull/15357#discussion_r1494961008

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -1344,8 +1343,8 @@ public void commitSync(Map offsets, Duration
 long commitStart = time.nanoseconds();
 try {
 Timer requestTimer = time.timer(timeout.toMillis());
-// Commit with a timer to control how long the request should be retried until it
-// gets a successful response or non-retriable error.
+// Commit with a retry timeout (the commit request will be retried until it gets a
+// successful response, non-retriable error, or the timeout expires)
 CompletableFuture commitFuture = commit(offsets, true, Optional.of(timeout.toMillis()));

Review Comment:
While you are at it, I think we can remove the "isWakeupable" parameter and just set the `wakeUpTrigger` in the calling context.

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ##
@@ -149,8 +148,14 @@ private void process(final CommitApplicationEvent event) {
 }
 CommitRequestManager manager = requestManagers.commitRequestManager.get();
-Optional expirationTimeMs = event.retryTimeoutMs().map(this::getExpirationTimeForTimeout);
-event.chain(manager.addOffsetCommitRequest(event.offsets(), expirationTimeMs, false));
+CompletableFuture commitResult;
+if (event.retryTimeoutMs().isPresent()) {

Review Comment:
It's a bit weird that we use `retryTimeoutMs` to carry the information that this is a sync-commit vs async-commit. How about going all-in here and just having `AsyncCommitApplicationEvent` and a `SyncCommitApplicationEvent`?

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ##
@@ -204,126 +205,315 @@ private static long findMinTime(final Collection request
 }
 /**
- * Generate a request to commit offsets if auto-commit is enabled. The request will be
- * returned to be sent out on the next call to {@link #poll(long)}. This will only generate a
- * request if there is no other commit request already in-flight, and if the commit interval
- * has elapsed.
+ * Generate a request to commit consumed offsets. Add the request to the queue of pending
+ * requests to be sent out on the next call to {@link #poll(long)}. If there are empty
+ * offsets to commit, no request will be generated and a completed future will be returned.
 *
- * @param offsets Offsets to commit
- * @param expirationTimeMs Time until which the request will continue to be retried if it
- * fails with a retriable error. If not present, the request will be
- * sent but not retried.
- * @param checkInterval True if the auto-commit interval expiration should be checked for
- * sending a request. If true, the request will be sent only if the
- * auto-commit interval has expired. Pass false to
- * send the auto-commit request regardless of the interval (ex.
- * auto-commit before rebalance).
- * @param retryOnStaleEpoch True if the request should be retried in case it fails with
- * {@link Errors#STALE_MEMBER_EPOCH}.
- * @return Future that will complete when a response is received for the request, or a
- * completed future if no request is generated.
+ * @param requestState Commit request
+ * @return Future containing the offsets that were committed, or an error if the request
+ * failed.
 */
-private CompletableFuture maybeAutoCommit(final Map offsets,
-final Optional expirationTimeMs,
-boolean checkInterval,
-boolean retryOnStaleEpoch) {
-if (!autoCommitEnabled()) {
-log.debug("Skipping auto-commit because auto-commit config is not enabled.");
-return CompletableFuture.completedFuture(null);
-}
-
+private CompletableFuture> requestAutoCommit(final OffsetCommitRequestState requestState) {
 AutoCommitState autocommit = autoCommitState.get();
-if (checkInterval && !autocommit.shouldAutoCommit()) {
-return CompletableFuture.completedFuture(null);
+CompletableFuture> result;
+if (requestState.offsets.isEmpty()) {
+result = CompletableFuture.completedFuture(Collections.emptyMap());
+} else {
+autocommit.setInflightCommitStatus(true);
+OffsetCommitRequestState request = pen
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
RamanVerma commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1494984871

## core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala: ##
@@ -278,12 +278,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
 def handleListTransactions(
 filteredProducerIds: Set[Long],
-filteredStates: Set[String]
+filteredStates: Set[String],
+durationFilter: Long = -1

Review Comment:
@yyu1993 this default value needs to be changed to -1L as well

## clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java: ##
@@ -61,6 +62,11 @@ public ListTransactionsOptions filterProducerIds(Collection producerIdFilt
 return this;
 }
+public ListTransactionsOptions durationFilter(long durationMs) {

Review Comment:
Please add a comment to this method like we have for the other methods above. Also, we should probably change the method name to filterOnDuration, to match the rest of the filter setting methods.

## clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java: ##
@@ -81,11 +87,16 @@ public Set filteredProducerIds() {
 return filteredProducerIds;
 }
+public long getDurationFilter() {

Review Comment:
Please add a Javadoc comment to the method. Also, change the method name to `filteredDuration`

## clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java: ##
@@ -81,11 +87,16 @@ public Set filteredProducerIds() {
 return filteredProducerIds;
 }
+public long getDurationFilter() {
+return durationFilter;
+}
+
 @Override
 public String toString() {
 return "ListTransactionsOptions(" +
 "filteredStates=" + filteredStates +
 ", filteredProducerIds=" + filteredProducerIds +
+", durationFilter=" + durationFilter +

Review Comment:
nit: durationFilter -> filteredDuration

## tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java: ##
@@ -187,14 +187,25 @@ private void testDescribeProducers(
 assertEquals(expectedRows, new HashSet<>(table.subList(1, table.size())));
 }
-@Test
-public void testListTransactions() throws Exception {
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testListTransactions(boolean hasDurationFilter) throws Exception {
 String[] args = new String[] {
 "--bootstrap-server",
 "localhost:9092",
 "list"
 };
+if (hasDurationFilter) {
+args = new String[] {
+"--bootstrap-server",
+"localhost:9092",
+"list",
+"--duration-filter",
+Long.toString(Long.MAX_VALUE)

Review Comment:
hmm this will not return anything, right?
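The naming and Javadoc requests in this review could end up looking roughly like the sketch below. It is a hypothetical stand-in class, not the actual `ListTransactionsOptions` from the PR: only the names `filterOnDuration` and `filteredDuration` and the `-1L` default come from the review comments, while the meaning of `-1` ("no filter") and the filter semantics described in the Javadoc are assumptions.

```java
// Hypothetical stand-in for the options class under review; not the actual Kafka class.
final class ListTransactionsOptionsSketch {

    // Assumption: -1 means "no duration filter", mirroring the -1L default in the review.
    private long filteredDuration = -1L;

    /**
     * Sets the duration filter, in milliseconds.
     * Assumed semantics: only transactions running longer than this are listed.
     *
     * @param durationMs the duration in milliseconds
     * @return this object, to allow chained calls
     */
    ListTransactionsOptionsSketch filterOnDuration(long durationMs) {
        this.filteredDuration = durationMs;
        return this;
    }

    /**
     * Returns the duration filter in milliseconds, or -1 if none was set.
     */
    long filteredDuration() {
        return filteredDuration;
    }

    @Override
    public String toString() {
        return "ListTransactionsOptions(filteredDuration=" + filteredDuration + ")";
    }

    public static void main(String[] args) {
        System.out.println(new ListTransactionsOptionsSketch().filterOnDuration(60_000L));
    }
}
```

Using the same name for the field, the getter and the `toString()` label follows the pattern of the existing `filteredStates` and `filteredProducerIds` members shown in the diff.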
[jira] [Commented] (KAFKA-15467) Kafka broker returns offset out of range for topic/partitions on restart from unclean shutdown
[ https://issues.apache.org/jira/browse/KAFKA-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818572#comment-17818572 ] Steve Jacobs commented on KAFKA-15467: -- This isn't a kafka or mm2 bug per se. This is all down to timing. Unclean shutdown forced no writes to disk on the single broker setup, so when the power came back, MM2 had consumed 'past the end of offsets on disk'. And since we were flushing to disk every minute, it only took 1 minute for offsets to 'catch up', making it appear like the offsets were on the broker when in fact they were not. [https://github.com/apache/kafka/pull/14567] Should let me work around this issue when it merges, hopefully in kafka 3.7.0 > Kafka broker returns offset out of range for topic/partitions on restart from > unclean shutdown > -- > > Key: KAFKA-15467 > URL: https://issues.apache.org/jira/browse/KAFKA-15467 > Project: Kafka > Issue Type: Bug > Components: core, log >Affects Versions: 3.5.1 > Environment: Apache Kafka 3.5.1 with Strimzi on kubernetes. >Reporter: Steve Jacobs >Priority: Major > > So this started with me thinking this was a mirrormaker2 issue because here > are the symptoms I am seeing: > I'm encountering an odd issue with mirrormaker2 with our remote replication > setup to high latency remote sites (satellite). > Every few days we get several topics completely re-replicated, this appears > to happen after a network connectivity outage. It doesn't matter if it's a > long outage (hours) or a short one (minutes). And it only seems to affect a > few topics. > I was finally able to track down some logs showing the issue. This was after > an hour-ish long outage where connectivity went down. There were lots of logs > about connection timeouts, etc. 
Here is the relevant part when the connection > came back up: > {code:java} > 2023-09-08 16:52:45,380 INFO [scbi->gcp.MirrorSourceConnector|worker] > [AdminClient > clientId=mm2-admin-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector|replication-source-admin] > Disconnecting from node 0 due to socket connection setup timeout. The > timeout value is 63245 ms. (org.apache.kafka.clients.NetworkClient) > [kafka-admin-client-thread | > mm2-admin-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector|replication-source-admin] > 2023-09-08 16:52:45,380 INFO [scbi->gcp.MirrorSourceConnector|worker] > [AdminClient > clientId=mm2-admin-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector|replication-source-admin] > Metadata update failed > (org.apache.kafka.clients.admin.internals.AdminMetadataManager) > [kafka-admin-client-thread | > mm2-admin-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector|replication-source-admin] > 2023-09-08 16:52:47,029 INFO [scbi->gcp.MirrorSourceConnector|task-1] > [Consumer > clientId=mm2-consumer-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector-1|replication-consumer, > groupId=null] Disconnecting from node 0 due to socket connection setup > timeout. The timeout value is 52624 ms. 
> (org.apache.kafka.clients.NetworkClient) > [task-thread-scbi->gcp.MirrorSourceConnector-1] > 2023-09-08 16:52:47,029 INFO [scbi->gcp.MirrorSourceConnector|task-1] > [Consumer > clientId=mm2-consumer-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector-1|replication-consumer, > groupId=null] Error sending fetch request (sessionId=460667411, > epoch=INITIAL) to node 0: (org.apache.kafka.clients.FetchSessionHandler) > [task-thread-scbi->gcp.MirrorSourceConnector-1] > 2023-09-08 16:52:47,336 INFO [scbi->gcp.MirrorSourceConnector|worker] > refreshing topics took 67359 ms (org.apache.kafka.connect.mirror.Scheduler) > [Scheduler for MirrorSourceConnector: > scbi->gcp|scbi->gcp.MirrorSourceConnector-refreshing topics] > 2023-09-08 16:52:48,413 INFO [scbi->gcp.MirrorSourceConnector|task-1] > [Consumer > clientId=mm2-consumer-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector-1|replication-consumer, > groupId=null] Fetch position FetchPosition{offset=4918131, > offsetEpoch=Optional[0], > currentLeader=LeaderAndEpoch{leader=Optional[kafka.scbi.eng.neoninternal.org:9094 > (id: 0 rack: null)], epoch=0}} is out of range for partition > reading.sensor.hfp01sc-0, resetting offset > (org.apache.kafka.clients.consumer.internals.AbstractFetch) > [task-thread-scbi->gcp.MirrorSourceConnector-1] > (Repeats for 11 more topics) > 2023-09-08 16:52:48,479 INFO [scbi->gcp.MirrorSourceConnector|task-1] > [Consumer > clientId=mm2-consumer-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector-1|replication-consumer, > groupId=null] Resetting offset for partition reading.sensor.hfp01sc-0 to > position FetchPosition{offset=3444977, offsetEpoch=Optional.empty, > currentLeader=LeaderAndEpoch{leader=Optional[kafka.scbi.eng.neoninternal.org:9094 >
[jira] [Comment Edited] (KAFKA-16278) Missing license for scala related dependencies
[ https://issues.apache.org/jira/browse/KAFKA-16278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818562#comment-17818562 ]

Divij Vaidya edited comment on KAFKA-16278 at 2/19/24 7:48 PM:

Sure [~anton.liauchuk]. You should be able to assign this ticket to yourself now.

P.S. In future, feel free to assign any "unassigned" Jira ticket to yourself (by changing the Assignee) and start working on it. You don't have to ask for permission.

was (Author: divijvaidya):
Sure [~anton.liauchuk]. You should be able to assign this ticket to yourself now.

> Missing license for scala related dependencies
>
> Key: KAFKA-16278
> URL: https://issues.apache.org/jira/browse/KAFKA-16278
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.7.0, 3.6.1
> Reporter: Divij Vaidya
> Assignee: Anton Liauchuk
> Priority: Blocker
> Labels: newbie
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
> We are missing the license for the following dependencies in [https://github.com/apache/kafka/blob/b71999be95325f6ea54e925cbe5b426425781014/LICENSE-binary#L261]
>
> scala-collection-compat_2.12-2.10.0 is missing in license file
> scala-java8-compat_2.12-1.0.2 is missing in license file
> scala-library-2.12.18 is missing in license file
> scala-logging_2.12-3.9.4 is missing in license file
> scala-reflect-2.12.18 is missing in license file
> The objective of this task is to add these dependencies to the LICENSE-binary file.
> (please backport to 3.6 and 3.7 branches)
[jira] [Assigned] (KAFKA-16278) Missing license for scala related dependencies
[ https://issues.apache.org/jira/browse/KAFKA-16278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Liauchuk reassigned KAFKA-16278: -- Assignee: Anton Liauchuk
[jira] [Commented] (KAFKA-16278) Missing license for scala related dependencies
[ https://issues.apache.org/jira/browse/KAFKA-16278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818562#comment-17818562 ] Divij Vaidya commented on KAFKA-16278: -- Sure [~anton.liauchuk]. You should be able to assign this ticket to yourself now.
[jira] [Commented] (KAFKA-16278) Missing license for scala related dependencies
[ https://issues.apache.org/jira/browse/KAFKA-16278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818560#comment-17818560 ] Anton Liauchuk commented on KAFKA-16278: Can I pick this up? > Missing license for scala related dependencies > --- > > Key: KAFKA-16278 > URL: https://issues.apache.org/jira/browse/KAFKA-16278 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0, 3.6.1 >Reporter: Divij Vaidya >Priority: Blocker > Labels: newbie > Fix For: 3.6.2, 3.8.0, 3.7.1 > > > We are missing the license for following dependency in > [https://github.com/apache/kafka/blob/b71999be95325f6ea54e925cbe5b426425781014/LICENSE-binary#L261] > > scala-collection-compat_2.12-2.10.0 is missing in license file > scala-java8-compat_2.12-1.0.2 is missing in license file > scala-library-2.12.18 is missing in license file > scala-logging_2.12-3.9.4 is missing in license file > scala-reflect-2.12.18 is missing in license file > The objective of this task is to add these dependencies in the LICENSE-binary > file. > (please backport to 3.6 and 3.7 branches) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16278) Missing license for scala related dependencies
[ https://issues.apache.org/jira/browse/KAFKA-16278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-16278: - Fix Version/s: 3.8.0 > Missing license for scala related dependencies > --- > > Key: KAFKA-16278 > URL: https://issues.apache.org/jira/browse/KAFKA-16278 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0, 3.6.1 >Reporter: Divij Vaidya >Priority: Blocker > Labels: newbie > Fix For: 3.6.2, 3.8.0, 3.7.1 > > > We are missing the license for following dependency in > [https://github.com/apache/kafka/blob/b71999be95325f6ea54e925cbe5b426425781014/LICENSE-binary#L261] > > scala-collection-compat_2.12-2.10.0 is missing in license file > scala-java8-compat_2.12-1.0.2 is missing in license file > scala-library-2.12.18 is missing in license file > scala-logging_2.12-3.9.4 is missing in license file > scala-reflect-2.12.18 is missing in license file > The objective of this task is to add these dependencies in the LICENSE-binary > file. > (please backport to 3.6 and 3.7 branches) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16278) Missing license for scala related dependencies
[ https://issues.apache.org/jira/browse/KAFKA-16278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-16278: - Fix Version/s: 3.6.2 3.7.1 > Missing license for scala related dependencies > --- > > Key: KAFKA-16278 > URL: https://issues.apache.org/jira/browse/KAFKA-16278 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0, 3.6.1 >Reporter: Divij Vaidya >Priority: Blocker > Labels: newbie > Fix For: 3.6.2, 3.7.1 > > > We are missing the license for following dependency in > [https://github.com/apache/kafka/blob/b71999be95325f6ea54e925cbe5b426425781014/LICENSE-binary#L261] > > scala-collection-compat_2.12-2.10.0 is missing in license file > scala-java8-compat_2.12-1.0.2 is missing in license file > scala-library-2.12.18 is missing in license file > scala-logging_2.12-3.9.4 is missing in license file > scala-reflect-2.12.18 is missing in license file > The objective of this task is to add these dependencies in the LICENSE-binary > file. > (please backport to 3.6 and 3.7 branches) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16278) Missing license for scala related dependencies
[ https://issues.apache.org/jira/browse/KAFKA-16278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-16278: - Priority: Blocker (was: Major) > Missing license for scala related dependencies > --- > > Key: KAFKA-16278 > URL: https://issues.apache.org/jira/browse/KAFKA-16278 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0, 3.6.1 >Reporter: Divij Vaidya >Priority: Blocker > Labels: newbie > > We are missing the license for following dependency in > [https://github.com/apache/kafka/blob/b71999be95325f6ea54e925cbe5b426425781014/LICENSE-binary#L261] > > scala-collection-compat_2.12-2.10.0 is missing in license file > scala-java8-compat_2.12-1.0.2 is missing in license file > scala-library-2.12.18 is missing in license file > scala-logging_2.12-3.9.4 is missing in license file > scala-reflect-2.12.18 is missing in license file > The objective of this task is to add these dependencies in the LICENSE-binary > file. > (please backport to 3.6 and 3.7 branches) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16278) Missing license for scala related dependencies
[ https://issues.apache.org/jira/browse/KAFKA-16278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-16278: - Description: We are missing the license for following dependency in [https://github.com/apache/kafka/blob/b71999be95325f6ea54e925cbe5b426425781014/LICENSE-binary#L261] scala-collection-compat_2.12-2.10.0 is missing in license file scala-java8-compat_2.12-1.0.2 is missing in license file scala-library-2.12.18 is missing in license file scala-logging_2.12-3.9.4 is missing in license file scala-reflect-2.12.18 is missing in license file The objective of this task is to add these dependencies in the LICENSE-binary file. (please backport to 3.6 and 3.7 branches) was: We are missing the license for following dependency in [https://github.com/apache/kafka/blob/b71999be95325f6ea54e925cbe5b426425781014/LICENSE-binary#L261] scala-collection-compat_2.12-2.10.0 is missing in license file scala-java8-compat_2.12-1.0.2 is missing in license file scala-library-2.12.18 is missing in license file scala-logging_2.12-3.9.4 is missing in license file scala-reflect-2.12.18 is missing in license file The objective of this task is to add these dependencies in the LICENSE-binary file. 
> Missing license for scala related dependencies > --- > > Key: KAFKA-16278 > URL: https://issues.apache.org/jira/browse/KAFKA-16278 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0, 3.6.1 >Reporter: Divij Vaidya >Priority: Major > Labels: newbie > > We are missing the license for following dependency in > [https://github.com/apache/kafka/blob/b71999be95325f6ea54e925cbe5b426425781014/LICENSE-binary#L261] > > scala-collection-compat_2.12-2.10.0 is missing in license file > scala-java8-compat_2.12-1.0.2 is missing in license file > scala-library-2.12.18 is missing in license file > scala-logging_2.12-3.9.4 is missing in license file > scala-reflect-2.12.18 is missing in license file > The objective of this task is to add these dependencies in the LICENSE-binary > file. > (please backport to 3.6 and 3.7 branches) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-12622) Automate LICENSE file validation
[ https://issues.apache.org/jira/browse/KAFKA-12622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818559#comment-17818559 ] Divij Vaidya commented on KAFKA-12622: -- *Update for release managers* Please check for correct licenses in both binaries (kafka_2.13 and kafka_2.12). > Automate LICENSE file validation > > > Key: KAFKA-12622 > URL: https://issues.apache.org/jira/browse/KAFKA-12622 > Project: Kafka > Issue Type: Task >Reporter: John Roesler >Priority: Major > Fix For: 3.8.0 > > > In https://issues.apache.org/jira/browse/KAFKA-12602, we manually constructed > a correct license file for 2.8.0. This file will certainly become wrong again > in later releases, so we need to write some kind of script to automate a > check. > It crossed my mind to automate the generation of the file, but it seems to be > an intractable problem, considering that each dependency may change licenses, > may package license files, link to them from their poms, link to them from > their repos, etc. I've also found multiple URLs listed with various > delimiters, broken links that I have to chase down, etc. > Therefore, it seems like the solution to aim for is simply: list all the jars > that we package, and print out a report of each jar that's extra or missing > vs. the ones in our `LICENSE-binary` file. > The check should be part of the release script at least, if not part of the > regular build (so we keep it up to date as dependencies change). > > Here's how I do this manually right now: > {code:java} > // build the binary artifacts > $ ./gradlewAll releaseTarGz > // unpack the binary artifact > $ tar xf core/build/distributions/kafka_2.13-X.Y.Z.tgz > $ cd kafka_2.13-X.Y.Z > // list the packaged jars > // (you can ignore the jars for our own modules, like kafka, kafka-clients, > etc.) 
> $ ls libs/ > // cross check the jars with the packaged LICENSE > // make sure all dependencies are listed with the right versions > $ cat LICENSE > // also double check all the mentioned license files are present > $ ls licenses {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
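The report described above (each packaged jar that is missing from the license file) can be sketched in Java. This is only an illustration under stated assumptions, not the script the ticket asks for: the class `LicenseBinaryCheck` and its method are hypothetical, the license file is assumed to list one `name-version` entry per line, jars whose names start with `kafka` are assumed to be the project's own modules, and `example-lib-1.0.0` is a made-up dependency.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper: reports packaged jars that the license text does not list. */
class LicenseBinaryCheck {

    /**
     * Returns, in sorted order, the base name (file name without ".jar") of every
     * jar that has no exactly matching line in the license text.
     * Assumption: jars starting with "kafka" are our own modules and are skipped.
     */
    static Set<String> missingFromLicense(List<String> jarFileNames, String licenseText) {
        // Collect trimmed license lines so "scala-library-2.12.1" cannot
        // accidentally match "scala-library-2.12.18".
        Set<String> listed = new HashSet<>();
        for (String line : licenseText.split("\n")) {
            listed.add(line.trim());
        }
        Set<String> missing = new TreeSet<>();
        for (String jar : jarFileNames) {
            String base = jar.endsWith(".jar") ? jar.substring(0, jar.length() - 4) : jar;
            if (base.startsWith("kafka")) {
                continue; // own module, not a third-party dependency
            }
            if (!listed.contains(base)) {
                missing.add(base);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // In-memory stand-ins for `ls libs/` and `cat LICENSE`.
        List<String> jars = List.of(
                "scala-library-2.12.18.jar",
                "scala-reflect-2.12.18.jar",
                "example-lib-1.0.0.jar",
                "kafka-clients-3.7.0.jar");
        String license = "example-lib-1.0.0\n";
        System.out.println("Missing from license: " + missingFromLicense(jars, license));
    }
}
```

A real check would also need the reverse direction (entries listed in the license but no longer packaged) and would read `libs/` and the license file from the unpacked artifact.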
[jira] [Created] (KAFKA-16278) Missing license for scala related dependencies
Divij Vaidya created KAFKA-16278: Summary: Missing license for scala related dependencies Key: KAFKA-16278 URL: https://issues.apache.org/jira/browse/KAFKA-16278 Project: Kafka Issue Type: Bug Affects Versions: 3.6.1, 3.7.0 Reporter: Divij Vaidya We are missing the license for following dependency in [https://github.com/apache/kafka/blob/b71999be95325f6ea54e925cbe5b426425781014/LICENSE-binary#L261] scala-collection-compat_2.12-2.10.0 is missing in license file scala-java8-compat_2.12-1.0.2 is missing in license file scala-library-2.12.18 is missing in license file scala-logging_2.12-3.9.4 is missing in license file scala-reflect-2.12.18 is missing in license file The objective of this task is to add these dependencies in the LICENSE-binary file. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14683 Migrate WorkerSinkTaskTest to Mockito (3/3) [kafka]
hgeraldino commented on PR #15316: URL: https://github.com/apache/kafka/pull/15316#issuecomment-1952966858 Thanks @gharris1727 for your review. I think I addressed most of your comments. If anyone has any other suggestions, please let me know. Ideally we should rename this file back to `WorkerSinkTaskTest` (not sure if as part of this PR or a separate one) and have checkstyle, build.gradle, etc. cleaned up -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14683 Migrate WorkerSinkTaskTest to Mockito (3/3) [kafka]
hgeraldino commented on code in PR #15316: URL: https://github.com/apache/kafka/pull/15316#discussion_r1494896646 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java: ## @@ -601,6 +690,567 @@ public void testPartialRevocationAndAssignment() { verify(sinkTask, times(4)).put(Collections.emptyList()); } +@Test +@SuppressWarnings("unchecked") +public void testPreCommitFailureAfterPartialRevocationAndAssignment() { +createTask(initialState); +expectTaskGetTopic(); + +workerTask.initialize(TASK_CONFIG); +workerTask.initializeAndStart(); +verifyInitializeTask(); + +when(consumer.assignment()) +.thenReturn(INITIAL_ASSIGNMENT, INITIAL_ASSIGNMENT) +.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2))) +.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2))) +.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2))) +.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3))) +.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3))) +.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3))); + +INITIAL_ASSIGNMENT.forEach(tp -> when(consumer.position(tp)).thenReturn(FIRST_OFFSET)); +when(consumer.position(TOPIC_PARTITION3)).thenReturn(FIRST_OFFSET); + +// First poll; assignment is [TP1, TP2] +when(consumer.poll(any(Duration.class))) +.thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> { + rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT); +return ConsumerRecords.empty(); +}) +// Second poll; a single record is delivered from TP1 +.thenAnswer(expectConsumerPoll(1)) +// Third poll; assignment changes to [TP2] +.thenAnswer(invocation -> { + rebalanceListener.getValue().onPartitionsRevoked(Collections.singleton(TOPIC_PARTITION)); + rebalanceListener.getValue().onPartitionsAssigned(Collections.emptySet()); +return ConsumerRecords.empty(); +}) +// Fourth poll; assignment changes to [TP2, TP3] +.thenAnswer(invocation -> { + 
rebalanceListener.getValue().onPartitionsRevoked(Collections.emptySet()); + rebalanceListener.getValue().onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION3)); +return ConsumerRecords.empty(); +}) +// Fifth poll; an offset commit takes place +.thenAnswer(expectConsumerPoll(0)); + +expectConversionAndTransformation(null, new RecordHeaders()); + +// First iteration--first call to poll, first consumer assignment +workerTask.iteration(); +// Second iteration--second call to poll, delivery of one record +workerTask.iteration(); +// Third iteration--third call to poll, partial consumer revocation +final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); +offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1)); +when(sinkTask.preCommit(offsets)).thenReturn(offsets); +doAnswer(invocation -> null).when(consumer).commitSync(offsets); + +workerTask.iteration(); +verify(sinkTask).close(Collections.singleton(TOPIC_PARTITION)); +verify(sinkTask, times(2)).put(Collections.emptyList()); + +// Fourth iteration--fourth call to poll, partial consumer assignment +workerTask.iteration(); + +verify(sinkTask).open(Collections.singleton(TOPIC_PARTITION3)); + +final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>(); +workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET)); +workerCurrentOffsets.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET)); +when(sinkTask.preCommit(workerCurrentOffsets)).thenThrow(new ConnectException("Failed to flush")); + +// Fifth iteration--task-requested offset commit with failure in SinkTask::preCommit +sinkTaskContext.getValue().requestCommit(); +workerTask.iteration(); + +verify(consumer).seek(TOPIC_PARTITION2, FIRST_OFFSET); +verify(consumer).seek(TOPIC_PARTITION3, FIRST_OFFSET); +} + +@Test +public void testWakeupInCommitSyncCausesRetry() { +createTask(initialState); + +workerTask.initialize(TASK_CONFIG); +time.sleep(3L); +workerTask.initializeAndStart(); +time.sleep(3L); +verifyInitializeTask(); + +expectTaskGetTopic(); 
+expectPollInitialAssignment() +.thenAnswer(expectConsumerPoll(1)) +.thenAnswer(invocation -> { + rebalanceListener
Re: [PR] KAFKA-14683 Migrate WorkerSinkTaskTest to Mockito (3/3) [kafka]
hgeraldino commented on code in PR #15316: URL: https://github.com/apache/kafka/pull/15316#discussion_r1494861815 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java: ## @@ -447,6 +457,85 @@ public void testPollRedelivery() { assertSinkMetricValue("offset-commit-completion-total", 1.0); } +@Test +@SuppressWarnings("unchecked") +public void testPollRedeliveryWithConsumerRebalance() { +createTask(initialState); +expectTaskGetTopic(); + +workerTask.initialize(TASK_CONFIG); +workerTask.initializeAndStart(); +verifyInitializeTask(); + +Set<TopicPartition> newAssignment = new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3)); + +when(consumer.assignment()) +.thenReturn(INITIAL_ASSIGNMENT, INITIAL_ASSIGNMENT, INITIAL_ASSIGNMENT) +.thenReturn(newAssignment, newAssignment, newAssignment) +.thenReturn(Collections.singleton(TOPIC_PARTITION3), +Collections.singleton(TOPIC_PARTITION3), +Collections.singleton(TOPIC_PARTITION3)); + +INITIAL_ASSIGNMENT.forEach(tp -> when(consumer.position(tp)).thenReturn(FIRST_OFFSET)); +when(consumer.position(TOPIC_PARTITION3)).thenReturn(FIRST_OFFSET); + +when(consumer.poll(any(Duration.class))) +.thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> { + rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT); +return ConsumerRecords.empty(); +}) +.thenAnswer(expectConsumerPoll(1)) +// Empty consumer poll (all partitions are paused) with rebalance; one new partition is assigned +.thenAnswer(invocation -> { + rebalanceListener.getValue().onPartitionsRevoked(Collections.emptySet()); + rebalanceListener.getValue().onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION3)); +return ConsumerRecords.empty(); +}) +.thenAnswer(expectConsumerPoll(0)) +// Non-empty consumer poll; all initially-assigned partitions are revoked in rebalance, and new partitions are allowed to resume +.thenAnswer(invocation -> { +ConsumerRecord<byte[], byte[]> newRecord = new ConsumerRecord<>(TOPIC, PARTITION3, FIRST_OFFSET, RAW_KEY, 
RAW_VALUE); + + rebalanceListener.getValue().onPartitionsRevoked(INITIAL_ASSIGNMENT); + rebalanceListener.getValue().onPartitionsAssigned(Collections.emptyList()); +return new ConsumerRecords<>(Collections.singletonMap(TOPIC_PARTITION3, Collections.singletonList(newRecord))); Review Comment: You're right, the test was a bit off mainly because the first call to `task.put(..)` shouldn't have thrown an exception. Here's the sequence now (which matches the original WorkerSinkTaskTest): 1. Iteration#1: partitions on INITIAL_ASSIGNMENT are assigned 2. Iteration#2: `task.put(...)` throws, partitions are paused 3. Iteration#3: P3 is assigned, `task.put(...)` throws (task is already paused so it's a noop) 4. Iteration#4: `task.put(...)` throws, noop 5. Iteration#5: initial assignment is revoked (pending messages for P1 are removed); `consumer.poll(...)` returns a record for P3, which is successfully processed. I've added the corresponding verifications after each step. Hope it makes sense now -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16243: Make sure that we do not exceed max poll interval inside poll [kafka]
lucasbru commented on code in PR #15372: URL: https://github.com/apache/kafka/pull/15372#discussion_r1494847054 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -265,6 +265,32 @@ class PlaintextConsumerTest extends BaseConsumerTest { ensureNoRebalance(consumer, listener) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testMaxPollIntervalMsShorterThanPollTimeout(quorum: String, groupProtocol: String): Unit = { + this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000.toString) + this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString) + +val consumer = createConsumer() +val listener = new TestConsumerReassignmentListener +consumer.subscribe(List(topic).asJava, listener) + +// rebalance to get the initial assignment +awaitRebalance(consumer, listener) + +val initialAssignedCalls = listener.callsToAssigned + +consumer.poll(Duration.ofMillis(2000)) + +// Give enough time to rejoin +consumer.poll(Duration.ofMillis(500)) +consumer.poll(Duration.ofMillis(500)) + +// Check that we did not rejoin Review Comment: Done ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -265,6 +265,32 @@ class PlaintextConsumerTest extends BaseConsumerTest { ensureNoRebalance(consumer, listener) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testMaxPollIntervalMsShorterThanPollTimeout(quorum: String, groupProtocol: String): Unit = { + this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000.toString) + this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString) + +val consumer = createConsumer() +val listener = new TestConsumerReassignmentListener 
+consumer.subscribe(List(topic).asJava, listener) + +// rebalance to get the initial assignment +awaitRebalance(consumer, listener) + +val initialAssignedCalls = listener.callsToAssigned + +consumer.poll(Duration.ofMillis(2000)) + +// Give enough time to rejoin Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15841) Add Support for Topic-Level Partitioning in Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-15841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818535#comment-17818535 ] Henrique Mota commented on KAFKA-15841: --- That conditional basically prevented us from achieving what we wanted: !image-2024-02-19-13-48-55-875.png! The modification below would help us achieve our goal: for (Map<String, String> taskProps : taskConfigs) { // Ensure we don't modify the connector's copy of the config Map<String, String> taskConfig = new HashMap<>(taskProps); taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName); if (connOriginals.containsKey(SinkTask.TOPICS_CONFIG) {*}{color:#de350b}&& !taskConfig.containsKey(SinkTask.TOPICS_CONFIG){color}{*}) { taskConfig.put(SinkTask.TOPICS_CONFIG, connOriginals.get(SinkTask.TOPICS_CONFIG)); } if (connOriginals.containsKey(SinkTask.TOPICS_REGEX_CONFIG) {*}{color:#de350b}&& !taskConfig.containsKey(SinkTask.TOPICS_REGEX_CONFIG){color}{*}) { taskConfig.put(SinkTask.TOPICS_REGEX_CONFIG, connOriginals.get(SinkTask.TOPICS_REGEX_CONFIG)); } result.add(taskConfig); } > Add Support for Topic-Level Partitioning in Kafka Connect > - > > Key: KAFKA-15841 > URL: https://issues.apache.org/jira/browse/KAFKA-15841 > Project: Kafka > Issue Type: Improvement > Components: connect >Reporter: Henrique Mota >Priority: Trivial > Attachments: image-2024-02-19-13-48-55-875.png > > > In our organization, we utilize JDBC sink connectors to consume data from > various topics, where each topic is dedicated to a specific tenant with a > single partition. Recently, we developed a custom sink based on the standard > JDBC sink, enabling us to pause consumption of a topic when encountering > problematic records. > However, we face limitations within Kafka Connect, as it doesn't allow for > appropriate partitioning of topics among workers. We attempted a workaround > by breaking down the topics list within the 'topics' parameter. 
> Unfortunately, Kafka Connect overrides this parameter after invoking the > {{taskConfigs(int maxTasks)}} method from the > {{org.apache.kafka.connect.connector.Connector}} class. > We request the addition of support in Kafka Connect to enable the > partitioning of topics among workers without requiring a fork. This > enhancement would facilitate better load distribution and allow for more > flexible configurations, particularly in scenarios where topics are dedicated > to different tenants. -- This message was sent by Atlassian Jira (v8.20.10#820010)
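The behaviour requested in the comment above (keep a task's own `topics` or `topics.regex` value and fall back to the connector's value only when the task did not set one) can be sketched without any Kafka dependency. This is a minimal illustration, not the Connect runtime code: the class `TaskTopicsMerge` and its `merge` method are hypothetical, the two string constants stand in for `SinkTask.TOPICS_CONFIG` and `SinkTask.TOPICS_REGEX_CONFIG`, and the topic names are made up.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the proposed "do not override task-level topics" merge. */
class TaskTopicsMerge {
    // Stand-ins for SinkTask.TOPICS_CONFIG and SinkTask.TOPICS_REGEX_CONFIG.
    static final String TOPICS = "topics";
    static final String TOPICS_REGEX = "topics.regex";

    static List<Map<String, String>> merge(Map<String, String> connOriginals,
                                           List<Map<String, String>> taskConfigs) {
        List<Map<String, String>> result = new ArrayList<>();
        for (Map<String, String> taskProps : taskConfigs) {
            // Copy so the connector's own map is never modified.
            Map<String, String> taskConfig = new HashMap<>(taskProps);
            // Fall back to the connector-level value only if the task has none.
            if (connOriginals.containsKey(TOPICS) && !taskConfig.containsKey(TOPICS)) {
                taskConfig.put(TOPICS, connOriginals.get(TOPICS));
            }
            if (connOriginals.containsKey(TOPICS_REGEX) && !taskConfig.containsKey(TOPICS_REGEX)) {
                taskConfig.put(TOPICS_REGEX, connOriginals.get(TOPICS_REGEX));
            }
            result.add(taskConfig);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> connector = Map.of(TOPICS, "tenant-a,tenant-b");
        // First task narrows its topics; second task leaves them unset.
        List<Map<String, String>> tasks = List.of(Map.of(TOPICS, "tenant-a"), Map.of());
        System.out.println(merge(connector, tasks));
    }
}
```

With this rule the first task keeps `tenant-a` while the second inherits the connector's full list; whether the runtime should allow that is exactly the question the ticket raises.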
[jira] [Updated] (KAFKA-15841) Add Support for Topic-Level Partitioning in Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-15841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Henrique Mota updated KAFKA-15841: -- Attachment: image-2024-02-19-13-48-55-875.png > Add Support for Topic-Level Partitioning in Kafka Connect > - > > Key: KAFKA-15841 > URL: https://issues.apache.org/jira/browse/KAFKA-15841 > Project: Kafka > Issue Type: Improvement > Components: connect >Reporter: Henrique Mota >Priority: Trivial > Attachments: image-2024-02-19-13-48-55-875.png > > > In our organization, we utilize JDBC sink connectors to consume data from > various topics, where each topic is dedicated to a specific tenant with a > single partition. Recently, we developed a custom sink based on the standard > JDBC sink, enabling us to pause consumption of a topic when encountering > problematic records. > However, we face limitations within Kafka Connect, as it doesn't allow for > appropriate partitioning of topics among workers. We attempted a workaround > by breaking down the topics list within the 'topics' parameter. > Unfortunately, Kafka Connect overrides this parameter after invoking the > {{taskConfigs(int maxTasks)}} method from the > {{org.apache.kafka.connect.connector.Connector}} class. > We request the addition of support in Kafka Connect to enable the > partitioning of topics among workers without requiring a fork. This > enhancement would facilitate better load distribution and allow for more > flexible configurations, particularly in scenarios where topics are dedicated > to different tenants. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14683 Migrate WorkerSinkTaskTest to Mockito (3/3) [kafka]
hgeraldino commented on code in PR #15316: URL: https://github.com/apache/kafka/pull/15316#discussion_r1494825277 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java: ## @@ -601,6 +690,567 @@ public void testPartialRevocationAndAssignment() { verify(sinkTask, times(4)).put(Collections.emptyList()); } +@Test +@SuppressWarnings("unchecked") +public void testPreCommitFailureAfterPartialRevocationAndAssignment() { +createTask(initialState); +expectTaskGetTopic(); + +workerTask.initialize(TASK_CONFIG); +workerTask.initializeAndStart(); +verifyInitializeTask(); + +when(consumer.assignment()) +.thenReturn(INITIAL_ASSIGNMENT, INITIAL_ASSIGNMENT) +.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2))) +.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2))) +.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2))) +.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3))) +.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3))) +.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3))); + +INITIAL_ASSIGNMENT.forEach(tp -> when(consumer.position(tp)).thenReturn(FIRST_OFFSET)); +when(consumer.position(TOPIC_PARTITION3)).thenReturn(FIRST_OFFSET); + +// First poll; assignment is [TP1, TP2] +when(consumer.poll(any(Duration.class))) +.thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> { + rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT); +return ConsumerRecords.empty(); +}) +// Second poll; a single record is delivered from TP1 +.thenAnswer(expectConsumerPoll(1)) +// Third poll; assignment changes to [TP2] +.thenAnswer(invocation -> { + rebalanceListener.getValue().onPartitionsRevoked(Collections.singleton(TOPIC_PARTITION)); + rebalanceListener.getValue().onPartitionsAssigned(Collections.emptySet()); +return ConsumerRecords.empty(); +}) +// Fourth poll; assignment changes to [TP2, TP3] +.thenAnswer(invocation -> { + 
rebalanceListener.getValue().onPartitionsRevoked(Collections.emptySet()); + rebalanceListener.getValue().onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION3)); +return ConsumerRecords.empty(); +}) +// Fifth poll; an offset commit takes place +.thenAnswer(expectConsumerPoll(0)); + +expectConversionAndTransformation(null, new RecordHeaders()); + +// First iteration--first call to poll, first consumer assignment +workerTask.iteration(); +// Second iteration--second call to poll, delivery of one record +workerTask.iteration(); +// Third iteration--third call to poll, partial consumer revocation +final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); +offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1)); +when(sinkTask.preCommit(offsets)).thenReturn(offsets); +doAnswer(invocation -> null).when(consumer).commitSync(offsets); + +workerTask.iteration(); +verify(sinkTask).close(Collections.singleton(TOPIC_PARTITION)); +verify(sinkTask, times(2)).put(Collections.emptyList()); + +// Fourth iteration--fourth call to poll, partial consumer assignment +workerTask.iteration(); + +verify(sinkTask).open(Collections.singleton(TOPIC_PARTITION3)); + +final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>(); +workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET)); +workerCurrentOffsets.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET)); +when(sinkTask.preCommit(workerCurrentOffsets)).thenThrow(new ConnectException("Failed to flush")); + +// Fifth iteration--task-requested offset commit with failure in SinkTask::preCommit +sinkTaskContext.getValue().requestCommit(); +workerTask.iteration(); + +verify(consumer).seek(TOPIC_PARTITION2, FIRST_OFFSET); +verify(consumer).seek(TOPIC_PARTITION3, FIRST_OFFSET); +} + +@Test +public void testWakeupInCommitSyncCausesRetry() { +createTask(initialState); + +workerTask.initialize(TASK_CONFIG); +time.sleep(3L); +workerTask.initializeAndStart(); +time.sleep(3L); +verifyInitializeTask(); + +expectTaskGetTopic(); 
+expectPollInitialAssignment() +.thenAnswer(expectConsumerPoll(1)) +.thenAnswer(invocation -> { + rebalanceListener
[jira] [Comment Edited] (KAFKA-15841) Add Support for Topic-Level Partitioning in Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-15841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818530#comment-17818530 ] Henrique Mota edited comment on KAFKA-15841 at 2/19/24 4:41 PM: Hello [~gharris1727]! My use case is as follows: 1: I have multiple clients in each environment, with the largest having 90 clients (databases). 2: Each client has a database in one application, and we replicate approximately 100 tables from this database to another application's database, with this other database being multi-tenant. 3: Previously, we had one topic per table, with some partitions for each topic. So, we needed to ensure that if any client had inconsistent data, we would pause the consumption for that client and continue processing data for other clients. Thus, we separated a topic with a partition for each table and client. We then created an extension of the JDBC Sink that can pause a problematic topic, and after some time attempt to resume consumption of the paused topic (we decided to use one topic per client instead of one partition per client to facilitate identification). 4: We have a JDBC Sink for each table. 5: We noticed that if we add more than one worker, in this scenario, all topics were assigned to worker 0, and the others were left waiting. 6: We tried to change the 'topics' property in the configurations using the 'taskConfigs(int maxTasks)' method, but Kafka Connect ignores this property when it is returned by 'taskConfigs(int maxTasks)'. was (Author: henriquemota): Hello [~gharris1727]! My use case is as follows: 1: I have multiple clients in each environment, with the largest having 90 clients (databases). 2: Each client has a database in one application, and we replicate approximately 100 tables from this database to another application's database, with this other database being multi-tenant. 3: Previously, we had one topic per table, with some partitions for each topic. 
So, we needed to ensure that if any client had inconsistent data, we would pause the consumption for that client and continue processing data for other clients. Thus, we separated a topic with a partition for each table and client. We then created an extension of the JDBC Sink that can pause a problematic topic, and after some time attempt to resume consumption of the paused topic (we decided to use one topic per client instead of one partition per client to facilitate identification). 4: We have a JDBC Sink for each table. 5: We noticed that if we add more than one worker, in this scenario, all topics were assigned to worker 0, and the others were left waiting. 6: We tried to change the 'topics' property in the configurations using the 'taskConfigs(int maxTasks)' method, but Kafka Connect ignores this property when it is returned by 'taskConfigs(int maxTasks)'. > Add Support for Topic-Level Partitioning in Kafka Connect > - > > Key: KAFKA-15841 > URL: https://issues.apache.org/jira/browse/KAFKA-15841 > Project: Kafka > Issue Type: Improvement > Components: connect >Reporter: Henrique Mota >Priority: Trivial > > In our organization, we utilize JDBC sink connectors to consume data from > various topics, where each topic is dedicated to a specific tenant with a > single partition. Recently, we developed a custom sink based on the standard > JDBC sink, enabling us to pause consumption of a topic when encountering > problematic records. > However, we face limitations within Kafka Connect, as it doesn't allow for > appropriate partitioning of topics among workers. We attempted a workaround > by breaking down the topics list within the 'topics' parameter. > Unfortunately, Kafka Connect overrides this parameter after invoking the > {{taskConfigs(int maxTasks)}} method from the > {{org.apache.kafka.connect.connector.Connector}} class. > We request the addition of support in Kafka Connect to enable the > partitioning of topics among workers without requiring a fork. 
This > enhancement would facilitate better load distribution and allow for more > flexible configurations, particularly in scenarios where topics are dedicated > to different tenants. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15841) Add Support for Topic-Level Partitioning in Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-15841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818530#comment-17818530 ] Henrique Mota commented on KAFKA-15841: --- Hello [~gharris1727]! My use case is as follows: 1: I have multiple clients in each environment, with the largest having 90 clients (databases). 2: Each client has a database in one application, and we replicate approximately 100 tables from this database to another application's database, with this other database being multi-tenant. 3: Previously, we had one topic per table, with some partitions for each topic. So, we needed to ensure that if any client had inconsistent data, we would pause the consumption for that client and continue processing data for other clients. Thus, we separated a topic with a partition for each table and client. We then created an extension of the JDBC Sink that can pause a problematic topic, and after some time attempt to resume consumption of the paused topic (we decided to use one topic per client instead of one partition per client to facilitate identification). 4: We have a JDBC Sink for each table. 5: We noticed that if we add more than one worker, in this scenario, all topics were assigned to worker 0, and the others were left waiting. 6: We tried to change the 'topics' property in the configurations using the 'taskConfigs(int maxTasks)' method, but Kafka Connect ignores this property when it is returned by 'taskConfigs(int maxTasks)'. > Add Support for Topic-Level Partitioning in Kafka Connect > - > > Key: KAFKA-15841 > URL: https://issues.apache.org/jira/browse/KAFKA-15841 > Project: Kafka > Issue Type: Improvement > Components: connect >Reporter: Henrique Mota >Priority: Trivial > > In our organization, we utilize JDBC sink connectors to consume data from > various topics, where each topic is dedicated to a specific tenant with a > single partition. 
Recently, we developed a custom sink based on the standard > JDBC sink, enabling us to pause consumption of a topic when encountering > problematic records. > However, we face limitations within Kafka Connect, as it doesn't allow for > appropriate partitioning of topics among workers. We attempted a workaround > by breaking down the topics list within the 'topics' parameter. > Unfortunately, Kafka Connect overrides this parameter after invoking the > {{taskConfigs(int maxTasks)}} method from the > {{org.apache.kafka.connect.connector.Connector}} class. > We request the addition of support in Kafka Connect to enable the > partitioning of topics among workers without requiring a fork. This > enhancement would facilitate better load distribution and allow for more > flexible configurations, particularly in scenarios where topics are dedicated > to different tenants. -- This message was sent by Atlassian Jira (v8.20.10#820010)
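For readers following the KAFKA-15841 thread, here is a minimal sketch of the kind of topic grouping the reporter describes attempting inside `taskConfigs(int maxTasks)`. It is plain Java with no Kafka dependency. The class `TopicGrouping` and the method `groupTopics` are hypothetical names introduced only for illustration. Per the report above, Kafka Connect overrides the `topics` property returned from `taskConfigs`, so this shows the intended split, not a working fix.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka Connect.
final class TopicGrouping {

    // Distributes topics round-robin into at most maxTasks groups,
    // so that each task would own a disjoint subset of the topics.
    static List<List<String>> groupTopics(List<String> topics, int maxTasks) {
        if (maxTasks <= 0) {
            throw new IllegalArgumentException("maxTasks must be positive");
        }
        // Never create more groups than there are topics.
        int groups = Math.min(maxTasks, topics.size());
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < groups; i++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < topics.size(); i++) {
            result.get(i % groups).add(topics.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        // Topic names are made up for the example.
        System.out.println(groupTopics(List.of("tenant-a", "tenant-b", "tenant-c"), 2));
    }
}
```

In a connector, each inner list would presumably be joined with commas and written into one task's configuration map; whether the runtime honors that value is exactly what the issue is asking about.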
Re: [PR] KAFKA-14683 Migrate WorkerSinkTaskTest to Mockito (3/3) [kafka]
hgeraldino commented on code in PR #15316: URL: https://github.com/apache/kafka/pull/15316#discussion_r1494816048 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java: ## @@ -601,6 +690,567 @@ public void testPartialRevocationAndAssignment() { verify(sinkTask, times(4)).put(Collections.emptyList()); } +@Test +@SuppressWarnings("unchecked") +public void testPreCommitFailureAfterPartialRevocationAndAssignment() { +createTask(initialState); +expectTaskGetTopic(); + +workerTask.initialize(TASK_CONFIG); +workerTask.initializeAndStart(); +verifyInitializeTask(); + +when(consumer.assignment()) +.thenReturn(INITIAL_ASSIGNMENT, INITIAL_ASSIGNMENT) +.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2))) +.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2))) +.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2))) +.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3))) +.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3))) +.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3))); + +INITIAL_ASSIGNMENT.forEach(tp -> when(consumer.position(tp)).thenReturn(FIRST_OFFSET)); +when(consumer.position(TOPIC_PARTITION3)).thenReturn(FIRST_OFFSET); + +// First poll; assignment is [TP1, TP2] +when(consumer.poll(any(Duration.class))) +.thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> { + rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT); +return ConsumerRecords.empty(); +}) +// Second poll; a single record is delivered from TP1 +.thenAnswer(expectConsumerPoll(1)) +// Third poll; assignment changes to [TP2] +.thenAnswer(invocation -> { + rebalanceListener.getValue().onPartitionsRevoked(Collections.singleton(TOPIC_PARTITION)); + rebalanceListener.getValue().onPartitionsAssigned(Collections.emptySet()); +return ConsumerRecords.empty(); +}) +// Fourth poll; assignment changes to [TP2, TP3] +.thenAnswer(invocation -> { +
rebalanceListener.getValue().onPartitionsRevoked(Collections.emptySet()); + rebalanceListener.getValue().onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION3)); +return ConsumerRecords.empty(); +}) +// Fifth poll; an offset commit takes place +.thenAnswer(expectConsumerPoll(0)); + +expectConversionAndTransformation(null, new RecordHeaders()); + +// First iteration--first call to poll, first consumer assignment +workerTask.iteration(); +// Second iteration--second call to poll, delivery of one record +workerTask.iteration(); +// Third iteration--third call to poll, partial consumer revocation +final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); +offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1)); +when(sinkTask.preCommit(offsets)).thenReturn(offsets); +doAnswer(invocation -> null).when(consumer).commitSync(offsets); + +workerTask.iteration(); +verify(sinkTask).close(Collections.singleton(TOPIC_PARTITION)); +verify(sinkTask, times(2)).put(Collections.emptyList()); + +// Fourth iteration--fourth call to poll, partial consumer assignment +workerTask.iteration(); + +verify(sinkTask).open(Collections.singleton(TOPIC_PARTITION3)); + +final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>(); +workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET)); +workerCurrentOffsets.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET)); +when(sinkTask.preCommit(workerCurrentOffsets)).thenThrow(new ConnectException("Failed to flush")); + +// Fifth iteration--task-requested offset commit with failure in SinkTask::preCommit +sinkTaskContext.getValue().requestCommit(); +workerTask.iteration(); + +verify(consumer).seek(TOPIC_PARTITION2, FIRST_OFFSET); +verify(consumer).seek(TOPIC_PARTITION3, FIRST_OFFSET); +} + +@Test +public void testWakeupInCommitSyncCausesRetry() { +createTask(initialState); + +workerTask.initialize(TASK_CONFIG); +time.sleep(3L); +workerTask.initializeAndStart(); +time.sleep(3L); +verifyInitializeTask(); + +expectTaskGetTopic(); 
+expectPollInitialAssignment() +.thenAnswer(expectConsumerPoll(1)) +.thenAnswer(invocation -> { + rebalanceListener
Re: [PR] KAFKA-14683 Migrate WorkerSinkTaskTest to Mockito (3/3) [kafka]
hgeraldino commented on code in PR #15316: URL: https://github.com/apache/kafka/pull/15316#discussion_r1494806542 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java: ## @@ -601,6 +690,567 @@ public void testPartialRevocationAndAssignment() { verify(sinkTask, times(4)).put(Collections.emptyList()); } +@Test +@SuppressWarnings("unchecked") +public void testPreCommitFailureAfterPartialRevocationAndAssignment() { +createTask(initialState); +expectTaskGetTopic(); + +workerTask.initialize(TASK_CONFIG); +workerTask.initializeAndStart(); +verifyInitializeTask(); + +when(consumer.assignment()) +.thenReturn(INITIAL_ASSIGNMENT, INITIAL_ASSIGNMENT) +.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2))) +.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2))) +.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2))) +.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3))) +.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3))) +.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3))); + +INITIAL_ASSIGNMENT.forEach(tp -> when(consumer.position(tp)).thenReturn(FIRST_OFFSET)); +when(consumer.position(TOPIC_PARTITION3)).thenReturn(FIRST_OFFSET); + +// First poll; assignment is [TP1, TP2] +when(consumer.poll(any(Duration.class))) +.thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> { + rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT); +return ConsumerRecords.empty(); +}) +// Second poll; a single record is delivered from TP1 +.thenAnswer(expectConsumerPoll(1)) +// Third poll; assignment changes to [TP2] +.thenAnswer(invocation -> { + rebalanceListener.getValue().onPartitionsRevoked(Collections.singleton(TOPIC_PARTITION)); + rebalanceListener.getValue().onPartitionsAssigned(Collections.emptySet()); +return ConsumerRecords.empty(); +}) +// Fourth poll; assignment changes to [TP2, TP3] +.thenAnswer(invocation -> { +
rebalanceListener.getValue().onPartitionsRevoked(Collections.emptySet()); + rebalanceListener.getValue().onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION3)); +return ConsumerRecords.empty(); +}) +// Fifth poll; an offset commit takes place +.thenAnswer(expectConsumerPoll(0)); + +expectConversionAndTransformation(null, new RecordHeaders()); + +// First iteration--first call to poll, first consumer assignment +workerTask.iteration(); +// Second iteration--second call to poll, delivery of one record +workerTask.iteration(); +// Third iteration--third call to poll, partial consumer revocation +final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); +offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1)); +when(sinkTask.preCommit(offsets)).thenReturn(offsets); +doAnswer(invocation -> null).when(consumer).commitSync(offsets); Review Comment: good catch... updated -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-16266) Introduce TransactionLastUpdateTimeMs tagged field to DescribeTransactionsResponse
[ https://issues.apache.org/jira/browse/KAFKA-16266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ahmed Sobeh reassigned KAFKA-16266: --- Assignee: Ahmed Sobeh > Introduce TransactionLastUpdateTimeMs tagged field to > DescribeTransactionsResponse > --- > > Key: KAFKA-16266 > URL: https://issues.apache.org/jira/browse/KAFKA-16266 > Project: Kafka > Issue Type: Sub-task >Reporter: Yang Yu >Assignee: Ahmed Sobeh >Priority: Major > > Introduce TransactionLastUpdateTimeMs tagged field to > DescribeTransactionsResponse. Make broker side changes to send this bit of > information. Also, make changes to `kafka-transactions.sh --describe` tooling > to display this new piece of information to the output. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Clean up core metrics, migration, network, raft, security, serializer, tools, utils, and zookeeper modules [kafka]
jlprat merged PR #15279: URL: https://github.com/apache/kafka/pull/15279 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Clean up core metrics, migration, network, raft, security, serializer, tools, utils, and zookeeper modules [kafka]
jlprat commented on PR #15279: URL: https://github.com/apache/kafka/pull/15279#issuecomment-1952754579 Checked that the failing tests pass locally; the CI failures are flaky tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16251: Fix for not sending heartbeat while fenced [kafka]
lianetm commented on PR #15392: URL: https://github.com/apache/kafka/pull/15392#issuecomment-1952684051 Also @mjsax this is a very small one, closely related to the protocol, might be interesting. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16251: Fix for not sending heartbeat while fenced [kafka]
lianetm commented on PR #15392: URL: https://github.com/apache/kafka/pull/15392#issuecomment-1952680928 Hey @lucasbru, could you take a look at this small fix? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15349 ducker-ak should fail fast when gradlew systemTestLibs fails [kafka]
ahmedsobeh commented on PR #15390: URL: https://github.com/apache/kafka/pull/15390#issuecomment-1952625573 closing as the branch was incorrectly created -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-15349 ducker-ak should fail fast when gradlew systemTestLibs fails [kafka]
ahmedsobeh opened a new pull request, #15391: URL: https://github.com/apache/kafka/pull/15391 In this modification, if ./gradlew systemTestLibs fails, the script will output an error message and terminate execution using the die function. This ensures that the script fails fast and prompts the user to address the error before continuing. *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15349 ducker-ak should fail fast when gradlew systemTestLibs fails [kafka]
ahmedsobeh closed pull request #15390: KAFKA-15349 ducker-ak should fail fast when gradlew systemTestLibs fails URL: https://github.com/apache/kafka/pull/15390 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-15349 ducker-ak should fail fast when gradlew systemTestLibs fails [kafka]
ahmedsobeh opened a new pull request, #15390: URL: https://github.com/apache/kafka/pull/15390 In this modification, if ./gradlew systemTestLibs fails, the script will output an error message and terminate execution using the die function. This ensures that the script fails fast and prompts the user to address the error before continuing. *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16243: Make sure that we do not exceed max poll interval inside poll [kafka]
cadonna commented on code in PR #15372: URL: https://github.com/apache/kafka/pull/15372#discussion_r1494650399 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -265,6 +265,32 @@ class PlaintextConsumerTest extends BaseConsumerTest { ensureNoRebalance(consumer, listener) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testMaxPollIntervalMsShorterThanPollTimeout(quorum: String, groupProtocol: String): Unit = { + this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000.toString) + this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString) + +val consumer = createConsumer() +val listener = new TestConsumerReassignmentListener +consumer.subscribe(List(topic).asJava, listener) + +// rebalance to get the initial assignment +awaitRebalance(consumer, listener) + +val initialAssignedCalls = listener.callsToAssigned + +consumer.poll(Duration.ofMillis(2000)) + +// Give enough time to rejoin +consumer.poll(Duration.ofMillis(500)) +consumer.poll(Duration.ofMillis(500)) + +// Check that we did not rejoin Review Comment: Do we need this comment? I think it would be better to delete it and to rename `initialAssignedCalls` to something more meaningful like `callsToAssignedAfterFirstRebalance` or `callsToAssignedBeforePolls`. 
## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -265,6 +265,32 @@ class PlaintextConsumerTest extends BaseConsumerTest { ensureNoRebalance(consumer, listener) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testMaxPollIntervalMsShorterThanPollTimeout(quorum: String, groupProtocol: String): Unit = { + this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000.toString) + this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString) + +val consumer = createConsumer() +val listener = new TestConsumerReassignmentListener +consumer.subscribe(List(topic).asJava, listener) + +// rebalance to get the initial assignment +awaitRebalance(consumer, listener) + +val initialAssignedCalls = listener.callsToAssigned + +consumer.poll(Duration.ofMillis(2000)) + +// Give enough time to rejoin Review Comment: nit: This comment is a bit confusing. What is it supposed to clarify? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16009: Fix PlaintextConsumerTest. testMaxPollIntervalMsDelayInRevocation [kafka]
lucasbru merged PR #15383: URL: https://github.com/apache/kafka/pull/15383 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-15349) ducker-ak should fail fast when gradlew systemTestLibs fails
[ https://issues.apache.org/jira/browse/KAFKA-15349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ahmed Sobeh reassigned KAFKA-15349: --- Assignee: Ahmed Sobeh > ducker-ak should fail fast when gradlew systemTestLibs fails > > > Key: KAFKA-15349 > URL: https://issues.apache.org/jira/browse/KAFKA-15349 > Project: Kafka > Issue Type: Improvement > Components: system tests >Reporter: Greg Harris >Assignee: Ahmed Sobeh >Priority: Minor > Labels: newbie++ > > If you introduce a flaw into the gradle build which causes the systemTestLibs > to fail, such as a circular dependency, then the ducker_test function > continues to run tests which are invalid. > Rather than proceeding to run the tests, the script should fail fast and make > the user address the error before continuing. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16261) MembershipManagerImpl.updateSubscription fails if already empty subscription
[ https://issues.apache.org/jira/browse/KAFKA-16261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16261: --- Labels: consumer-threading-refactor kip-848-client-support (was: consumer-threading-refactor) > MembershipManagerImpl.updateSubscription fails if already empty subscription > > > Key: KAFKA-16261 > URL: https://issues.apache.org/jira/browse/KAFKA-16261 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Andrew Schofield >Assignee: Lianet Magrans >Priority: Minor > Labels: consumer-threading-refactor, kip-848-client-support > > The internal SubscriptionState object keeps track of whether the assignment > is user-assigned, or auto-assigned. If there are no assigned partitions, the > assignment resets to NONE. If you call SubscriptionState.assignFromSubscribed > in this state it fails. > The easiest thing is perhaps to check > SubscriptionState.hasAutoAssignedPartitions() to make sure that > assignFromSubscribed is going to be permitted. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16258) Stale member should trigger onPartitionsLost when leaving group
[ https://issues.apache.org/jira/browse/KAFKA-16258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16258: --- Labels: client-transitions-issues kip-848-client-support (was: kip-848-client-support) > Stale member should trigger onPartitionsLost when leaving group > --- > > Key: KAFKA-16258 > URL: https://issues.apache.org/jira/browse/KAFKA-16258 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: client-transitions-issues, kip-848-client-support > Fix For: 3.8.0 > > > When the poll timer expires, the new consumer proactively leaves the group > and clears its assignments, but it should also invoke the onPartitionsLost > callback. The legacy coordinator does the following sequence on poll timer > expiration: send leave group request > ([here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1517]), > invoke onPartitionsLost, and when it completes it clears the assignment > (onJoinPrepare > [here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L779]). > This issue is most probably what is causing the failures in the integration > tests that fail expecting callbacks when the poll interval expires (like > https://issues.apache.org/jira/browse/KAFKA-16008) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16258) Stale member should trigger onPartitionsLost when leaving group
[ https://issues.apache.org/jira/browse/KAFKA-16258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans reassigned KAFKA-16258: -- Assignee: Lianet Magrans > Stale member should trigger onPartitionsLost when leaving group > --- > > Key: KAFKA-16258 > URL: https://issues.apache.org/jira/browse/KAFKA-16258 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > When the poll timer expires, the new consumer proactively leaves the group > and clears its assignments, but it should also invoke the onPartitionsLost > callback. The legacy coordinator does the following sequence on poll timer > expiration: send leave group request > ([here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1517]), > invoke onPartitionsLost, and when it completes it clears the assignment > (onJoinPrepare > [here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L779]). > This issue is most probably what is causing the failures in the integration > tests that fail expecting callbacks when the poll interval expires (like > https://issues.apache.org/jira/browse/KAFKA-16008) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Reopened] (KAFKA-16167) Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup
[ https://issues.apache.org/jira/browse/KAFKA-16167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reopened KAFKA-16167: > Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup > -- > > Key: KAFKA-16167 > URL: https://issues.apache.org/jira/browse/KAFKA-16167 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, integration-tests, > kip-848-client-support > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16167) Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup
[ https://issues.apache.org/jira/browse/KAFKA-16167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818487#comment-17818487 ] Lucas Brutschy edited comment on KAFKA-16167 at 2/19/24 2:01 PM: - Looks like the test is still flaky. In an unrelated PR I got this: [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15219/10/tests] ``` java.lang.NullPointerException: Cannot invoke "org.apache.kafka.clients.consumer.OffsetAndMetadata.offset()" because the return value of "java.util.Map.get(Object)" is null at kafka.api.PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup(PlaintextConsumerTest.scala:316) ``` was (Author: JIRAUSER302322): Looks like the test is still flaky. In an unrelated PR I got this: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15219/10/tests > Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup > -- > > Key: KAFKA-16167 > URL: https://issues.apache.org/jira/browse/KAFKA-16167 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, integration-tests, > kip-848-client-support > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16167) Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup
[ https://issues.apache.org/jira/browse/KAFKA-16167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818487#comment-17818487 ] Lucas Brutschy commented on KAFKA-16167: Looks like the test is still flaky. In an unrelated PR I got this: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15219/10/tests > Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup > -- > > Key: KAFKA-16167 > URL: https://issues.apache.org/jira/browse/KAFKA-16167 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, integration-tests, > kip-848-client-support > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]
Phuc-Hong-Tran commented on PR #15188: URL: https://github.com/apache/kafka/pull/15188#issuecomment-1952492560 > > > @cadonna @lianetm, since we're supporting both subscribe methods, using java.util.regex.Pattern and SubscriptionPattern, I think we should throw an illegal heartbeat exception when the user tries to use both methods at the same time and inform the user to use one at a time, since the field SubscribedRegex is used for java.util.regex.Pattern as well as SubscriptionPattern. What do you guys think? > > > > > > IMO, we must support the deprecated pattern subscriptions with `java.util.regex.Pattern` to ensure backwards compatibility, but we do not need to support mixed usage of `java.util.regex.Pattern` and Google regex patterns. I think this is a blind spot in the KIP. I propose to throw an `IllegalStateException` if `subscribe(java.util.regex.Pattern)` is called after `subscribe(SubscriptionPattern)` (and vice versa) without calling `unsubscribe()` in between. That is similar to the restrictions between pattern, topic, and partition subscriptions @lianetm linked above. I do not think it is worth considering the edge case of mixed usage of the two pattern types. Does this make sense to you? \cc @dajac What do you as the original author of the KIP think? Should we update the KIP to make this clear? > > @cadonna I would rather follow what we already do with `subscribe` today. The last one called takes precedence. I have a question. The subscribe method that uses Pattern overrides the subscription with the topic(s) that match the Pattern. When the user chooses to use SubscriptionPattern but has already used Pattern beforehand, should we clear out the old subscription? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
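As an illustration of the `IllegalStateException` proposal quoted above (not the "last one called takes precedence" alternative), here is a minimal sketch in plain Java. It does not use the Kafka client; `PatternSubscriptionGuard` and its `Kind` enum are hypothetical names made up for this example, and the real consumer may track subscription state quite differently.

```java
// Hypothetical stand-in for the consumer's subscription bookkeeping.
final class PatternSubscriptionGuard {

    // Which flavor of pattern subscription is currently active.
    enum Kind { NONE, JAVA_REGEX, SUBSCRIPTION_PATTERN }

    private Kind current = Kind.NONE;

    // Records a pattern subscription; rejects switching between the two
    // pattern types unless unsubscribe() was called in between.
    void subscribe(Kind requested) {
        if (requested == Kind.NONE) {
            throw new IllegalArgumentException("requested kind must not be NONE");
        }
        if (current != Kind.NONE && current != requested) {
            throw new IllegalStateException(
                "Already subscribed with " + current + "; call unsubscribe() before using " + requested);
        }
        current = requested;
    }

    // Clears the active subscription so either pattern type may be used next.
    void unsubscribe() {
        current = Kind.NONE;
    }

    Kind current() {
        return current;
    }

    public static void main(String[] args) {
        PatternSubscriptionGuard guard = new PatternSubscriptionGuard();
        guard.subscribe(Kind.JAVA_REGEX);
        try {
            guard.subscribe(Kind.SUBSCRIPTION_PATTERN);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        guard.unsubscribe();
        guard.subscribe(Kind.SUBSCRIPTION_PATTERN);
        System.out.println("now: " + guard.current());
    }
}
```

Under the alternative favored later in the thread, `subscribe` would simply overwrite `current` instead of throwing, which is where the question about clearing the old subscription comes from.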
[jira] [Comment Edited] (KAFKA-16084) Simplify and deduplicate StandaloneHerderTest mocking
[ https://issues.apache.org/jira/browse/KAFKA-16084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818185#comment-17818185 ] Ahmed Sobeh edited comment on KAFKA-16084 at 2/19/24 12:53 PM: --- -Hi- [~gharris1727]{-}! I'm almost done with this but I have one question, any specific recommendations for expectConfigValidation?{-} had an idea, so submitted a PR was (Author: JIRAUSER295920): Hi [~gharris1727]! I'm almost done with this but I have one question, any specific recommendations for expectConfigValidation? > Simplify and deduplicate StandaloneHerderTest mocking > - > > Key: KAFKA-16084 > URL: https://issues.apache.org/jira/browse/KAFKA-16084 > Project: Kafka > Issue Type: Test > Components: connect >Reporter: Greg Harris >Assignee: Ahmed Sobeh >Priority: Minor > Labels: newbie++ > > The StandaloneHerderTest has some cruft that can be cleaned up. What I've > found: > * The `connector` field is written in nearly every test, but only read by one > test, and looks to be nearly irrelevant. > * `expectConfigValidation` has two ways of specifying consecutive > validations. 1. The boolean shouldCreateConnector which is true in the first > invocation and false in subsequent invocations. 2. by passing multiple > configurations via varargs. > * The class uses a mix of Mock annotations and mock(Class) invocations > * The test doesn't stop the thread pool created inside the herder and might > leak threads > * Mocking for Worker#startConnector is 6 lines which are duplicated 8 times > throughout the test > * Some waits are 1000 ms and others are 1000 s, and could be pulled out to > constants or a util method -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] Kafka 16084 Simplify and deduplicate standalone herder test mocking [kafka]
ahmedsobeh opened a new pull request, #15389: URL: https://github.com/apache/kafka/pull/15389 - Removed the connector field. - The class had a mix of Mock annotations and mock(Class) invocations; standardized on one of them. - The test didn't stop the thread pool created inside the herder and might leak threads; added herder stopping in tearDown. - Mocking for Worker#startConnector was 6 lines duplicated 8 times throughout the test; extracted it to a method. - Some waits were 1000 ms and others were 1000 s; unified them and set them to a constant. *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16259 Immutable MetadataCache to improve client performance [kafka]
msn-tldr commented on PR #15376: URL: https://github.com/apache/kafka/pull/15376#issuecomment-1952339142 @ericzhifengchen It seems you had a similar idea of creating an immutable metadata cache on the client to improve latency :) I have created a follow-up, https://github.com/apache/kafka/pull/15385, to add a test similar to `testConcurrentUpdateAndGetCluster` in this PR. I can add you as a co-author on PR 15385; can you share the email associated with your GitHub account? See the steps for getting this information [here](https://docs.github.com/en/pull-requests/committing-changes-to-your-project/creating-and-editing-commits/creating-a-commit-with-multiple-authors#required-co-author-information)
[jira] [Resolved] (KAFKA-15140) Improve TopicCommandIntegrationTest to be less flaky
[ https://issues.apache.org/jira/browse/KAFKA-15140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-15140. --- Fix Version/s: (was: 3.5.1) Resolution: Fixed > Improve TopicCommandIntegrationTest to be less flaky > > > Key: KAFKA-15140 > URL: https://issues.apache.org/jira/browse/KAFKA-15140 > Project: Kafka > Issue Type: Test > Components: unit tests >Reporter: Divij Vaidya >Assignee: Lan Ding >Priority: Minor > Labels: flaky-test, newbie > Fix For: 3.8.0 > > > *This is a good Jira for folks who are new to contributing to Kafka.* > Tests in TopicCommandIntegrationTest get flaky from time to time. The > objective of the task is to make them more robust by doing the following: > 1. Replace usages of the {-}createAndWaitTopic{-}() adminClient.createTopics() > method, and other places where we are creating a topic (without waiting), > with > TestUtils.createTopicWithAdmin(). The latter method already contains the > functionality to create a topic and wait for metadata to sync up. > 2. Replace the number 6 at places such as > "adminClient.createTopics( > Collections.singletonList(new NewTopic("foo_bar", 1, 6.toShort)))" with a > meaningful constant. > 3. Add logs if an assertion fails, for example, lines such as " > assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"), output)" should > have a third argument that prints the actual output, so that the test logs > show what the output was when the assertion failed. > 4. Replace occurrences of "\n" with System.lineSeparator(), which is platform > independent. > 5. We should wait for reassignment to complete whenever we are re-assigning > partitions using alterconfig before we call describe to validate it.
We could > use > TestUtils.waitForAllReassignmentsToComplete() > *Motivation of this task* > Try to fix the flaky test behaviour such as observed in > [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13924/5/testReport/junit/kafka.admin/TopicCommandIntegrationTest/Build___JDK_11_and_Scala_2_13___testDescribeUnderMinIsrPartitionsMixed_String__quorum_zk/] > > {noformat} > org.opentest4j.AssertionFailedError: expected: but was: > at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31) > at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:180) > at > app//kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed(TopicCommandIntegrationTest.scala:794){noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
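Items 3 and 4 of the task list above (include the actual output in the failure message, and split on `System.lineSeparator()` instead of a hard-coded "\n") can be illustrated with a minimal sketch. It uses only the Java standard library rather than JUnit or Kafka's `TestUtils`; `TopicOutputCheck` and its methods are hypothetical names, not part of TopicCommandIntegrationTest.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper; the names are illustrative and not taken from the Kafka test code.
final class TopicOutputCheck {
    // Split command output into rows using the platform line separator instead of "\n".
    static List<String> rows(String output) {
        return Arrays.asList(output.split(Pattern.quote(System.lineSeparator())));
    }

    // Fail with a message that carries the full output, so the test log shows what was printed.
    static void assertFirstRowStartsWith(String output, String expectedPrefix) {
        List<String> rows = rows(output);
        if (!rows.get(0).startsWith(expectedPrefix)) {
            throw new AssertionError("Expected first row to start with \"" + expectedPrefix
                + "\" but the output was:" + System.lineSeparator() + output);
        }
    }

    public static void main(String[] args) {
        String output = "\tTopic: demo" + System.lineSeparator() + "\tPartition: 0";
        assertFirstRowStartsWith(output, "\tTopic: demo");
        System.out.println("first row check passed");
    }
}
```

With JUnit 5, the same effect is obtained by passing the output as the message argument of `assertTrue`, as the example line in item 3 already does.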
Re: [PR] KAFKA-15140: improve TopicCommandIntegrationTest to be less flaky [kafka]
showuon merged PR #14891: URL: https://github.com/apache/kafka/pull/14891
Re: [PR] KAFKA-16206 Fix unnecessary topic config deletion during ZK migration [kafka]
mimaison commented on PR #14206: URL: https://github.com/apache/kafka/pull/14206#issuecomment-1952110096 There are quite a few failures related to ZkMigration in the last CI run: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14206/22/testReport/
[jira] [Updated] (KAFKA-16167) Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup
[ https://issues.apache.org/jira/browse/KAFKA-16167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-16167: --- Reviewer: Lucas Brutschy (was: Lucas Brutschy) > Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup > -- > > Key: KAFKA-16167 > URL: https://issues.apache.org/jira/browse/KAFKA-16167 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, integration-tests, > kip-848-client-support > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16104) Enable additional PlaintextConsumerTest tests for new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-16104: --- Reviewer: Lucas Brutschy (was: Lucas Brutschy) > Enable additional PlaintextConsumerTest tests for new consumer > -- > > Key: KAFKA-16104 > URL: https://issues.apache.org/jira/browse/KAFKA-16104 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Andrew Schofield >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, integration-tests > Fix For: 3.8.0 > > > It should be possible to enable: > * testAutoCommitOnClose > * testAutoCommitOnCloseAfterWakeup > * testExpandingTopicSubscriptions > * testShrinkingTopicSubscriptions > * testMultiConsumerSessionTimeoutOnClose (KAFKA-16011 has been fixed) > * testMultiConsumerSessionTimeoutOnStopPolling > * testAutoCommitOnRebalance > * testPerPartitionLeadMetricsCleanUpWithSubscribe > * testPerPartitionLagMetricsCleanUpWithSubscribe > * testStaticConsumerDetectsNewPartitionCreatedAfterRestart -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16230) Update verifiable_consumer.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-16230: --- Reviewer: Lucas Brutschy (was: Lucas Brutschy) > Update verifiable_consumer.py to support KIP-848’s group protocol config > > > Key: KAFKA-16230 > URL: https://issues.apache.org/jira/browse/KAFKA-16230 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > This task is to update {{verifiable_consumer.py}} to support the > {{group.protocol}} configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument. It will default to > classic and we will take a separate task (Jira) to update the callers. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-13566) producer exponential backoff implementation
[ https://issues.apache.org/jira/browse/KAFKA-13566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison resolved KAFKA-13566. Resolution: Duplicate > producer exponential backoff implementation > --- > > Key: KAFKA-13566 > URL: https://issues.apache.org/jira/browse/KAFKA-13566 > Project: Kafka > Issue Type: Sub-task >Reporter: Luke Chen >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-13565) consumer exponential backoff implementation
[ https://issues.apache.org/jira/browse/KAFKA-13565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison resolved KAFKA-13565. Fix Version/s: 3.7.0 Resolution: Duplicate > consumer exponential backoff implementation > --- > > Key: KAFKA-13565 > URL: https://issues.apache.org/jira/browse/KAFKA-13565 > Project: Kafka > Issue Type: Sub-task >Reporter: Luke Chen >Priority: Major > Fix For: 3.7.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-13567) adminClient exponential backoff implementation
[ https://issues.apache.org/jira/browse/KAFKA-13567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison resolved KAFKA-13567. Resolution: Duplicate > adminClient exponential backoff implementation > -- > > Key: KAFKA-13567 > URL: https://issues.apache.org/jira/browse/KAFKA-13567 > Project: Kafka > Issue Type: Sub-task >Reporter: Luke Chen >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16253) kafka_server_alterPartition_metrics_network_io_total is not supported in kafka 3.5.1
[ https://issues.apache.org/jira/browse/KAFKA-16253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Janardhana Gopalachar resolved KAFKA-16253. --- Resolution: Invalid > kafka_server_alterPartition_metrics_network_io_total is not supported in > kafka 3.5.1 > > > Key: KAFKA-16253 > URL: https://issues.apache.org/jira/browse/KAFKA-16253 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.5.1 >Reporter: Janardhana Gopalachar >Priority: Major > > Hi > The metrics below was supported in kafka 3.2.3 but in 3.5.1 it is not sent . > I tried to search the source code to see if this is present, but couldnt find > a reference. > Is the below metrics deprecated or not supported. ? > kafka_server_alterPartition_metrics_network_io_total > > in kafka3.2.3 we could get the metrics, but in 3.5.1 it is not available > HELP kafka_server_alterPartition_metrics_network_io_total The total number of > network operations (reads or writes) on all connections > kafka.server:name=null,type=alterPartition-metrics,attribute=network-io-total > :--:# TYPE kafka_server_alterPartition_metrics_network_io_total untyped > -kafka_server_alterPartition_metrics_network_io_total{BrokerId="0",} 10.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16253) kafka_server_alterPartition_metrics_network_io_total is not supported in kafka 3.5.1
[ https://issues.apache.org/jira/browse/KAFKA-16253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818402#comment-17818402 ] Janardhana Gopalachar commented on KAFKA-16253: --- the metrics is available with type type=alter-partition-metrics hence closing this bug bash-4.4$ curl localhost:9404 | grep kafka_server_alter_partition_metrics_network_io_total % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0# HELP kafka_server_alter_partition_metrics_network_io_total The total number of network operations (reads or writes) on all connections kafka.server:name=null,type=alter-partition-metrics,attribute=network-io-total # TYPE kafka_server_alter_partition_metrics_network_io_total untyped kafka_server_alter_partition_metrics_network_io_total{BrokerId="0",} 18.0 100 1682k 100 1682k 0 0 3136k 0 --:--:-- --:--:-- --:--:-- 3132k bash-4.4$ kafka-topics.sh --version ERROR StatusLogger Reconfiguration failed: No configuration found for '5cb0d902' at 'null' in 'null'3.5.1 (Commit:2c6fb6c54472e90a) > kafka_server_alterPartition_metrics_network_io_total is not supported in > kafka 3.5.1 > > > Key: KAFKA-16253 > URL: https://issues.apache.org/jira/browse/KAFKA-16253 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.5.1 >Reporter: Janardhana Gopalachar >Priority: Major > > Hi > The metrics below was supported in kafka 3.2.3 but in 3.5.1 it is not sent . > I tried to search the source code to see if this is present, but couldnt find > a reference. > Is the below metrics deprecated or not supported. ? 
> kafka_server_alterPartition_metrics_network_io_total > > in kafka3.2.3 we could get the metrics, but in 3.5.1 it is not available > HELP kafka_server_alterPartition_metrics_network_io_total The total number of > network operations (reads or writes) on all connections > kafka.server:name=null,type=alterPartition-metrics,attribute=network-io-total > :--:# TYPE kafka_server_alterPartition_metrics_network_io_total untyped > -kafka_server_alterPartition_metrics_network_io_total{BrokerId="0",} 10.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15833) Restrict Consumer API to be used from one thread
[ https://issues.apache.org/jira/browse/KAFKA-15833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-15833: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Restrict Consumer API to be used from one thread > > > Key: KAFKA-15833 > URL: https://issues.apache.org/jira/browse/KAFKA-15833 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support, > kip-848-preview > Fix For: 3.7.0 > > > The legacy consumer restricts the API to be used from one thread only. This > is not enforced in the new consumer. To avoid inconsistencies in the > behavior, we should enforce the same restriction in the new consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16198) Reconciliation may lose partitions when topic metadata is delayed
[ https://issues.apache.org/jira/browse/KAFKA-16198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-16198: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Reconciliation may lose partitions when topic metadata is delayed > - > > Key: KAFKA-16198 > URL: https://issues.apache.org/jira/browse/KAFKA-16198 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Critical > Labels: client-transitions-issues, kip-848-client-support > Fix For: 3.8.0 > > > The current reconciliation code in `AsyncKafkaConsumer`'s `MembershipManager` > may lose part of the server-provided assignment when metadata is delayed. The > reason is incorrect handling of partially resolved topic names, as in this > example: > * We get assigned {{T1-1}} and {{T2-1}} > * We reconcile {{T1-1}}, {{T2-1}} remains in {{assignmentUnresolved}} > since the topic id {{T2}} is not known yet > * We get new cluster metadata, which includes {{T2}}, so {{T2-1}} is > moved to {{assignmentReadyToReconcile}} > * We call {{reconcile}} -- {{T2-1}} is now treated as the full assignment, > so {{T1-1}} is being revoked > * We end up with assignment {{T2-1}}, which is inconsistent with the > broker-side target assignment. > > Generally, this seems to be a problem around semantics of the internal > collections `assignmentUnresolved` and `assignmentReadyToReconcile`. Absence > of a topic in `assignmentReadyToReconcile` may mean either revocation of the > topic partition(s), or unavailability of a topic name for the topic. > Internal state with simpler and correct invariants could be achieved by using > a single collection `currentTargetAssignment` which is based on topic IDs and > always corresponds to the latest assignment received from the broker. During > every attempted reconciliation, all topic IDs will be resolved from the local > cache, which should not introduce a lot of overhead.
`assignmentUnresolved` > and `assignmentReadyToReconcile` are removed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
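The proposal in the issue above, a single `currentTargetAssignment` keyed by topic ID and resolved against the local metadata cache on every reconciliation attempt, can be sketched roughly as follows. This is not the actual `MembershipManager` code: the class name, the use of plain `String` topic IDs (the real client uses a UUID type), and the `resolve` method are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of the "single collection" idea; not the real MembershipManager.
final class TargetAssignmentSketch {
    // Latest assignment received from the broker: topic ID -> partitions.
    // It is replaced wholesale on every update and never split into
    // "resolved" and "unresolved" parts.
    private Map<String, Set<Integer>> currentTargetAssignment = new HashMap<>();

    void onAssignmentReceived(Map<String, Set<Integer>> assignment) {
        currentTargetAssignment = new HashMap<>(assignment);
    }

    // Resolve the whole target assignment against the local metadata cache.
    // Topic IDs without a known name are skipped for now, but they stay in
    // currentTargetAssignment, so a later call picks them up.
    Set<String> resolve(Map<String, String> topicNamesById) {
        Set<String> resolved = new TreeSet<>();
        for (Map.Entry<String, Set<Integer>> entry : currentTargetAssignment.entrySet()) {
            String name = topicNamesById.get(entry.getKey());
            if (name == null) {
                continue; // metadata delayed for this topic
            }
            for (int partition : entry.getValue()) {
                resolved.add(name + "-" + partition);
            }
        }
        return resolved;
    }

    public static void main(String[] args) {
        TargetAssignmentSketch sketch = new TargetAssignmentSketch();
        Map<String, Set<Integer>> assignment = new HashMap<>();
        assignment.put("id-T1", new TreeSet<>(Set.of(1)));
        assignment.put("id-T2", new TreeSet<>(Set.of(1)));
        sketch.onAssignmentReceived(assignment);

        Map<String, String> metadata = new HashMap<>();
        metadata.put("id-T1", "T1");
        System.out.println("before T2 metadata: " + sketch.resolve(metadata));
        metadata.put("id-T2", "T2");
        System.out.println("after T2 metadata: " + sketch.resolve(metadata));
    }
}
```

Because every reconciliation starts from the full target assignment, the arrival of metadata for `T2` adds `T2-1` without dropping `T1-1`, which is the failure described in the example above.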
[jira] [Assigned] (KAFKA-15913) Migrate AsyncKafkaConsumerTest away from ConsumerTestBuilder
[ https://issues.apache.org/jira/browse/KAFKA-15913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-15913: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Migrate AsyncKafkaConsumerTest away from ConsumerTestBuilder > > > Key: KAFKA-15913 > URL: https://issues.apache.org/jira/browse/KAFKA-15913 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Lucas Brutschy >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.7.0 > > > ConsumerTestBuilder is meant to be a unit testing utility; however, we seem > to use Mockito#spy quite liberally. This is not the right testing strategy > because we basically turn unit testing into integration testing. > > While the current unit tests run fine, we should probably make the mocking > use Mockito#mock by default and test each dependency independently. > > The ask here is > # Make mock(class) the default > # Provide a more flexible interface for the testBuilder to allow the user to > configure spy or mock. Or, let the user pass in their own mock. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14624) State restoration is broken with standby tasks and cache-enabled stores in processor API
[ https://issues.apache.org/jira/browse/KAFKA-14624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-14624: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > State restoration is broken with standby tasks and cache-enabled stores in > processor API > > > Key: KAFKA-14624 > URL: https://issues.apache.org/jira/browse/KAFKA-14624 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.1 >Reporter: Balaji Rao >Assignee: Lucas Brutschy >Priority: Major > > I found that cache-enabled state stores in PAPI with standby tasks sometimes > return stale data when a partition moves from one app instance to another > and back. [Here's|https://github.com/balajirrao/kafka-streams-multi-runner] a > small project that I used to reproduce the issue. > I dug around a bit and it seems like it's a bug in standby task state > restoration when caching is enabled. If a partition moves from instance 1 to > 2 and then back to instance 1, since the `CachingKeyValueStore` doesn't > register a restore callback, it can return potentially stale data for > non-dirty keys. > I could fix the issue by modifying the `CachingKeyValueStore` to register a > restore callback in which the restored keys are added to the cache. Is > this fix in the right direction? > {code:java} > // register the store > context.register( > root, > (RecordBatchingStateRestoreCallback) records -> { > for (final ConsumerRecord<byte[], byte[]> record : > records) { > put(Bytes.wrap(record.key()), record.value()); > } > } > ); > {code} > > I would like to contribute a fix, if I can get some help! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15941) Flaky test: shouldRestoreNullRecord() – org.apache.kafka.streams.integration.RestoreIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-15941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-15941: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Flaky test: shouldRestoreNullRecord() – > org.apache.kafka.streams.integration.RestoreIntegrationTest > --- > > Key: KAFKA-15941 > URL: https://issues.apache.org/jira/browse/KAFKA-15941 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Apoorv Mittal >Assignee: Lucas Brutschy >Priority: Major > Labels: flaky-test > > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14699/24/tests/ > {code:java} > org.opentest4j.AssertionFailedError: Condition not met within timeout 6. > Did not receive all [KeyValue(2, \x00\x00\x00)] records from topic output > (got []) ==> expected: but was: > Stacktraceorg.opentest4j.AssertionFailedError: Condition not met > within timeout 6. Did not receive all [KeyValue(2, \x00\x00\x00)] records > from topic output (got []) ==> expected: but was: at > org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at > org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210) at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331) >at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:878) > at > 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:827) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:790) > at > org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRestoreNullRecord(RestoreIntegrationTest.java:244) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task
[ https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-12679: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Rebalancing a restoring or running task may cause directory livelocking with > newly created task > --- > > Key: KAFKA-12679 > URL: https://issues.apache.org/jira/browse/KAFKA-12679 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.1 > Environment: Broker and client version 2.6.1 > Multi-node broker cluster > Multi-node, auto scaling streams app instances >Reporter: Peter Nahas >Assignee: Lucas Brutschy >Priority: Major > Fix For: 3.7.0 > > Attachments: Backoff-between-directory-lock-attempts.patch > > > If a task that uses a state store is in the restoring state or in a running > state and the task gets rebalanced to a separate thread on the same instance, > the newly created task will attempt to lock the state store directory while > the first thread is continuing to use it. This is totally normal and expected > behavior when the first thread is not yet aware of the rebalance. However, > that newly created task is effectively running a while loop with no backoff > waiting to lock the directory: > # TaskManager tells the task to restore in `tryToCompleteRestoration` > # The task attempts to lock the directory > # The lock attempt fails and throws a > `org.apache.kafka.streams.errors.LockException` > # TaskManager catches the exception, stops further processing on the task > and reports that not all tasks have restored > # The StreamThread `runLoop` continues to run. > I've seen some documentation indicate that there is supposed to be a backoff > when this condition occurs, but there does not appear to be any in the code. > The result is that if this goes on for long enough, the lock-loop may > dominate CPU usage in the process and starve out the old stream thread task > processing.
> > When in this state, the DEBUG level logging for TaskManager will produce a > steady stream of messages like the following: > {noformat} > 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager > : stream-thread [StreamThread-10] Could not initialize 0_34 due > to the following exception; will retry > org.apache.kafka.streams.errors.LockException: stream-thread > [StreamThread-10] standby-task [0_34] Failed to lock the state directory for > task 0_34 > {noformat} > > > I've attached a git formatted patch to resolve the issue. Simply detect the > scenario and sleep for the backoff time in the appropriate StreamThread. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
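The remedy described in the issue above (sleep for a backoff interval when the directory lock cannot be taken, rather than retrying in a tight loop) can be sketched in isolation. This is not the attached patch and not Kafka Streams code: `LockRetrySketch`, `acquireWithBackoff`, and the `tryLock` supplier are hypothetical stand-ins for the real lock attempt.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of "back off between lock attempts"; not the attached patch.
final class LockRetrySketch {
    // Tries the lock up to maxAttempts times, sleeping backoffMs between failed
    // attempts so that the retry loop does not monopolize the CPU.
    // Returns the attempt number that succeeded, or -1 if the lock was never taken
    // (or the thread was interrupted while backing off).
    static int acquireWithBackoff(BooleanSupplier tryLock, long backoffMs, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (tryLock.getAsBoolean()) {
                return attempt;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMs); // yield to the thread that still holds the lock
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                    return -1;
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Stand-in for a lock that is released by its previous owner after two failed attempts.
        int attempts = acquireWithBackoff(() -> ++calls[0] >= 3, 10L, 5);
        System.out.println("lock acquired on attempt " + attempts);
    }
}
```

A fixed backoff is used here for simplicity; whether the real fix uses a fixed or configurable interval is not stated in the message above.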
[jira] [Assigned] (KAFKA-14343) Write upgrade/downgrade tests for enabling the state updater
[ https://issues.apache.org/jira/browse/KAFKA-14343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-14343: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Write upgrade/downgrade tests for enabling the state updater > - > > Key: KAFKA-14343 > URL: https://issues.apache.org/jira/browse/KAFKA-14343 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > > Write a test that verifies the upgrade from a version of Streams with state > updater disabled to a version with state updater enabled and vice versa, so > that we can offer a safe upgrade path. > * upgrade test from a version of Streams with state updater disabled to a > version with state updater enabled (probably a system test since the old code > path will be removed from the code base) > * downgrade test from a version of Streams with state updater enabled to a > version with state updater disabled (probably a system test since the old > code path will be removed from the code base) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14014) Flaky test NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
[ https://issues.apache.org/jira/browse/KAFKA-14014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-14014: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Flaky test > NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets() > > > Key: KAFKA-14014 > URL: https://issues.apache.org/jira/browse/KAFKA-14014 > Project: Kafka > Issue Type: Test > Components: streams >Reporter: Bruno Cadonna >Assignee: Lucas Brutschy >Priority: Critical > Labels: flaky-test > > {code:java} > java.lang.AssertionError: > Expected: <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 2)]> > but: was <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 1)]> > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) > at > org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:540) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:568) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at > 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at java.base/java.lang.Thread.run(Thread.java:833) > {code} > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/ > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_17_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15957) ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy broken
[ https://issues.apache.org/jira/browse/KAFKA-15957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-15957: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy > broken > --- > > Key: KAFKA-15957 > URL: https://issues.apache.org/jira/browse/KAFKA-15957 > Project: Kafka > Issue Type: Bug >Reporter: Justine Olshan >Assignee: Lucas Brutschy >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15977) DelegationTokenEndToEndAuthorizationWithOwnerTest leaks threads
[ https://issues.apache.org/jira/browse/KAFKA-15977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-15977: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > DelegationTokenEndToEndAuthorizationWithOwnerTest leaks threads > --- > > Key: KAFKA-15977 > URL: https://issues.apache.org/jira/browse/KAFKA-15977 > Project: Kafka > Issue Type: Bug >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > > [https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-pr/branches/PR-14878/runs/8/nodes/11/steps/90/log/?start=0] > > I had an unrelated PR fail with the following thread leak: > > {code:java} > Gradle Test Run :core:test > Gradle Test Executor 95 > > DelegationTokenEndToEndAuthorizationWithOwnerTest > executionError STARTED > kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.executionError > failed, log available in > /home/jenkins/workspace/Kafka_kafka-pr_PR-14878/core/build/reports/testOutput/kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.executionError.test.stdoutGradle > Test Run :core:test > Gradle Test Executor 95 > > DelegationTokenEndToEndAuthorizationWithOwnerTest > executionError FAILED > org.opentest4j.AssertionFailedError: Found 1 unexpected threads during > @AfterAll: `kafka-admin-client-thread | adminclient-483` ==> expected: > but was: {code} > > All the following tests on that error fail with initialization errors, > because the admin client thread is never closed. 
> > This is preceded by the following test failure: > > {code:java} > Gradle Test Run :core:test > Gradle Test Executor 95 > > DelegationTokenEndToEndAuthorizationWithOwnerTest > > testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(String, boolean) > [1] > quorum=kraft, isIdempotenceEnabled=true FAILED > org.opentest4j.AssertionFailedError: expected acls: > (principal=User:scram-user2, host=*, operation=CREATE_TOKENS, > permissionType=ALLOW) > (principal=User:scram-user2, host=*, operation=DESCRIBE_TOKENS, > permissionType=ALLOW) > but got: > > at > app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at app//org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at app//kafka.utils.TestUtils$.waitAndVerifyAcls(TestUtils.scala:1142) > at > app//kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.$anonfun$configureSecurityAfterServersStart$1(DelegationTokenEndToEndAuthorizationWithOwnerTest.scala:71) > at > app//kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.$anonfun$configureSecurityAfterServersStart$1$adapted(DelegationTokenEndToEndAuthorizationWithOwnerTest.scala:70) > at > app//scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576) > at > app//scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574) > at app//scala.collection.AbstractIterable.foreach(Iterable.scala:933) > at > app//kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.configureSecurityAfterServersStart(DelegationTokenEndToEndAuthorizationWithOwnerTest.scala:70){code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-13531) Flaky test NamedTopologyIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-13531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-13531: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Flaky test NamedTopologyIntegrationTest > --- > > Key: KAFKA-13531 > URL: https://issues.apache.org/jira/browse/KAFKA-13531 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Matthias J. Sax >Assignee: Lucas Brutschy >Priority: Critical > Labels: flaky-test > Attachments: > org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing().test.stdout > > > org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets > {quote}java.lang.AssertionError: Did not receive all 3 records from topic > output-stream-2 within 6 ms, currently accumulated data is [] Expected: > is a value equal to or greater than <3> but: <0> was less than <3> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:648) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:368) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:336) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:644) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:617) > at > org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:439){quote} > STDERR > {quote}java.util.concurrent.ExecutionException: > 
org.apache.kafka.common.errors.GroupSubscribedToTopicException: Deleting > offsets of a topic is forbidden while the consumer group is actively > subscribed to it. at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) > at > org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper.lambda$removeNamedTopology$3(KafkaStreamsNamedTopologyWrapper.java:213) > at > org.apache.kafka.common.internals.KafkaFutureImpl.lambda$whenComplete$2(KafkaFutureImpl.java:107) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) > at > org.apache.kafka.common.internals.KafkaCompletableFuture.kafkaComplete(KafkaCompletableFuture.java:39) > at > org.apache.kafka.common.internals.KafkaFutureImpl.complete(KafkaFutureImpl.java:122) > at > org.apache.kafka.streams.processor.internals.TopologyMetadata.maybeNotifyTopologyVersionWaiters(TopologyMetadata.java:154) > at > org.apache.kafka.streams.processor.internals.StreamThread.checkForTopologyUpdates(StreamThread.java:916) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:598) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:575) > Caused by: org.apache.kafka.common.errors.GroupSubscribedToTopicException: > Deleting offsets of a topic is forbidden while the consumer group is actively > subscribed to it. 
java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.GroupSubscribedToTopicException: Deleting > offsets of a topic is forbidden while the consumer group is actively > subscribed to it. at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) > at > org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper.lambda$removeNamedTopology$3(KafkaStreamsNamedTopologyWrapper.java:213) > at > org.apache.kafka.common.internals.KafkaFutureImpl.lambda$whenComplete$2(KafkaFutureImpl.java:107) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > jav
[jira] [Assigned] (KAFKA-7000) KafkaConsumer.position should wait for assignment metadata
[ https://issues.apache.org/jira/browse/KAFKA-7000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-7000: - Assignee: Lucas Brutschy (was: Lucas Brutschy) > KafkaConsumer.position should wait for assignment metadata > -- > > Key: KAFKA-7000 > URL: https://issues.apache.org/jira/browse/KAFKA-7000 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: John Roesler >Assignee: Lucas Brutschy >Priority: Blocker > Fix For: 2.0.0 > > > While updating Kafka Streams to stop using the deprecated > Consumer.poll(long), I found that this code unexpectedly throws an exception: > {code:java} > consumer.subscribe(topics); > // consumer.poll(0); <- I've removed this line, which shouldn't be necessary > here. > final Set<TopicPartition> partitions = new HashSet<>(); > for (final String topic : topics) { > for (final PartitionInfo partition : consumer.partitionsFor(topic)) { > partitions.add(new TopicPartition(partition.topic(), > partition.partition())); > } > } > for (final TopicPartition tp : partitions) { > final long offset = consumer.position(tp); > committedOffsets.put(tp, offset); > }{code} > Here is the exception: > {code:java} > Exception in thread "main" java.lang.IllegalStateException: You can only > check the position for partitions assigned to this consumer. >at > org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1620) >at > org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1586) >at > org.apache.kafka.streams.tests.EosTestDriver.getCommittedOffsets(EosTestDriver.java:275) >at > org.apache.kafka.streams.tests.EosTestDriver.verify(EosTestDriver.java:148) >at > org.apache.kafka.streams.tests.StreamsEosTest.main(StreamsEosTest.java:69){code} > > As you can see in the commented code in my snippet, we used to block for > assignment with a poll(0), which is now deprecated. 
> It seems reasonable to me for position() to do the same thing that poll() > does, which is call `coordinator.poll(timeout.toMillis())` early in > processing to ensure an up-to-date assignment. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14432) RocksDBStore relies on finalizers to not leak memory
[ https://issues.apache.org/jira/browse/KAFKA-14432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-14432: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > RocksDBStore relies on finalizers to not leak memory > > > Key: KAFKA-14432 > URL: https://issues.apache.org/jira/browse/KAFKA-14432 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Blocker > Fix For: 3.4.0 > > > Relying on finalizers in RocksDB has been deprecated for a long time, and > starting with rocksdb 7, finalizers are removed completely (see > [https://github.com/facebook/rocksdb/pull/9523]). > Kafka Streams currently relies on finalizers in parts to not leak memory. > This needs to be resolved before we can upgrade to RocksDB 7. > See [https://github.com/apache/kafka/pull/12809] . > This is a native heap profile after running Kafka Streams without finalizers > for a few hours: > {code:java} > Total: 13547.5 MB > 12936.3 95.5% 95.5% 12936.3 95.5% rocksdb::port::cacheline_aligned_alloc > 438.5 3.2% 98.7% 438.5 3.2% rocksdb::BlockFetcher::ReadBlockContents > 84.0 0.6% 99.3% 84.2 0.6% rocksdb::Arena::AllocateNewBlock > 45.9 0.3% 99.7% 45.9 0.3% prof_backtrace_impl > 8.1 0.1% 99.7% 14.6 0.1% rocksdb::BlockBasedTable::PutDataBlockToCache > 6.4 0.0% 99.8% 12941.4 95.5% Java_org_rocksdb_Statistics_newStatistics___3BJ > 6.1 0.0% 99.8% 6.9 0.1% rocksdb::LRUCacheShard::Insert@2d8b20 > 5.1 0.0% 99.9% 6.5 0.0% rocksdb::VersionSet::ProcessManifestWrites > 3.9 0.0% 99.9% 3.9 0.0% rocksdb::WritableFileWriter::WritableFileWriter > 3.2 0.0% 99.9% 3.2 0.0% std::string::_Rep::_S_create{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
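The leak pattern in KAFKA-14432 comes down to native resources that are released only if someone calls `close()`. The following is a minimal sketch of deterministic release with try-with-resources, assuming a stand-in class: `NativeHandleSketch` and its open-handle counter are hypothetical and do not use the RocksDB API.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an object that owns native memory.
// Without finalizers, the memory is freed only when close() is called.
final class NativeHandleSketch implements AutoCloseable {
    // Counts handles that were created but not yet closed.
    static final AtomicInteger OPEN = new AtomicInteger();

    private boolean closed = false;

    NativeHandleSketch() {
        OPEN.incrementAndGet();
    }

    @Override
    public void close() {
        // Idempotent: closing twice must not release twice.
        if (!closed) {
            closed = true;
            OPEN.decrementAndGet();
        }
    }

    // Returns the number of open handles observed while the handle is in use.
    // try-with-resources closes the handle on every exit path.
    static int useAndRelease() {
        try (NativeHandleSketch handle = new NativeHandleSketch()) {
            return OPEN.get();
        }
    }
}
```

After `useAndRelease()` returns, the open-handle counter is back at its previous value. An object that is simply dropped without `close()` would keep the counter raised, which is the kind of growth the heap profile above shows.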
[jira] [Assigned] (KAFKA-14530) Check state updater more than once in process loops
[ https://issues.apache.org/jira/browse/KAFKA-14530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-14530: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Check state updater more than once in process loops > --- > > Key: KAFKA-14530 > URL: https://issues.apache.org/jira/browse/KAFKA-14530 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Minor > Fix For: 3.5.0 > > > In the new state restoration code, the state updater needs to be checked > regularly by the main thread to transfer ownership of tasks back to the main > thread once the state of the task is restored. The more often we check this, > the faster we can start processing the tasks. > Currently, we only check the state updater once in every loop iteration of > the state updater. Although we have not observed this to be too infrequent > in practice, we can easily increase the number of checks by moving the check > inside the inner processing loop. This would mean that once we have iterated > over `numIterations` records, we can already start processing tasks that have > finished restoration in the meantime. -- This message was sent by Atlassian Jira (v8.20.10#820010)
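The proposal in KAFKA-14530 can be sketched as a small simulation. This is not the Kafka Streams code: `ProcessLoopSketch`, `restoredTasks`, `activeTasks` and `runOnce` are hypothetical names, and "processing" is reduced to counting records. The sketch only shows how moving the check into the inner loop changes how often restored tasks are picked up.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical simulation of a main-thread loop that polls a state updater.
final class ProcessLoopSketch {
    // Tasks whose restoration has finished and that wait to be handed back.
    final Queue<String> restoredTasks = new ArrayDeque<>();
    // Tasks owned by the main thread and ready for processing.
    final List<String> activeTasks = new ArrayList<>();
    // Number of times the state updater was checked.
    int checks = 0;

    void checkStateUpdater() {
        checks++;
        String task;
        while ((task = restoredTasks.poll()) != null) {
            activeTasks.add(task);
        }
    }

    // One outer loop iteration that processes totalRecords records
    // in batches of at most numIterations records.
    void runOnce(int totalRecords, int numIterations, boolean checkInsideInnerLoop) {
        checkStateUpdater(); // the single check per outer iteration
        int processed = 0;
        while (processed < totalRecords) {
            processed += Math.min(numIterations, totalRecords - processed);
            if (checkInsideInnerLoop) {
                checkStateUpdater(); // additional check after every batch
            }
        }
    }
}
```

With 10 records and `numIterations = 3`, the inner loop runs four batches, so the variant with the inner check consults the state updater five times per outer iteration instead of once.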
[jira] [Assigned] (KAFKA-15832) Trigger client reconciliation based on manager poll
[ https://issues.apache.org/jira/browse/KAFKA-15832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-15832: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Trigger client reconciliation based on manager poll > --- > > Key: KAFKA-15832 > URL: https://issues.apache.org/jira/browse/KAFKA-15832 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lucas Brutschy >Priority: Critical > Labels: kip-848-client-support, reconciliation > Fix For: 3.8.0 > > > Currently the reconciliation logic on the client is triggered when a new > target assignment is received and resolved, or when new unresolved target > assignments are discovered in metadata. > This could be improved by triggering the reconciliation logic on each poll > iteration, to reconcile whatever is ready to be reconciled. This would > require changes to support poll on the MembershipManager, and integrate it > with the current polling logic in the background thread. Receiving a new > target assignment from the broker, or resolving new topic names via a > metadata update could only ensure that the #assignmentReadyToReconcile is > properly updated (currently done), but wouldn't trigger the #reconcile() > logic, leaving that to the #poll() operation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
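The split of responsibilities proposed in KAFKA-15832 can be illustrated with a stand-alone sketch. It is an assumption-laden simplification, not the actual `MembershipManager`: the class `MembershipManagerSketch`, its fields and the use of plain strings for partitions are hypothetical. Receiving a target assignment only records what is ready, and `poll()` is the only place that reconciles.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical simplification of a membership manager driven by poll().
final class MembershipManagerSketch {
    // Partitions that are resolved and ready to be reconciled.
    private final Set<String> assignmentReadyToReconcile = new LinkedHashSet<>();
    // Partitions the member currently owns.
    private final Set<String> currentAssignment = new LinkedHashSet<>();
    // Number of reconciliations that were actually performed.
    int reconcileCount = 0;

    // Called when a target assignment arrives or topic names get resolved.
    // It only updates the ready set and does not reconcile.
    void onTargetAssignment(Set<String> resolvedPartitions) {
        assignmentReadyToReconcile.clear();
        assignmentReadyToReconcile.addAll(resolvedPartitions);
    }

    // Called on every iteration of the background thread.
    void poll() {
        if (!assignmentReadyToReconcile.equals(currentAssignment)) {
            reconcile();
        }
    }

    private void reconcile() {
        currentAssignment.clear();
        currentAssignment.addAll(assignmentReadyToReconcile);
        reconcileCount++;
    }

    Set<String> currentAssignment() {
        return new LinkedHashSet<>(currentAssignment);
    }
}
```

Repeated calls to `poll()` with nothing new to reconcile are no-ops in this sketch, which keeps the per-iteration cost low.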
[jira] [Assigned] (KAFKA-14415) ThreadCache is getting slower with every additional state store
[ https://issues.apache.org/jira/browse/KAFKA-14415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-14415: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > ThreadCache is getting slower with every additional state store > --- > > Key: KAFKA-14415 > URL: https://issues.apache.org/jira/browse/KAFKA-14415 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > Fix For: 3.4.0 > > > There are a few lines in `ThreadCache` that I think should be optimized. > `sizeBytes` is called at least once, and potentially many times in every > `put` and is linear in the number of caches (= number of state stores, so > typically proportional to number of tasks). That means, with every additional > task, every put gets a little slower. Compare the throughput of TIME_ROCKS on > trunk (green graph): > [http://kstreams-benchmark-results.s3-website-us-west-2.amazonaws.com/experiments/stateheavy-3-5-3-4-0-51b7eb7937-jenkins-20221113214104-streamsbench/] > The throughput of TIME_ROCKS is 20% higher when a constant time > `sizeBytes` implementation is used: > [http://kstreams-benchmark-results.s3-website-us-west-2.amazonaws.com/experiments/stateheavy-3-5-LUCASCOMPARE-lucas-20221122140846-streamsbench/] > The same seems to apply for the MEM backend (initial throughput >8000 instead > of 6000), however, I cannot run the same benchmark here because the memory is > filled too quickly. > [http://kstreams-benchmark-results.s3-website-us-west-2.amazonaws.com/experiments/stateheavy-3-5-LUCASSTATE-lucas-20221121231632-streamsbench/] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
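The difference between a linear and a constant-time `sizeBytes` described in KAFKA-14415 can be shown with a small stand-alone sketch. It is not the `ThreadCache` implementation: `ThreadCacheSketch` and its methods are hypothetical, and each per-store cache is reduced to a byte count. One variant sums over all stores on every call, the other maintains a running total on each `put`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a thread-level cache with one cache per state store.
final class ThreadCacheSketch {
    // Bytes held per store cache (simplification: only sizes are tracked).
    private final Map<String, Long> bytesPerStore = new HashMap<>();
    // Running total, updated on every put.
    private long totalBytes = 0L;

    void put(String store, long entryBytes) {
        bytesPerStore.merge(store, entryBytes, Long::sum);
        totalBytes += entryBytes;
    }

    // Linear in the number of stores: every call walks all caches.
    long sizeBytesLinear() {
        long sum = 0L;
        for (long bytes : bytesPerStore.values()) {
            sum += bytes;
        }
        return sum;
    }

    // Constant time: returns the running total.
    long sizeBytesConstant() {
        return totalBytes;
    }
}
```

Both methods return the same value. The trade-off is that the running total must be adjusted wherever entries are added, evicted or removed, which this sketch only does for `put`.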
[jira] [Assigned] (KAFKA-15280) Implement client support for selecting KIP-848 server-side assignor
[ https://issues.apache.org/jira/browse/KAFKA-15280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-15280: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Implement client support for selecting KIP-848 server-side assignor > --- > > Key: KAFKA-15280 > URL: https://issues.apache.org/jira/browse/KAFKA-15280 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Lucas Brutschy >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > Fix For: 3.7.0 > > > This includes: > * Validate the client’s configuration for server-side assignor selection > defined in config `group.remote.assignor` > * Include the assignor taken from config in the {{ConsumerGroupHeartbeat}} > request, in the `ServerAssignor` field > * Properly handle UNSUPPORTED_ASSIGNOR errors that may be returned in the HB > response if the server does not support the assignor defined by the consumer. > This task is part of the work to implement support for the new KIP-848 > consumer group protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
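The first item in the list above, validating the `group.remote.assignor` setting, might look roughly like the following. This is a sketch under stated assumptions: `RemoteAssignorConfigSketch` and `serverAssignor` are hypothetical names, and the rule that a blank value is rejected is an assumption made for illustration, not the validation the Kafka client performs.

```java
import java.util.Properties;

// Hypothetical helper that reads the server-side assignor from client config.
final class RemoteAssignorConfigSketch {
    static final String GROUP_REMOTE_ASSIGNOR = "group.remote.assignor";

    // Returns the configured assignor name, or null when the setting is absent
    // (meaning the server picks its default). A blank value is rejected;
    // that rule is an assumption of this sketch.
    static String serverAssignor(Properties props) {
        String value = props.getProperty(GROUP_REMOTE_ASSIGNOR);
        if (value == null) {
            return null;
        }
        String trimmed = value.trim();
        if (trimmed.isEmpty()) {
            throw new IllegalArgumentException(GROUP_REMOTE_ASSIGNOR + " must not be blank");
        }
        return trimmed;
    }
}
```

The returned name is what would be carried in the `ServerAssignor` field of the heartbeat request. Handling an UNSUPPORTED_ASSIGNOR error in the response is not covered here.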
[jira] [Assigned] (KAFKA-15690) EosIntegrationTest is flaky.
[ https://issues.apache.org/jira/browse/KAFKA-15690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-15690: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > EosIntegrationTest is flaky. > > > Key: KAFKA-15690 > URL: https://issues.apache.org/jira/browse/KAFKA-15690 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Calvin Liu >Assignee: Lucas Brutschy >Priority: Major > Labels: flaky-test > > EosIntegrationTest > shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances[exactly_once_v2, > processing threads = false] > {code:java} > org.junit.runners.model.TestTimedOutException: test timed out after 600 > seconds at > org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleServerDisconnect(NetworkClient.java:) > at > org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:821) > at > org.apache.kafka.clients.NetworkClient.processTimeoutDisconnection(NetworkClient.java:779) >at > org.apache.kafka.clients.NetworkClient.handleTimedOutRequests(NetworkClient.java:837) > org.apache.kafka.streams.errors.StreamsException: Exception caught in > process. taskId=0_1, processor=KSTREAM-SOURCE-00, > topic=multiPartitionInputTopic, partition=1, offset=15, > stacktrace=java.lang.RuntimeException: Detected we've been interrupted. 
> at > org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:892) >at > org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:867) >at > org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49) > at > org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38) > at > org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66) > {code} > shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2, processing > threads = false] > {code:java} > java.lang.RuntimeException: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.UnknownServerException: The server experienced > an unexpected error when processing the request. at > org.apache.kafka.streams.integration.utils.KafkaEmbedded.deleteTopic(KafkaEmbedded.java:204) > at > org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:286) >at > org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:274) >at > org.apache.kafka.streams.integration.EosIntegrationTest.createTopics(EosIntegrationTest.java:174) > at > java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) > org.apache.kafka.streams.errors.StreamsException: Exception caught in > process. taskId=0_1, processor=KSTREAM-SOURCE-00, > topic=multiPartitionInputTopic, partition=1, offset=15, > stacktrace=java.lang.RuntimeException: Detected we've been interrupted. 
> at > org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:892) >at > org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:867) >at > org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49) > at > org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38) > at > org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66) > {code} > shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once, processing > threads = false] > {code:java} > org.opentest4j.AssertionFailedError: Condition not met within timeout 6. > StreamsTasks did not request commit. ==> expected: but was: >at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) >at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) >at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210) > java.lang.IllegalStateException: Replica > [Topic=__transaction_state,Partition=2,Replica=1] should be in the > OfflineReplica,ReplicaDeletionStarted states before moving to > ReplicaDeletionIneligible state. Instead it is in OnlineReplica state
[jira] [Assigned] (KAFKA-14532) Correctly handle failed fetch when partitions unassigned
[ https://issues.apache.org/jira/browse/KAFKA-14532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-14532: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Correctly handle failed fetch when partitions unassigned > > > Key: KAFKA-14532 > URL: https://issues.apache.org/jira/browse/KAFKA-14532 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Blocker > Fix For: 3.4.0, 3.3.2 > > > On master, all our long-running test jobs are running into this exception: > {code:java} > java.lang.IllegalStateException: No current assignment for partition > stream-soak-test-KSTREAM-OUTERTHIS-86-store-changelog-1 2 at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:370) > 3 at > org.apache.kafka.clients.consumer.internals.SubscriptionState.clearPreferredReadReplica(SubscriptionState.java:623) > 4 at java.util.LinkedHashMap$LinkedKeySet.forEach(LinkedHashMap.java:559) 5 > at > org.apache.kafka.clients.consumer.internals.Fetcher$1.onFailure(Fetcher.java:349) > 6 at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:179) > 7 at > org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:149) > 8 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:613) > 9 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:427) > 10 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) > 11 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251) > 12 at > org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1307) > 13 at > 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243) > 14 at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) > 15 at > org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:450) > 16 at > org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:910) > 17 at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:773) > 18 at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:613) > 19 at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:575) > 20[2022-12-13 04:01:59,024] ERROR [i-016cf5d2c1889c316-StreamThread-1] > stream-client [i-016cf5d2c1889c316] Encountered the following exception > during processing and sent shutdown request for the entire application. > (org.apache.kafka.streams.KafkaStreams) > 21org.apache.kafka.streams.errors.StreamsException: > java.lang.IllegalStateException: No current assignment for partition > stream-soak-test-KSTREAM-OUTERTHIS-86-store-changelog-1 22 at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:653) > 23 at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:575) > 24Caused by: java.lang.IllegalStateException: No current assignment for > partition stream-soak-test-KSTREAM-OUTERTHIS-86-store-changelog-1 25 > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:370) > 26 at > org.apache.kafka.clients.consumer.internals.SubscriptionState.clearPreferredReadReplica(SubscriptionState.java:623) > 27 at java.util.LinkedHashMap$LinkedKeySet.forEach(LinkedHashMap.java:559) > 28 at > org.apache.kafka.clients.consumer.internals.Fetcher$1.onFailure(Fetcher.java:349) > 29 at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:179) > 30 at > 
org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:149) > 31 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:613) > 32 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:427) > 33 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) > 34 at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251) > 35 at > org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1307) > 36 at > org.apache.kafka.clients.co
[jira] [Assigned] (KAFKA-14309) Kafka Streams upgrade tests do not cover for FK-joins
[ https://issues.apache.org/jira/browse/KAFKA-14309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-14309: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Kafka Streams upgrade tests do not cover for FK-joins > - > > Key: KAFKA-14309 > URL: https://issues.apache.org/jira/browse/KAFKA-14309 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > > The current streams upgrade system test for FK joins inserts the production > of foreign key data and an actual foreign key join in every version of > SmokeTestDriver except for the latest. The effect is that FK join upgrades > are not tested at all, since no FK join code is executed after the bounce in > the system test. > We should enable the FK-join code in the system test. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15803) Update last seen epoch during commit
[ https://issues.apache.org/jira/browse/KAFKA-15803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-15803: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Update last seen epoch during commit > > > Key: KAFKA-15803 > URL: https://issues.apache.org/jira/browse/KAFKA-15803 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Lucas Brutschy >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support, > kip-848-e2e, kip-848-preview > Fix For: 3.7.0 > > > At the time we implemented commitAsync in the prototypeAsyncConsumer, > metadata was not there. The ask here is to investigate if we need to add the > following function to the commit code: > > private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, > OffsetAndMetadata offsetAndMetadata) { > if (offsetAndMetadata != null) > offsetAndMetadata.leaderEpoch().ifPresent(epoch -> > metadata.updateLastSeenEpochIfNewer(topicPartition, epoch)); > } -- This message was sent by Atlassian Jira (v8.20.10#820010)
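The intent of the helper quoted in KAFKA-15803 can be shown with a self-contained sketch. It does not use the Kafka client classes: `LastSeenEpochSketch` is hypothetical, partitions are plain strings, and the rule "update only when no epoch is known or the new epoch is larger" is the assumed meaning of "if newer".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for metadata that tracks the last seen leader epoch.
final class LastSeenEpochSketch {
    private final Map<String, Integer> lastSeenEpochs = new HashMap<>();

    // Records the epoch only if none is known yet or the new one is larger.
    boolean updateLastSeenEpochIfNewer(String partition, int epoch) {
        Integer old = lastSeenEpochs.get(partition);
        if (old == null || epoch > old) {
            lastSeenEpochs.put(partition, epoch);
            return true;
        }
        return false;
    }

    // Mirrors the shape of the quoted helper: a commit may or may not carry an epoch.
    void onCommit(String partition, Optional<Integer> leaderEpoch) {
        leaderEpoch.ifPresent(epoch -> updateLastSeenEpochIfNewer(partition, epoch));
    }

    Optional<Integer> lastSeenEpoch(String partition) {
        return Optional.ofNullable(lastSeenEpochs.get(partition));
    }
}
```

Committing with epoch 5 and then with epoch 3 leaves the last seen epoch at 5, and a commit without an epoch leaves the stored value untouched.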
[jira] [Assigned] (KAFKA-15326) Decouple Processing Thread from Polling Thread
[ https://issues.apache.org/jira/browse/KAFKA-15326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-15326: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Decouple Processing Thread from Polling Thread > -- > > Key: KAFKA-15326 > URL: https://issues.apache.org/jira/browse/KAFKA-15326 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Critical > > As part of an ongoing effort to implement a better threading architecture in > Kafka Streams, we decouple N stream threads into N polling threads and N > processing threads. The effort to consolidate the N polling threads into a single > thread is a follow-up to this ticket. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15942) Implement ConsumerInterceptor
[ https://issues.apache.org/jira/browse/KAFKA-15942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-15942: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Implement ConsumerInterceptor > - > > Key: KAFKA-15942 > URL: https://issues.apache.org/jira/browse/KAFKA-15942 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: Philip Nee >Assignee: Lucas Brutschy >Priority: Blocker > Labels: consumer-threading-refactor, interceptors > Fix For: 3.8.0 > > > As title, we need to implement ConsumerInterceptor in the AsyncKafkaConsumer > > This is the current code. The implementation would be very similar > {code:java} > if (interceptors != null) > interceptors.onCommit(offsets); {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16097) State updater removes task without pending action in EOSv2
[ https://issues.apache.org/jira/browse/KAFKA-16097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-16097: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > State updater removes task without pending action in EOSv2 > -- > > Key: KAFKA-16097 > URL: https://issues.apache.org/jira/browse/KAFKA-16097 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.8.0 >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > > A long-running soak encountered the following exception: > > {code:java} > [2024-01-08 03:06:00,586] ERROR [i-081c089d2ed054443-StreamThread-3] Thread > encountered an error processing soak test > (org.apache.kafka.streams.StreamsSoakTest) > java.lang.IllegalStateException: Got a removed task 1_0 from the state > updater that is not for recycle, closing, or updating input partitions; this > should not happen > at > org.apache.kafka.streams.processor.internals.TaskManager.handleRemovedTasksFromStateUpdater(TaskManager.java:939) > at > org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:788) > at > org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1141) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:949) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645) > [2024-01-08 03:06:00,587] ERROR [i-081c089d2ed054443-StreamThread-3] > stream-client [i-081c089d2ed054443] Encountered the following exception > during processing and sent shutdown request for the entire application. 
> (org.apache.kafka.streams.KafkaStreams) > org.apache.kafka.streams.errors.StreamsException: > java.lang.IllegalStateException: Got a removed task 1_0 from the state > updater that is not for recycle, closing, or updating input partitions; this > should not happen > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:729) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645) > Caused by: java.lang.IllegalStateException: Got a removed task 1_0 from the > state updater that is not for recycle, closing, or updating input partitions; > this should not happen > at > org.apache.kafka.streams.processor.internals.TaskManager.handleRemovedTasksFromStateUpdater(TaskManager.java:939) > at > org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:788) > at > org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1141) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:949) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686) > ... 1 more{code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15798) Flaky Test NamedTopologyIntegrationTest.shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology()
[ https://issues.apache.org/jira/browse/KAFKA-15798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-15798: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Flaky Test > NamedTopologyIntegrationTest.shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology() > - > > Key: KAFKA-15798 > URL: https://issues.apache.org/jira/browse/KAFKA-15798 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Justine Olshan >Assignee: Lucas Brutschy >Priority: Major > Labels: flaky-test > > I saw a few examples recently. 2 have the same error, but the third is > different > [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14629/22/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_8_and_Scala_2_12___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology___2/] > [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2365/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_21_and_Scala_2_13___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology__/] > > The failure is like > {code:java} > java.lang.AssertionError: Did not receive all 5 records from topic > output-stream-1 within 6 ms, currently accumulated data is [] Expected: > is a value equal to or greater than <5> but: <0> was less than <5>{code} > The other failure was > [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2365/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_8_and_Scala_2_12___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology__/] > {code:java} > java.lang.AssertionError: Expected: <[0, 1]> but: was <[0]>{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16220) KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions is flaky
[ https://issues.apache.org/jira/browse/KAFKA-16220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-16220: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions > is flaky > > > Key: KAFKA-16220 > URL: https://issues.apache.org/jira/browse/KAFKA-16220 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > Labels: flaky, flaky-test > > This test has seen significant flakiness: > > https://ge.apache.org/s/fac7lploprvuu/tests/task/:streams:test/details/org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest/shouldThrowIllegalArgumentExceptionWhenCustomPartitionerReturnsMultiplePartitions()?top-execution=1 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15865) Ensure consumer.poll() execute autocommit callback
[ https://issues.apache.org/jira/browse/KAFKA-15865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-15865: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Ensure consumer.poll() execute autocommit callback > -- > > Key: KAFKA-15865 > URL: https://issues.apache.org/jira/browse/KAFKA-15865 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Lucas Brutschy >Priority: Major > Labels: consumer-threading-refactor, kip-848-preview > Fix For: 3.7.0 > > > When the network thread completes autocommits, we need to send a > message/event to the application thread to notify it to execute the > callback. In KAFKA-15327, the network thread sends an > AutoCommitCompletionBackgroundEvent to the polling thread. The polling > thread should trigger the OffsetCommitCallback upon receiving it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
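For readers following KAFKA-15865, the hand-off it describes can be sketched roughly as below. This is only an illustration under stated assumptions, not the actual consumer code: the class `AutoCommitCallbackSketch`, the event type `AutoCommitCompleted`, the interface `CommitCallback`, and the method names are all hypothetical stand-ins, and the real `AutoCommitCompletionBackgroundEvent` and `OffsetCommitCallback` have different shapes. The only point shown is that the network thread enqueues a completion event and the callback runs later, on whichever thread calls `poll()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch; none of these names come from the Kafka code base.
class AutoCommitCallbackSketch {

    /** Stand-in for the event the network thread sends when an autocommit finishes. */
    static final class AutoCommitCompleted {
        final Exception error; // null when the commit succeeded
        AutoCommitCompleted(Exception error) { this.error = error; }
    }

    /** Stand-in for a user-supplied commit callback. */
    interface CommitCallback {
        void onComplete(Exception error);
    }

    // Thread-safe queue shared by the network thread and the polling thread.
    private final BlockingQueue<AutoCommitCompleted> backgroundEvents = new LinkedBlockingQueue<>();
    private final CommitCallback callback;

    AutoCommitCallbackSketch(CommitCallback callback) {
        this.callback = callback;
    }

    /** Network-thread side: record completion, but never run user code here. */
    void completeAutoCommit(Exception error) {
        backgroundEvents.add(new AutoCommitCompleted(error));
    }

    /** Polling-thread side: drain pending events and run the callback for each one. */
    int poll() {
        List<AutoCommitCompleted> drained = new ArrayList<>();
        backgroundEvents.drainTo(drained);
        for (AutoCommitCompleted event : drained) {
            callback.onComplete(event.error);
        }
        return drained.size();
    }

    public static void main(String[] args) throws InterruptedException {
        AutoCommitCallbackSketch consumer = new AutoCommitCallbackSketch(error ->
            System.out.println("callback on " + Thread.currentThread().getName()
                + ", error=" + error));

        // Simulate the network thread finishing an autocommit.
        Thread network = new Thread(() -> consumer.completeAutoCommit(null), "network-thread");
        network.start();
        network.join();

        // The callback only fires here, on the thread that calls poll().
        System.out.println("callbacks run: " + consumer.poll());
    }
}
```

Keeping the callback off the network thread means user code cannot block network I/O, which is presumably why the ticket routes completion through an event rather than invoking the callback directly.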
[jira] [Assigned] (KAFKA-14278) Convert INVALID_PRODUCER_EPOCH into PRODUCER_FENCED TxnOffsetCommit
[ https://issues.apache.org/jira/browse/KAFKA-14278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-14278: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Convert INVALID_PRODUCER_EPOCH into PRODUCER_FENCED TxnOffsetCommit > --- > > Key: KAFKA-14278 > URL: https://issues.apache.org/jira/browse/KAFKA-14278 > Project: Kafka > Issue Type: Sub-task > Components: producer , streams >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > Fix For: 3.6.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15319) Upgrade rocksdb to fix CVE-2022-37434
[ https://issues.apache.org/jira/browse/KAFKA-15319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-15319: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Upgrade rocksdb to fix CVE-2022-37434 > - > > Key: KAFKA-15319 > URL: https://issues.apache.org/jira/browse/KAFKA-15319 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.4.1 >Reporter: Maruthi >Assignee: Lucas Brutschy >Priority: Critical > Fix For: 3.6.0, 3.5.2 > > Attachments: compat_report.html.zip > > > Rocksdbjni < 7.9.2 is vulnerable to CVE-2022-37434 due to zlib 1.2.12. > Upgrade to zlib 1.2.13 to fix: > https://github.com/facebook/rocksdb/commit/0993c9225f8086bab6c4c0a2d7206897d1cc688c -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14299) Benchmark and stabilize state updater
[ https://issues.apache.org/jira/browse/KAFKA-14299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-14299: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Benchmark and stabilize state updater > - > > Key: KAFKA-14299 > URL: https://issues.apache.org/jira/browse/KAFKA-14299 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > > We need to benchmark and stabilize the separate state restoration code path. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-12935) Flaky Test RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore
[ https://issues.apache.org/jira/browse/KAFKA-12935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-12935: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Flaky Test > RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore > > > Key: KAFKA-12935 > URL: https://issues.apache.org/jira/browse/KAFKA-12935 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Matthias J. Sax >Assignee: Lucas Brutschy >Priority: Critical > Labels: flaky-test > Fix For: 3.4.0 > > Attachments: > RestoreIntegrationTest#shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore[true].rtf > > > {quote}java.lang.AssertionError: Expected: <0L> but: was <5005L> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at > org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(RestoreIntegrationTest.java:374) > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)