Re: [PR] KAFKA-15876: Introduce RemoteStorageNotReadyException retryable error [kafka]
showuon commented on code in PR #14822: URL: https://github.com/apache/kafka/pull/14822#discussion_r1449931227 ## clients/src/main/java/org/apache/kafka/common/errors/RemoteStorageNotReadyException.java: ## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +/** + * This retryable exception indicates that remote storage is not ready to receive the requests yet. Review Comment: nit: Below we extend `RetriableException`, so maybe this should be spelled `retriable`? ## clients/src/main/java/org/apache/kafka/common/errors/RemoteStorageNotReadyException.java: ## @@ -0,0 +1,53 @@ +/** + * This retryable exception indicates that remote storage is not ready to receive the requests yet. Review Comment: Should we mention that this will only be used when using `TopicBasedRemoteLogMetadataManager`?
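For readers following along, a minimal sketch of the class under discussion, assuming the usual constructor pattern for Kafka error classes (the constructors are not shown in the quoted hunk):

```java
package org.apache.kafka.common.errors;

/**
 * This retriable exception indicates that remote storage is not ready
 * to receive requests yet.
 */
public class RemoteStorageNotReadyException extends RetriableException {

    private static final long serialVersionUID = 1L;

    public RemoteStorageNotReadyException(String message) {
        super(message);
    }

    public RemoteStorageNotReadyException(String message, Throwable cause) {
        super(message, cause);
    }
}
```

Extending `RetriableException` is what lets clients treat the error as transient and retry, which is why the reviewer suggests aligning the javadoc spelling with the superclass.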
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
artemlivshits commented on code in PR #14612: URL: https://github.com/apache/kafka/pull/14612#discussion_r1449897783 ## core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java: ## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server.handlers; + +import kafka.network.RequestChannel; +import kafka.server.AuthHelper; +import kafka.server.KafkaConfig; +import kafka.server.MetadataCache; +import kafka.server.metadata.KRaftMetadataCache; +import kafka.server.metadata.ZkMetadataCache; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData; +import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData; +import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition; +import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest; +import org.apache.kafka.common.resource.Resource; +import scala.collection.JavaConverters; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import static org.apache.kafka.common.acl.AclOperation.DESCRIBE; +import static org.apache.kafka.common.resource.ResourceType.TOPIC; + +public class DescribeTopicPartitionsRequestHandler { +MetadataCache metadataCache; +AuthHelper authHelper; +KafkaConfig config; + +public DescribeTopicPartitionsRequestHandler( +MetadataCache metadataCache, +AuthHelper authHelper, +KafkaConfig config +) { +this.metadataCache = metadataCache; +this.authHelper = authHelper; +this.config = config; +} + +public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest(RequestChannel.Request abstractRequest) { +if (metadataCache instanceof ZkMetadataCache) { +throw new InvalidRequestException("ZK cluster does not handle DescribeTopicPartitions request"); +} +KRaftMetadataCache kRaftMetadataCache = (KRaftMetadataCache) metadataCache; + +DescribeTopicPartitionsRequestData request = ((DescribeTopicPartitionsRequest) abstractRequest.loggableRequest()).data(); +Set<String> topics = new HashSet<>(); +boolean fetchAllTopics = request.topics().isEmpty(); +DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor(); +if (fetchAllTopics) { +if (cursor != null) { +// Include the cursor topic in case the cursor topic does not exist anymore.
+topics.add(cursor.topicName()); Review Comment: Instead of doing this, can we get all topics that are >= cursor and then check (after sorting) if the cursor topic is present, and reset the partition index to 0 if it's not? This would also handle the unauthorized case. ## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ## @@ -189,6 +255,86 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w } } + /** + * Get the topic metadata for the given topics. + * + * The quota is used to limit the number of partitions to return. The NextTopicPartition field points to the first + * partition that can't be returned due to the limit. + * If a topic can't return any partitions because the quota limit has been reached, the topic will not be included in the response. + * + * Note, the topics should be sorted in alphabetical order. The topics in the DescribeTopicPartitionsResponseData + * will also be sorted in alphabetical order. + * + * @param topics The set of topics and their corresponding first partition id to fetch. + * @param listenerName The listener name. + * @param firstTopicPartitionStartIndex The start partition index for the first topic + * @param maximumNumberOfPartitions The max number of partitions to return. + */ + def
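A rough sketch of the cursor handling the reviewer is suggesting; the helper name and types below are hypothetical, not code from the PR:

```java
import java.util.SortedSet;

// Hypothetical helper illustrating the reviewer's suggestion, not the PR's code.
final class CursorResetSketch {
    static int startPartitionFor(SortedSet<String> sortedAuthorizedTopics,
                                 String cursorTopic,
                                 int cursorPartition) {
        // The set holds only topics >= the cursor topic that the caller is
        // authorized to describe. If the cursor topic was deleted or filtered
        // out as unauthorized, restart at partition 0 of the next topic rather
        // than resuming from a stale partition index.
        return sortedAuthorizedTopics.contains(cursorTopic) ? cursorPartition : 0;
    }
}
```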
Re: [PR] MINOR: Add test case for follower fetch [kafka]
showuon commented on PR #14212: URL: https://github.com/apache/kafka/pull/14212#issuecomment-1888514023 @divijvaidya, do you want to have another look at this PR?
[PR] [KAFKA-15749] Adding support for Kraft in test testClusterIdPresent [kafka]
chirag-wadhwa5 opened a new pull request, #15181: URL: https://github.com/apache/kafka/pull/15181 Adding the KRaft test for testClusterIdPresent() in the KafkaMetricReporterClusterIdTest class. Ref: [KAFKA-15749](https://issues.apache.org/jira/browse/KAFKA-15749?jql=labels%20%3D%20kraft-test) Testing: the test passed successfully; a screenshot is attached below: https://github.com/apache/kafka/assets/122860692/ce2e6b23-c9ff-456b-a968-4ca7477b77e2 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-15561) Client support for new SubscriptionPattern based subscription
[ https://issues.apache.org/jira/browse/KAFKA-15561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805883#comment-17805883 ] Phuc Hong Tran commented on KAFKA-15561: I figured those out; they were auto-generated classes. > Client support for new SubscriptionPattern based subscription > - > > Key: KAFKA-15561 > URL: https://issues.apache.org/jira/browse/KAFKA-15561 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Phuc Hong Tran >Priority: Major > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support > Fix For: 3.8.0 > > > New consumer should support subscribe with the new SubscriptionPattern > introduced in the new consumer group protocol. When subscribing with this > regex, the client should provide the regex in the HB request on the > SubscribedTopicRegex field, delegating the resolution to the server.
[jira] (KAFKA-15561) Client support for new SubscriptionPattern based subscription
[ https://issues.apache.org/jira/browse/KAFKA-15561 ] Phuc Hong Tran deleted comment on KAFKA-15561: was (Author: JIRAUSER301295): [~lianetm], looks like the problem is on my local machine. > Client support for new SubscriptionPattern based subscription > - > > Key: KAFKA-15561 > URL: https://issues.apache.org/jira/browse/KAFKA-15561 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Phuc Hong Tran >Priority: Major > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support > Fix For: 3.8.0 > > > New consumer should support subscribe with the new SubscriptionPattern > introduced in the new consumer group protocol. When subscribing with this > regex, the client should provide the regex in the HB request on the > SubscribedTopicRegex field, delegating the resolution to the server.
[jira] [Commented] (KAFKA-15561) Client support for new SubscriptionPattern based subscription
[ https://issues.apache.org/jira/browse/KAFKA-15561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805880#comment-17805880 ] Phuc Hong Tran commented on KAFKA-15561: [~lianetm], looks like the problem is on my local machine. > Client support for new SubscriptionPattern based subscription > - > > Key: KAFKA-15561 > URL: https://issues.apache.org/jira/browse/KAFKA-15561 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Phuc Hong Tran >Priority: Major > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support > Fix For: 3.8.0 > > > New consumer should support subscribe with the new SubscriptionPattern > introduced in the new consumer group protocol. When subscribing with this > regex, the client should provide the regex in the HB request on the > SubscribedTopicRegex field, delegating the resolution to the server.
Re: [PR] KAFKA-16085: Add metric value consolidated for topics on a broker for tiered storage. [kafka]
showuon commented on PR #15133: URL: https://github.com/apache/kafka/pull/15133#issuecomment-1888394290 @satishd, call for review. Thanks.
Re: [PR] MINOR: Add isRemoteLogEnabled parameter to the Log Loader Javadoc [kafka]
showuon commented on PR #15179: URL: https://github.com/apache/kafka/pull/15179#issuecomment-1888393499 This is just a javadoc update. I'll merge it after the CI build completes if there are no other comments.
Re: [PR] MINOR: Enable kraft test in kafka.api [kafka]
dengziming merged PR #14595: URL: https://github.com/apache/kafka/pull/14595
Re: [PR] MINOR: Optimization of equals methods on implementations of Commands.Handler in shell.command package [kafka]
github-actions[bot] commented on PR #14018: URL: https://github.com/apache/kafka/pull/14018#issuecomment-1888380284 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] MINOR: Add test case for follower fetch [kafka]
github-actions[bot] commented on PR #14212: URL: https://github.com/apache/kafka/pull/14212#issuecomment-1888380213 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-15452: Access SslPrincipalMapper and kerberosShortNamer in Custom KafkaPrincipalBuilder(KIP-982) [kafka]
github-actions[bot] commented on PR #14491: URL: https://github.com/apache/kafka/pull/14491#issuecomment-1888380144 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
[jira] [Commented] (KAFKA-16095) Update list group state type filter to include the states for the new consumer group type
[ https://issues.apache.org/jira/browse/KAFKA-16095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805868#comment-17805868 ] Lan Ding commented on KAFKA-16095: -- [~rreddy22] I think a "--type" flag should be added to allow filtering by group type in the ConsumerGroupCommand tool; perhaps we should wait for [KAFKA-15460] to finish first. > Update list group state type filter to include the states for the new > consumer group type > - > > Key: KAFKA-16095 > URL: https://issues.apache.org/jira/browse/KAFKA-16095 > Project: Kafka > Issue Type: Sub-task >Reporter: Ritika Reddy >Assignee: Lan Ding >Priority: Minor > > # While using *--list --state* the current accepted values correspond to the > classic group type states. We need to include support for the new group type > states. > ## Consumer Group: Should list the state of the group. Accepted Values: > ### _UNKNOWN("unknown")_ > ### {_}EMPTY{_}("empty"), > ### *{_}ASSIGNING{_}("assigning"),* > ### *{_}RECONCILING{_}("reconciling"),* > ### {_}STABLE{_}("stable"), > ### {_}DEAD{_}("dead"); > # > ## Classic Group : Should list the state of the group. Accepted Values: > ### {_}UNKNOWN{_}("Unknown"), > ### {_}EMPTY{_}("Empty"); > ### *{_}PREPARING_REBALANCE{_}("PreparingRebalance"),* > ### *{_}COMPLETING_REBALANCE{_}("CompletingRebalance"),* > ### {_}STABLE{_}("Stable"), > ### {_}DEAD{_}("Dead")
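For illustration, a hedged sketch of what accepting both state vocabularies might look like; this is not the ConsumerGroupCommand implementation, and the class and method names are placeholders:

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;

// Illustrative sketch only; the real ConsumerGroupCommand code differs.
final class StateFilterSketch {
    // States from both protocols, lower-cased for case-insensitive matching.
    private static final Set<String> ACCEPTED = Set.of(
        "unknown", "empty", "stable", "dead",               // common to both protocols
        "assigning", "reconciling",                         // new consumer groups
        "preparingrebalance", "completingrebalance");       // classic groups

    static Set<String> parse(String csv) {
        Set<String> requested = Arrays.stream(csv.split(","))
            .map(s -> s.trim().toLowerCase(Locale.ROOT))
            .collect(Collectors.toSet());
        if (!ACCEPTED.containsAll(requested))
            throw new IllegalArgumentException("Invalid --state value: " + csv);
        return requested;
    }
}
```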
Re: [PR] MINOR: Log a warning when connectors generate greater than tasks.max task configs [kafka]
gharris1727 commented on PR #14694: URL: https://github.com/apache/kafka/pull/14694#issuecomment-1888248086 @C0urante Is this superseded by #15180?
[jira] [Commented] (KAFKA-8115) Flaky Test CoordinatorTest#testTaskRequestWithOldStartMsGetsUpdated
[ https://issues.apache.org/jira/browse/KAFKA-8115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805849#comment-17805849 ] Greg Harris commented on KAFKA-8115: We doubled the timeout for this test in KAFKA-15760 after cpu-restricted profiles of the test showed that the timeout could be exceeded just by loading classes for the REST server. I'll keep monitoring this to see if the flakiness changes. > Flaky Test CoordinatorTest#testTaskRequestWithOldStartMsGetsUpdated > --- > > Key: KAFKA-8115 > URL: https://issues.apache.org/jira/browse/KAFKA-8115 > Project: Kafka > Issue Type: Bug > Components: core, unit tests >Affects Versions: 2.3.0 >Reporter: Matthias J. Sax >Assignee: Greg Harris >Priority: Critical > Labels: flaky-test > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3254/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/testTaskRequestWithOldStartMsGetsUpdated/] > {quote}org.junit.runners.model.TestTimedOutException: test timed out after > 120000 milliseconds at java.base@11.0.1/jdk.internal.misc.Unsafe.park(Native > Method) at > java.base@11.0.1/java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234) > at > java.base@11.0.1/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123) > at > java.base@11.0.1/java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1454) > at > java.base@11.0.1/java.util.concurrent.Executors$DelegatedExecutorService.awaitTermination(Executors.java:709) > at > app//org.apache.kafka.trogdor.rest.JsonRestServer.waitForShutdown(JsonRestServer.java:157) > at app//org.apache.kafka.trogdor.agent.Agent.waitForShutdown(Agent.java:123) > at > app//org.apache.kafka.trogdor.common.MiniTrogdorCluster.close(MiniTrogdorCluster.java:285) > at > app//org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated(CoordinatorTest.java:596) > at > java.base@11.0.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) at > java.base@11.0.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base@11.0.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base@11.0.1/java.lang.reflect.Method.invoke(Method.java:566) at > app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288) > at > app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282) > at java.base@11.0.1/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at java.base@11.0.1/java.lang.Thread.run(Thread.java:834){quote} > STDOUT > {quote}[2019-03-15 09:23:41,364] INFO Creating MiniTrogdorCluster with > agents: node02 and coordinator: node01 > (org.apache.kafka.trogdor.common.MiniTrogdorCluster:135) [2019-03-15 > 09:23:41,595] INFO Logging initialized @13340ms to > org.eclipse.jetty.util.log.Slf4jLog (org.eclipse.jetty.util.log:193) > [2019-03-15 09:23:41,752] INFO Starting REST server >
(org.apache.kafka.trogdor.rest.JsonRestServer:89) [2019-03-15 09:23:41,912] > INFO Registered resource > org.apache.kafka.trogdor.agent.AgentRestResource@3fa38ceb > (org.apache.kafka.trogdor.rest.JsonRestServer:94) [2019-03-15 09:23:42,178] > INFO jetty-9.4.14.v20181114; built: 2018-11-14T21:20:31.478Z; git: > c4550056e785fb5665914545889f21dc136ad9e6; jvm 11.0.1+13-LTS > (org.eclipse.jetty.server.Server:370) [2019-03-15 09:23:42,360] INFO > DefaultSessionIdManager workerName=node0 > (org.eclipse.jetty.server.session:365) [2019-03-15 09:23:42,362] INFO No > SessionScavenger set, using defaults (org.eclipse.jetty.server.session:370) > [2019-03-15 09:23:42,370] INFO node0 Scavenging every 66ms > (org.eclipse.jetty.server.session:149) [2019-03-15 09:23:44,412] INFO Started > o.e.j.s.ServletContextHandler@335a5293\{/,null,AVAILABLE} > (org.eclipse.jetty.server.handler.ContextHandler:855) [2019-03-15 > 09:23:44,473] INFO Started > ServerConnector@79a93bf1\{HTTP/1.1,[http/1.1]}{0.0.0.0:33477} > (org.eclipse.jetty.server.AbstractConnector:292) [2019-03-15 09:23:44,474] > INFO Started @16219ms (org.eclipse.jetty.server.Server:407) [2019-03-15 > 09:23:44,475] INFO REST server
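Illustrative only: the kind of change described above for KAFKA-15760 (raising the JUnit 5 timeout on the flaky test), not the actual diff from that fix:

```java
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

class CoordinatorTestSketch {
    // Doubling the per-test timeout so class loading on a cpu-restricted
    // CI node no longer pushes the test past the limit.
    @Test
    @Timeout(value = 240_000, unit = TimeUnit.MILLISECONDS) // was 120_000
    void testTaskRequestWithOldStartMsGetsUpdated() {
        // ... test body elided ...
    }
}
```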
[jira] [Resolved] (KAFKA-15760) org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated is flaky
[ https://issues.apache.org/jira/browse/KAFKA-15760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris resolved KAFKA-15760. - Fix Version/s: 3.8.0 Assignee: David Mao Resolution: Fixed > org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated > is flaky > -- > > Key: KAFKA-15760 > URL: https://issues.apache.org/jira/browse/KAFKA-15760 > Project: Kafka > Issue Type: Bug > Components: unit tests >Reporter: Calvin Liu >Assignee: David Mao >Priority: Major > Labels: flaky-test > Fix For: 3.8.0 > > > Build / JDK 17 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – > org.apache.kafka.trogdor.coordinator.CoordinatorTest > {code:java} > java.util.concurrent.TimeoutException: > testTaskRequestWithOldStartMsGetsUpdated() timed out after 120000 > milliseconds at > org.junit.jupiter.engine.extension.TimeoutExceptionFactory.create(TimeoutExceptionFactory.java:29) >at > org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:58) > at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) >at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) >at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) >at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > {code}
Re: [PR] KAFKA-15760: Disable flaky test testTaskRequestWithOldStartMsGetsUpdated [kafka]
gharris1727 merged PR #14917: URL: https://github.com/apache/kafka/pull/14917
Re: [PR] KAFKA-15760: Disable flaky test testTaskRequestWithOldStartMsGetsUpdated [kafka]
gharris1727 commented on PR #14917: URL: https://github.com/apache/kafka/pull/14917#issuecomment-1888236729 The test failures appear unrelated, the test being changed has no failures, and this is just a timeout increase. Merging.
Re: [PR] KAFKA-16072: JUnit 5 extension to detect thread leak [kafka]
gharris1727 commented on PR #15101: URL: https://github.com/apache/kafka/pull/15101#issuecomment-1888225488 > One reason some stream tests are failing is that they use the same server for the duration of the entire test suite and don't create a server per test, for example MetricsIntegrationTest.shouldAddMetricsOnAllLevels(). > Should we change our logic to test for these leaked threads at the end of every test class, instead of at the end of every test? @divijvaidya The stateful strategy of diffing the threads before and after each test should exclude the test-suite threads which are created before the test method begins. What is probably happening here is that there are some threads being lazily created as a side effect of the actions of the test but which are not cleaned up immediately; they are later cleaned up by the whole cluster shutting down. I wouldn't recommend immediately reducing the scope to the class-only enforcement, as it makes it so much harder to blame specific tests which contain leaks. It's so much more helpful to get a "This test leaked a thread" warning than a "this suite leaked a thread" warning, especially when the assertion doesn't tell you where the thread is being allocated. As far as what to do next: 1. You could figure out how to disable the lazy threads with some development-only configuration 2. You could figure out how to clean up these lazy threads, either automatically or with a development-only hint method. 3. You could give these particular threads special privileges (by adding to the expected thread names) 4. You could give these particular tests special privileges (with an `Ignore` annotation) These are technically leaks, and the extension is correctly finding them. It's up to us to figure out if they're worth fixing or if they're generally harmless.
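A minimal sketch of the before/after thread-diff strategy described above, assuming JUnit 5 lifecycle callbacks; PR #15101's extension is more involved:

```java
import java.util.HashSet;
import java.util.Set;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;

public class ThreadLeakSketchExtension implements BeforeEachCallback, AfterEachCallback {

    private Set<Thread> before;

    @Override
    public void beforeEach(ExtensionContext context) {
        // Snapshot the threads alive before the test body runs; anything the
        // suite created earlier is therefore excluded from the diff.
        before = Thread.getAllStackTraces().keySet();
    }

    @Override
    public void afterEach(ExtensionContext context) {
        Set<Thread> leaked = new HashSet<>(Thread.getAllStackTraces().keySet());
        leaked.removeAll(before);          // keep only threads created during the test
        leaked.removeIf(t -> !t.isAlive());
        if (!leaked.isEmpty()) {
            throw new AssertionError("Test leaked threads: " + leaked);
        }
    }
}
```

Keeping the snapshot per test method is what lets the extension blame the individual test rather than the whole suite, which is the trade-off discussed in the comment above.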
Re: [PR] KAFKA-15575: Begin enforcing 'tasks.max' property for connectors [kafka]
gharris1727 commented on code in PR #15180: URL: https://github.com/apache/kafka/pull/15180#discussion_r1449550586 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ## @@ -620,6 +650,11 @@ private boolean startTask( try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) { final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps); + +int maxTasks = connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG); Review Comment: nit: make a `tasksMax` getter? ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ## @@ -391,7 +391,15 @@ public List<Map<String, String>> connectorTaskConfigs(String connName, Connector Connector connector = workerConnector.connector(); Review Comment: I didn't realize that this was the only place where Connector escapes the WorkerConnector, and a different thread interacts with the Connector object. I know the taskConfigs method is typically an instantaneous method, but maybe it would make sense for this to eventually move to the WorkerConnector thread instead of the herder tick thread. ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java: ## @@ -131,9 +133,26 @@ public void run() { } } +/** + * Fail the connector. + * @param cause the cause of the failure; if null, the connector will not be failed + */ +public void fail(Throwable cause) { +synchronized (this) { +if (this.externalFailure != null) +return; +this.externalFailure = cause; +notify(); +} +} + void doRun() { initialize(); while (!stopping) { +Throwable failure = externalFailure; +if (failure != null) +onFailure(failure); Review Comment: Is this in danger of being called more than once, particularly if the connector has this problem and then a pause/resume request comes in? Is that a bad thing? It looks like the connector thread just waits in this loop until something external calls shutdown(), so I would expect this to get called whenever someone notify()'s the worker connector thread. ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java: ## @@ -131,9 +133,26 @@ +Throwable failure = externalFailure; +if (failure != null) Review Comment: nit: curly braces ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ## @@ -391,7 +391,15 @@ public List<Map<String, String>> connectorTaskConfigs(String connName, Connector Connector connector = workerConnector.connector(); try (LoaderSwap loaderSwap = plugins.withClassLoader(workerConnector.loader())) { String taskClassName = connector.taskClass().getName(); -for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) { +List<Map<String, String>> taskConfigs = connector.taskConfigs(maxTasks); +try { +checkTasksMax(connName, taskConfigs.size(), maxTasks, connConfig.enforceTasksMax()); +} catch (TooManyTasksException e) { +// TODO: This control flow is awkward. Push task config generation into WorkerConnector class? Review Comment: This makes sense to me; checkTasksMax could be public static, right?
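A sketch of the enforcement check discussed in this thread; the method and exception names come from the review, while the body, message text, and logger are assumptions:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class TasksMaxSketch {
    private static final Logger log = LoggerFactory.getLogger(TasksMaxSketch.class);

    public static void checkTasksMax(String connName, int numTasks, int maxTasks, boolean enforce) {
        if (numTasks <= maxTasks)
            return;
        String msg = "Connector " + connName + " generated " + numTasks
            + " task configs, but tasks.max is " + maxTasks;
        if (enforce)
            throw new TooManyTasksException(msg);   // exception type referenced in the PR
        log.warn("{}; enforcement is disabled for this connector", msg);
    }

    // Stand-in so the sketch compiles on its own; the PR defines its own class.
    static class TooManyTasksException extends RuntimeException {
        TooManyTasksException(String message) { super(message); }
    }
}
```

Making it `public static`, as suggested in the last comment, would let both the task-start path and the task-config-generation path share the same check.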
Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]
gharris1727 commented on code in PR #15154: URL: https://github.com/apache/kafka/pull/15154#discussion_r1449513344 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java: ## @@ -168,11 +102,13 @@ public String toString(boolean includeMessage) { builder.append("' with class '"); builder.append(executingClass() == null ? "null" : executingClass().getName()); builder.append('\''); -if (includeMessage && sourceRecord() != null) { +T original = original(); +if (includeMessage && original instanceof SourceRecord) { Review Comment: I'll expand on this, since it also provides context for the strange casting in DeadLetterQueueReporter. I think the strange "if source do [x], if sink do [y]" in `ProcessingContext#toString(boolean)` and `DeadLetterQueueReporter#report()` are both symptoms of the inadequate ErrorReporter signature. Since there's only one ErrorReporter signature used by both sources and sinks, there isn't anything in the type definitions to force the DeadLetterQueueReporter to only work with sinks, so it has to include a runtime check in the implementation. Similarly, there are two different "kinds" of LogReporter, the one for sources that hits the first branch in the toString, and one that hits the second. Each LogReporter instance only ever takes one of the branches, but that isn't clear from the implementation. If we make ErrorReporter generic, we can have the implementations give functionality for the specific record types when compiling or instantiating, instead of in the report() function. But pulling on this thread unravelled a bit more: To make ErrorReporter generic, we have to make RetryWithToleranceOperator generic, make LogReporter generic, change some RetryWithToleranceOperator instance reuse, and move the RetryWithToleranceOperator out of WorkerTask. All of which are just lateral refactors, and I think would be easier to address in a follow-up.
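A sketch of the generic reporter signature being proposed; the real follow-up refactor may differ, and the ProcessingContext stand-in below is simplified:

```java
import java.util.concurrent.Future;

// T is SourceRecord for source tasks and ConsumerRecord<byte[], byte[]> for
// sink tasks, so a DeadLetterQueueReporter could be declared as
// ErrorReporter<ConsumerRecord<byte[], byte[]>> and drop its runtime checks.
interface ErrorReporter<T> extends AutoCloseable {
    Future<Void> report(ProcessingContext<T> context);

    @Override
    default void close() { }
}

// Minimal stand-in so the sketch is self-contained; Connect's real
// ProcessingContext carries much more state than the original record.
class ProcessingContext<T> {
    private final T original;
    ProcessingContext(T original) { this.original = original; }
    T original() { return original; }
}
```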
[jira] [Updated] (KAFKA-16116) AsyncKafkaConsumer: Add missing rebalance metrics
[ https://issues.apache.org/jira/browse/KAFKA-16116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16116: -- Fix Version/s: 3.8.0 > AsyncKafkaConsumer: Add missing rebalance metrics > - > > Key: KAFKA-16116 > URL: https://issues.apache.org/jira/browse/KAFKA-16116 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, metrics >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > The following metrics are missing: > |[rebalance-latency-avg|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-avg]| > |[rebalance-latency-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-max]| > |[rebalance-latency-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-total]| > |[rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-rate-per-hour]| > |[rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-total]| > |[failed-rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-rate-per-hour]| > |[failed-rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-total]|
[jira] [Updated] (KAFKA-16115) AsyncKafkaConsumer: Add missing heartbeat metrics
[ https://issues.apache.org/jira/browse/KAFKA-16115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16115: -- Component/s: clients > AsyncKafkaConsumer: Add missing heartbeat metrics > - > > Key: KAFKA-16115 > URL: https://issues.apache.org/jira/browse/KAFKA-16115 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, metrics >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > The following metrics are missing: > |[heartbeat-rate|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-rate]| > |[heartbeat-response-time-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-response-time-max]| > |[heartbeat-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-total]| > |[last-heartbeat-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-heartbeat-seconds-ago]| > |[last-rebalance-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-rebalance-seconds-ago]|
[jira] [Updated] (KAFKA-16116) AsyncKafkaConsumer: Add missing rebalance metrics
[ https://issues.apache.org/jira/browse/KAFKA-16116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16116: -- Component/s: clients > AsyncKafkaConsumer: Add missing rebalance metrics > - > > Key: KAFKA-16116 > URL: https://issues.apache.org/jira/browse/KAFKA-16116 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, metrics >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > > The following metrics are missing: > |[rebalance-latency-avg|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-avg]| > |[rebalance-latency-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-max]| > |[rebalance-latency-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-total]| > |[rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-rate-per-hour]| > |[rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-total]| > |[failed-rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-rate-per-hour]| > |[failed-rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-total]|
[jira] [Updated] (KAFKA-16115) AsyncKafkaConsumer: Add missing heartbeat metrics
[ https://issues.apache.org/jira/browse/KAFKA-16115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16115: -- Labels: consumer-threading-refactor (was: ) > AsyncKafkaConsumer: Add missing heartbeat metrics > - > > Key: KAFKA-16115 > URL: https://issues.apache.org/jira/browse/KAFKA-16115 > Project: Kafka > Issue Type: Sub-task > Components: consumer, metrics >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > The following metrics are missing: > |[heartbeat-rate|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-rate]| > |[heartbeat-response-time-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-response-time-max]| > |[heartbeat-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-total]| > |[last-heartbeat-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-heartbeat-seconds-ago]| > |[last-rebalance-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-rebalance-seconds-ago]|
[jira] [Commented] (KAFKA-16095) Update list group state type filter to include the states for the new consumer group type
[ https://issues.apache.org/jira/browse/KAFKA-16095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805828#comment-17805828 ] Ritika Reddy commented on KAFKA-16095: -- [~isding_l] When would you be taking up this task? We do need it in production fairly soon. > Update list group state type filter to include the states for the new > consumer group type > - > > Key: KAFKA-16095 > URL: https://issues.apache.org/jira/browse/KAFKA-16095 > Project: Kafka > Issue Type: Sub-task >Reporter: Ritika Reddy >Assignee: Lan Ding >Priority: Minor > > # While using *--list --state* the current accepted values correspond to the > classic group type states. We need to include support for the new group type > states. > ## Consumer Group: Should list the state of the group. Accepted Values: > ### _UNKNOWN("unknown")_ > ### {_}EMPTY{_}("empty"), > ### *{_}ASSIGNING{_}("assigning"),* > ### *{_}RECONCILING{_}("reconciling"),* > ### {_}STABLE{_}("stable"), > ### {_}DEAD{_}("dead"); > # > ## Classic Group : Should list the state of the group. Accepted Values: > ### {_}UNKNOWN{_}("Unknown"), > ### {_}EMPTY{_}("Empty"); > ### *{_}PREPARING_REBALANCE{_}("PreparingRebalance"),* > ### *{_}COMPLETING_REBALANCE{_}("CompletingRebalance"),* > ### {_}STABLE{_}("Stable"), > ### {_}DEAD{_}("Dead")
[jira] [Updated] (KAFKA-16115) AsyncKafkaConsumer: Add missing heartbeat metrics
[ https://issues.apache.org/jira/browse/KAFKA-16115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16115: -- Fix Version/s: 3.8.0 > AsyncKafkaConsumer: Add missing heartbeat metrics > - > > Key: KAFKA-16115 > URL: https://issues.apache.org/jira/browse/KAFKA-16115 > Project: Kafka > Issue Type: Sub-task > Components: consumer, metrics >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Fix For: 3.8.0 > > > The following metrics are missing: > |[heartbeat-rate|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-rate]| > |[heartbeat-response-time-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-response-time-max]| > |[heartbeat-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-total]| > |[last-heartbeat-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-heartbeat-seconds-ago]| > |[last-rebalance-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-rebalance-seconds-ago]|
[jira] [Created] (KAFKA-16117) Add Integration test for checking if the correct assignor is chosen
Ritika Reddy created KAFKA-16117: Summary: Add Integration test for checking if the correct assignor is chosen Key: KAFKA-16117 URL: https://issues.apache.org/jira/browse/KAFKA-16117 Project: Kafka Issue Type: Sub-task Reporter: Ritika Reddy h4. We are trying to test this section of KIP-848 h4. Assignor Selection The group coordinator has to determine which assignment strategy must be used for the group. The group's members may not have exactly the same assignors at any given point in time - e.g. they may migrate from one assignor to another. The group coordinator will choose the assignor as follows: * A client side assignor is used if possible. This means that a client side assignor must be supported by all the members. If multiple are supported, it will respect the precedence defined by the members when they advertise their supported client side assignors. * A server side assignor is used otherwise. If multiple server side assignors are specified in the group, the group coordinator uses the most common one. If a member does not provide an assignor, the group coordinator will default to the first one in {{group.consumer.assignors}}.
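A pseudocode-level sketch of the selection rule described in the ticket; this is not the group coordinator's implementation, and it assumes the group has at least one member:

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

final class AssignorSelectionSketch {
    static String choose(List<List<String>> clientAssignorsPerMember, // each list in the member's precedence order
                         List<String> serverAssignorsPerMember,       // may contain nulls for members that set none
                         List<String> configuredAssignors) {          // group.consumer.assignors
        // 1. Prefer a client side assignor supported by every member,
        //    respecting the precedence the first member advertises.
        for (String candidate : clientAssignorsPerMember.get(0)) {
            if (clientAssignorsPerMember.stream().allMatch(l -> l.contains(candidate)))
                return candidate;
        }
        // 2. Otherwise pick the most common server side assignor among members.
        Optional<String> mostCommon = serverAssignorsPerMember.stream()
            .filter(a -> a != null)
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))
            .entrySet().stream()
            .max(Map.Entry.comparingByValue())
            .map(Map.Entry::getKey);
        // 3. Fall back to the first configured server side assignor.
        return mostCommon.orElse(configuredAssignors.get(0));
    }
}
```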
[jira] [Updated] (KAFKA-16022) AsyncKafkaConsumer sometimes complains “No current assignment for partition {}”
[ https://issues.apache.org/jira/browse/KAFKA-16022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16022: -- Summary: AsyncKafkaConsumer sometimes complains “No current assignment for partition {}” (was: AsyncKafkaConsumer sometimes complains "No current assignment for partition {}") > AsyncKafkaConsumer sometimes complains “No current assignment for partition > {}” > --- > > Key: KAFKA-16022 > URL: https://issues.apache.org/jira/browse/KAFKA-16022 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Phuc Hong Tran >Priority: Minor > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > This seems to be a timing issue: before the member receives any > assignment from the coordinator, the fetcher will try to find the current > position, causing "No current assignment for partition {}". This adds a > small amount of noise to the log.
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
apoorvmittal10 commented on PR #15148: URL: https://github.com/apache/kafka/pull/15148#issuecomment-1888052029 @mjsax Can you please take another look? I have addressed the comments.
[jira] [Updated] (KAFKA-16116) AsyncKafkaConsumer: Add missing rebalance metrics
[ https://issues.apache.org/jira/browse/KAFKA-16116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16116: --- Labels: consumer-threading-refactor (was: ) > AsyncKafkaConsumer: Add missing rebalance metrics > - > > Key: KAFKA-16116 > URL: https://issues.apache.org/jira/browse/KAFKA-16116 > Project: Kafka > Issue Type: Sub-task > Components: consumer, metrics >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > > The following metrics are missing: > |[rebalance-latency-avg|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-avg]| > |[rebalance-latency-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-max]| > |[rebalance-latency-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-total]| > |[rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-rate-per-hour]| > |[rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-total]| > |[failed-rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-rate-per-hour]| > |[failed-rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-total]|
[jira] [Updated] (KAFKA-16116) AsyncKafkaConsumer: Add missing rebalance metrics
[ https://issues.apache.org/jira/browse/KAFKA-16116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16116: --- Parent: KAFKA-14246 Issue Type: Sub-task (was: Improvement) > AsyncKafkaConsumer: Add missing rebalance metrics > - > > Key: KAFKA-16116 > URL: https://issues.apache.org/jira/browse/KAFKA-16116 > Project: Kafka > Issue Type: Sub-task > Components: consumer, metrics >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > > The following metrics are missing: > |[rebalance-latency-avg|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-avg]| > |[rebalance-latency-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-max]| > |[rebalance-latency-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-total]| > |[rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-rate-per-hour]| > |[rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-total]| > |[failed-rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-rate-per-hour]| > |[failed-rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-total]|
[jira] [Updated] (KAFKA-16116) AsyncKafkaConsumer: Add missing rebalance metrics
[ https://issues.apache.org/jira/browse/KAFKA-16116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16116: --- Component/s: consumer metrics > AsyncKafkaConsumer: Add missing rebalance metrics > - > > Key: KAFKA-16116 > URL: https://issues.apache.org/jira/browse/KAFKA-16116 > Project: Kafka > Issue Type: Improvement > Components: consumer, metrics >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > > The following metrics are missing: > |[rebalance-latency-avg|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-avg]| > |[rebalance-latency-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-max]| > |[rebalance-latency-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-total]| > |[rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-rate-per-hour]| > |[rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-total]| > |[failed-rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-rate-per-hour]| > |[failed-rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-total]|
[jira] [Created] (KAFKA-16116) AsyncKafkaConsumer: Add missing rebalance metrics
Philip Nee created KAFKA-16116: -- Summary: AsyncKafkaConsumer: Add missing rebalance metrics Key: KAFKA-16116 URL: https://issues.apache.org/jira/browse/KAFKA-16116 Project: Kafka Issue Type: Improvement Reporter: Philip Nee Assignee: Philip Nee The following metrics are missing: |[rebalance-latency-avg|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-avg]| |[rebalance-latency-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-max]| |[rebalance-latency-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-total]| |[rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-rate-per-hour]| |[rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-total]| |[failed-rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-rate-per-hour]| |[failed-rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-total]|
[jira] [Created] (KAFKA-16115) AsyncKafkaConsumer: Add missing heartbeat metrics
Philip Nee created KAFKA-16115: -- Summary: AsyncKafkaConsumer: Add missing heartbeat metrics Key: KAFKA-16115 URL: https://issues.apache.org/jira/browse/KAFKA-16115 Project: Kafka Issue Type: Improvement Components: consumer, metrics Reporter: Philip Nee Assignee: Philip Nee The following metrics are missing: |[heartbeat-rate|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-rate]| |[heartbeat-response-time-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-response-time-max]| |[heartbeat-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-total]| |[last-heartbeat-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-heartbeat-seconds-ago]| |[last-rebalance-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-rebalance-seconds-ago]|
[jira] [Updated] (KAFKA-16115) AsyncKafkaConsumer: Add missing heartbeat metrics
[ https://issues.apache.org/jira/browse/KAFKA-16115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16115: --- Parent: KAFKA-14246 Issue Type: Sub-task (was: Improvement) > AsyncKafkaConsumer: Add missing heartbeat metrics > - > > Key: KAFKA-16115 > URL: https://issues.apache.org/jira/browse/KAFKA-16115 > Project: Kafka > Issue Type: Sub-task > Components: consumer, metrics >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > > The following metrics are missing: > |[heartbeat-rate|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-rate]| > |[heartbeat-response-time-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-response-time-max]| > |[heartbeat-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-total]| > |[last-heartbeat-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-heartbeat-seconds-ago]| > |[last-rebalance-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-rebalance-seconds-ago]|
[jira] [Commented] (KAFKA-15561) Client support for new SubscriptionPattern based subscription
[ https://issues.apache.org/jira/browse/KAFKA-15561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805806#comment-17805806 ] Phuc Hong Tran commented on KAFKA-15561: [~lianetm] do the classes that implement ApiMessage have some special settings so that people can't make changes to them? Every time I make a change and then compile the code, those changes are automatically removed. > Client support for new SubscriptionPattern based subscription > - > > Key: KAFKA-15561 > URL: https://issues.apache.org/jira/browse/KAFKA-15561 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Phuc Hong Tran >Priority: Major > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support > Fix For: 3.8.0 > > > New consumer should support subscribe with the new SubscriptionPattern > introduced in the new consumer group protocol. When subscribing with this > regex, the client should provide the regex in the HB request on the > SubscribedTopicRegex field, delegating the resolution to the server. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]
gharris1727 commented on code in PR #15154: URL: https://github.com/apache/kafka/pull/15154#discussion_r1449403859 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java: ## @@ -303,48 +303,14 @@ public String toString() { * @param reporters the error reporters (should not be null). */ public synchronized void reporters(List<ErrorReporter> reporters) { -this.context.reporters(reporters); -} - -/** - * Set the source record being processed in the connect pipeline. - * - * @param preTransformRecord the source record - */ -public synchronized void sourceRecord(SourceRecord preTransformRecord) { -this.context.sourceRecord(preTransformRecord); -} - -/** - * Set the record consumed from Kafka in a sink connector. - * - * @param consumedMessage the record - */ -public synchronized void consumerRecord(ConsumerRecord<byte[], byte[]> consumedMessage) { -this.context.consumerRecord(consumedMessage); -} - -/** - * @return true, if the last operation encountered an error; false otherwise - */ -public synchronized boolean failed() { -return this.context.failed(); -} - -/** - * Returns the error encountered when processing the current stage. - * - * @return the error encountered when processing the current stage - */ -public synchronized Throwable error() { -return this.context.error(); +this.reporters = Objects.requireNonNull(reporters, "reporters"); Review Comment: While I like the CachedSupplier(Supplier<T> supplier) signature, I think it is probably not a good idea to use it with AutoCloseables, as the ownership of the object is ambiguous from the signature. Currently I know that Supplier<List<ErrorReporter>> puts the responsibility on the caller of get() to close the returned error reporters, but if CachedSupplier exists, some call-sites of get() will close the error reporters, and some won't. I guess the SharedTopicAdmin is the most similar thing I can think of. When it's a SharedTopicAdmin object, it needs to be closed. When it is cast to a Supplier, it doesn't need to be closed. Perhaps we could do something similar here. Perhaps the following (a little lighter weight than SharedTopicAdmin)? ``` public class AutoCloseableSupplier<T> implements AutoCloseable, Supplier<T> { public AutoCloseableSupplier(Supplier<T> s, String closeMessage) { ... } } ``` edit: List<ErrorReporter> is itself not closeable, maybe stuff would need to change to be List<…> or similar... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
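For illustration, a fleshed-out sketch of the AutoCloseableSupplier idea proposed in the comment above. This is not the PR's actual code: the Runnable closer replaces the closeMessage parameter, and the close-once guard is an assumption about how ownership would be enforced.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Sketch only: a supplier that owns the closeable resources it hands out,
// so plain callers of get() are never responsible for closing them.
public class AutoCloseableSupplier<T> implements AutoCloseable, Supplier<T> {
    private final Supplier<T> delegate;
    private final Runnable closer;  // assumption: how the cached value is released
    private final AtomicBoolean closed = new AtomicBoolean(false);

    public AutoCloseableSupplier(Supplier<T> delegate, Runnable closer) {
        this.delegate = delegate;
        this.closer = closer;
    }

    @Override
    public T get() {
        if (closed.get())
            throw new IllegalStateException("Supplier has already been closed");
        return delegate.get();
    }

    @Override
    public void close() {
        // runs at most once, mirroring SharedTopicAdmin's ownership model
        if (closed.compareAndSet(false, true))
            closer.run();
    }
}
```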
[jira] [Updated] (KAFKA-16089) Kafka Streams still leaking memory
[ https://issues.apache.org/jira/browse/KAFKA-16089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-16089: --- Fix Version/s: 3.8.0 > Kafka Streams still leaking memory > -- > > Key: KAFKA-16089 > URL: https://issues.apache.org/jira/browse/KAFKA-16089 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.8.0 >Reporter: Lucas Brutschy >Assignee: Nicholas Telford >Priority: Critical > Fix For: 3.8.0 > > Attachments: fix.png, graphviz (1).svg, unfix.png > > > In > [https://github.com/apache/kafka/commit/58d6d2e5922df60cdc5c5c1bcd1c2d82dd2b71e2] > a leak was fixed in the release candidate for 3.7. > > However, Kafka Streams still seems to be leaking memory (just slower) after > the fix. > > Attached is the `jeprof` output right before a crash after ~11 hours. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16089) Kafka Streams still leaking memory
[ https://issues.apache.org/jira/browse/KAFKA-16089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805801#comment-17805801 ] A. Sophie Blee-Goldman commented on KAFKA-16089: Yeah, nice investigation! That was a tricky one > Kafka Streams still leaking memory > -- > > Key: KAFKA-16089 > URL: https://issues.apache.org/jira/browse/KAFKA-16089 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.8.0 >Reporter: Lucas Brutschy >Assignee: Nicholas Telford >Priority: Critical > Attachments: fix.png, graphviz (1).svg, unfix.png > > > In > [https://github.com/apache/kafka/commit/58d6d2e5922df60cdc5c5c1bcd1c2d82dd2b71e2] > a leak was fixed in the release candidate for 3.7. > > However, Kafka Streams still seems to be leaking memory (just slower) after > the fix. > > Attached is the `jeprof` output right before a crash after ~11 hours. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]
philipnee commented on PR #15000: URL: https://github.com/apache/kafka/pull/15000#issuecomment-1887951845 hey @lucasbru - I assume by "invoker queue" here you meant the `OffsetCommitCallbackInvoker`. It is not shared with the background thread. There are two invokers used by the async consumer: one is the offset commit callback invoker, and the other is the rebalance callback invoker. The rebalance callback invoker is shared by the two threads, but the commit callback invoker is not. My original thought was to use a background event to pass the interceptor onCommit event. I think we are all on the same page about using the invoker to invoke the events. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
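As a rough illustration of the two-thread hand-off described here (names are illustrative, not the actual consumer internals): the background thread enqueues completed-commit callbacks, and the application thread drains and runs them, so user code, including interceptor onCommit hooks, never executes on the background thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative sketch of a callback invoker drained by the application thread.
final class CallbackInvoker {
    private final BlockingQueue<Runnable> pending = new LinkedBlockingQueue<>();

    // called from the background thread when a commit response arrives
    void enqueue(Runnable callback) {
        pending.add(callback);
    }

    // called from the application thread, e.g. at the top of poll()
    void invokeAll() {
        Runnable callback;
        while ((callback = pending.poll()) != null)
            callback.run();
    }
}
```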
Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]
gharris1727 commented on code in PR #15154: URL: https://github.com/apache/kafka/pull/15154#discussion_r1449351768 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java: ## @@ -121,20 +121,22 @@ public static DeadLetterQueueReporter createAndSetup(Map adminPr /** * Write the raw records into a Kafka topic and return the producer future. * - * @param context processing context containing the raw record at {@link ProcessingContext#consumerRecord()}. + * @param context processing context containing the raw record at {@link ProcessingContext#original()}. * @return the future associated with the writing of this record; never null */ -public Future<Void> report(ProcessingContext context) { +@SuppressWarnings("unchecked") +public Future<Void> report(ProcessingContext<?> context) { if (dlqTopicName.isEmpty()) { return CompletableFuture.completedFuture(null); } errorHandlingMetrics.recordDeadLetterQueueProduceRequest(); -ConsumerRecord<byte[], byte[]> originalMessage = context.consumerRecord(); -if (originalMessage == null) { errorHandlingMetrics.recordDeadLetterQueueProduceFailed(); return CompletableFuture.completedFuture(null); } +if (!(context.original() instanceof ConsumerRecord)) { errorHandlingMetrics.recordDeadLetterQueueProduceFailed(); return CompletableFuture.completedFuture(null); } +ProcessingContext<ConsumerRecord<byte[], byte[]>> sinkContext = (ProcessingContext<ConsumerRecord<byte[], byte[]>>) context; Review Comment: Yes, but ConsumerRecord has no subclasses, and we never create a ProcessingContext for anything other than SourceRecord and ConsumerRecord. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]
gharris1727 commented on code in PR #15154: URL: https://github.com/apache/kafka/pull/15154#discussion_r1449349792 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java: ## @@ -121,20 +121,22 @@ public static DeadLetterQueueReporter createAndSetup(Map adminPr /** * Write the raw records into a Kafka topic and return the producer future. * - * @param context processing context containing the raw record at {@link ProcessingContext#consumerRecord()}. + * @param context processing context containing the raw record at {@link ProcessingContext#original()}. * @return the future associated with the writing of this record; never null */ -public Future<Void> report(ProcessingContext context) { +@SuppressWarnings("unchecked") +public Future<Void> report(ProcessingContext<?> context) { if (dlqTopicName.isEmpty()) { return CompletableFuture.completedFuture(null); } errorHandlingMetrics.recordDeadLetterQueueProduceRequest(); -ConsumerRecord<byte[], byte[]> originalMessage = context.consumerRecord(); -if (originalMessage == null) { errorHandlingMetrics.recordDeadLetterQueueProduceFailed(); return CompletableFuture.completedFuture(null); } +if (!(context.original() instanceof ConsumerRecord)) { Review Comment: This is another generic-oddity that I fixed with a follow-up refactor that makes the DeadLetterQueueReporter#report accept a ProcessingContext<ConsumerRecord<byte[], byte[]>> and eliminates all of these runtime checks which don't ever actually fail. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]
gharris1727 commented on code in PR #15154: URL: https://github.com/apache/kafka/pull/15154#discussion_r1449347346 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java: ## @@ -236,7 +237,7 @@ protected <V> V execAndHandleError(Operation<V> operation, Class
Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]
gharris1727 commented on code in PR #15154: URL: https://github.com/apache/kafka/pull/15154#discussion_r1449326220 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java: ## @@ -80,59 +83,57 @@ public class RetryWithToleranceOperator implements AutoCloseable { private final ErrorHandlingMetrics errorHandlingMetrics; private final CountDownLatch stopRequestedLatch; private volatile boolean stopping; // indicates whether the operator has been asked to stop retrying - -protected final ProcessingContext context; +private List<ErrorReporter> reporters; public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis, ToleranceType toleranceType, Time time, ErrorHandlingMetrics errorHandlingMetrics) { -this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, errorHandlingMetrics, new ProcessingContext(), new CountDownLatch(1)); +this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, errorHandlingMetrics, new CountDownLatch(1)); } RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis, ToleranceType toleranceType, Time time, ErrorHandlingMetrics errorHandlingMetrics, - ProcessingContext context, CountDownLatch stopRequestedLatch) { + CountDownLatch stopRequestedLatch) { this.errorRetryTimeout = errorRetryTimeout; this.errorMaxDelayInMillis = errorMaxDelayInMillis; this.errorToleranceType = toleranceType; this.time = time; this.errorHandlingMetrics = errorHandlingMetrics; -this.context = context; this.stopRequestedLatch = stopRequestedLatch; this.stopping = false; +this.reporters = Collections.emptyList(); } -public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass, - ConsumerRecord<byte[], byte[]> consumerRecord, - Throwable error) { - +public Future<Void> executeFailed(ProcessingContext<?> context, Stage stage, Class<?> executingClass, Throwable error) { markAsFailed(); -context.consumerRecord(consumerRecord); context.currentContext(stage, executingClass); context.error(error); errorHandlingMetrics.recordFailure(); -Future<Void> errantRecordFuture = context.report(); +Future<Void> errantRecordFuture = report(context); if (!withinToleranceLimits()) { errorHandlingMetrics.recordError(); throw new ConnectException("Tolerance exceeded in error handler", error); } return errantRecordFuture; } -public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass, - SourceRecord sourceRecord, - Throwable error) { Review Comment: Yep, there were two nearly-identical implementations that differed only by the type of record they accepted. They differed in the ConnectException message, and when merging them I just kept the more generic message. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]
gharris1727 commented on code in PR #15154: URL: https://github.com/apache/kafka/pull/15154#discussion_r1449313479 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java: ## @@ -17,82 +17,36 @@ package org.apache.kafka.connect.runtime.errors; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.record.TimestampType; -import org.apache.kafka.connect.errors.ConnectException; -import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter.ErrantRecordFuture; import org.apache.kafka.connect.source.SourceRecord; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Future; -import java.util.stream.Collectors; - /** - * Contains all the metadata related to the currently evaluating operation. Only one instance of this class is meant - * to exist per task in a JVM. + * Contains all the metadata related to the currently evaluating operation, and associated with a particular + * sink or source record from the consumer or task, respectively. This class is not thread safe, and so once an + * instance is passed to a new thread, it should no longer be accessed by the previous thread. Review Comment: 1. I believe so, as long as the "passing between the threads" itself is thread safe. So for example, if one thread writes to context, writes the context to a volatile field, then a second thread reads from that volatile field and then reads from the context, the memory model should ensure that the read sees the writes. Producer::send appears to have the synchronization when allocating the write to a batch. And the InternalSinkRecord#context field is marked final, which according to [this guidance](https://www.cs.umd.edu/~pugh/java/memoryModel/jsr-133-faq.html#finalRight) regarding final fields (emphasis is mine) should be sufficient: > The values for an object's final fields are set in its constructor. Assuming the object is constructed "correctly", once an object is constructed, the values assigned to the final fields in the constructor will be visible to all other threads without synchronization. In addition, the visible values for any other object or array referenced by those final fields **will be at least as up-to-date as the final fields.** 2. The fields themselves are not synchronized, as the object is not thread-safe. I don't think volatile buys any thread safety in this situation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
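A self-contained illustration of the final-field guarantee cited above (class names are illustrative, not Connect's actual types): the writer fully populates a non-thread-safe context before publishing it through a final field, and the reader then sees those writes without further synchronization on the context itself.

```java
// Illustrative only: a mutable, non-thread-safe context published through a
// final field. Once the holder is safely published, the reader sees every
// write made to the context before the holder's constructor finished.
final class Context {
    String error;  // plain field: no volatile, no locks
}

final class Holder {
    final Context context;  // final field: JMM guarantees safe publication
    Holder(Context context) { this.context = context; }
}

public class SafePublication {
    private static volatile Holder shared;  // the hand-off itself is also safe

    public static void main(String[] args) throws InterruptedException {
        Thread writer = new Thread(() -> {
            Context ctx = new Context();
            ctx.error = "boom";        // written before the Holder is constructed
            shared = new Holder(ctx);  // publish
        });
        writer.start();
        writer.join();
        System.out.println(shared.context.error);  // prints "boom"
    }
}
```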
Re: [PR] KAFKA-15575: Begin enforcing 'tasks.max' property for connectors [kafka]
C0urante commented on PR #15180: URL: https://github.com/apache/kafka/pull/15180#issuecomment-1887808692 @gharris1727 @yashmayya @mimaison would any of you mind taking a look when you have a moment? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15561) Client support for new SubscriptionPattern based subscription
[ https://issues.apache.org/jira/browse/KAFKA-15561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805779#comment-17805779 ] Lianet Magrans commented on KAFKA-15561: Hey [~phuctran], this ticket is only for the client support of the regex, so only changes on the consumer side are needed to make sure that we pass on the regex in the request after calls to #subscribe(SubscriptionPattern...). But regexes are not supported by the server yet (that's why it's not included in the ConsumerGroupHeartbeatRequestData). Once the server supports it, we can take on this one to have the client use it. Thanks for your interest! Happy to answer any other question. > Client support for new SubscriptionPattern based subscription > - > > Key: KAFKA-15561 > URL: https://issues.apache.org/jira/browse/KAFKA-15561 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Phuc Hong Tran >Priority: Major > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support > Fix For: 3.8.0 > > > New consumer should support subscribe with the new SubscriptionPattern > introduced in the new consumer group protocol. When subscribing with this > regex, the client should provide the regex in the HB request on the > SubscribedTopicRegex field, delegating the resolution to the server. -- This message was sent by Atlassian Jira (v8.20.10#820010)
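For context, a sketch of how the two regex subscription styles differ on the client. The Pattern overload shown is the released API; the SubscriptionPattern overload is the KIP-848 addition this ticket tracks, so its class, package, and exact shape are assumptions until it ships.

```java
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class RegexSubscribeSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "demo-group");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Existing behavior: the java.util.regex.Pattern overload resolves
            // the regex locally against fetched metadata.
            consumer.subscribe(Pattern.compile("orders-.*"));

            // Under KIP-848, the new overload (not yet available) would instead
            // send the raw regex in the HB request's SubscribedTopicRegex field:
            // consumer.subscribe(new SubscriptionPattern("orders-.*"));
        }
    }
}
```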
[jira] [Updated] (KAFKA-15475) Timeout request might retry forever even if the user API times out in PrototypeAsyncConsumer
[ https://issues.apache.org/jira/browse/KAFKA-15475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15475: -- Priority: Critical (was: Minor) > Timeout request might retry forever even if the user API times out in > PrototypeAsyncConsumer > > > Key: KAFKA-15475 > URL: https://issues.apache.org/jira/browse/KAFKA-15475 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, kip-848-preview > Fix For: 3.8.0 > > > If the request times out in the background thread, it will be completed with > a TimeoutException, which is retriable. In the TopicMetadataRequestManager and > possibly other managers, the request might continue to be retried forever. > > There are two ways to fix this: > # Pass a timer to the manager to remove inflight requests when they > expire. > # Pass the future to the application layer and continue to retry. -- This message was sent by Atlassian Jira (v8.20.10#820010)
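A minimal sketch of option 1 above, with illustrative names: each inflight request carries a deadline, and the manager's poll loop fails and removes expired entries instead of retrying them forever.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.common.errors.TimeoutException;

// Sketch only: names are hypothetical, not the actual request manager types.
final class InflightRequest<T> {
    final CompletableFuture<T> future;
    final long deadlineMs;

    InflightRequest(CompletableFuture<T> future, long deadlineMs) {
        this.future = future;
        this.deadlineMs = deadlineMs;
    }
}

final class RequestExpirer {
    // Called from the manager's poll loop: complete expired requests
    // exceptionally and drop them so they are never retried again.
    static <T> void expireStale(List<InflightRequest<T>> inflight, long nowMs) {
        Iterator<InflightRequest<T>> it = inflight.iterator();
        while (it.hasNext()) {
            InflightRequest<T> req = it.next();
            if (nowMs >= req.deadlineMs) {
                req.future.completeExceptionally(new TimeoutException("request expired"));
                it.remove();
            }
        }
    }
}
```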
Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]
C0urante commented on code in PR #15154: URL: https://github.com/apache/kafka/pull/15154#discussion_r1449271258 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java: ## @@ -143,7 +144,7 @@ public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingCl * @param <V> return type of the result of the operation. * @return result of the operation */ -public synchronized <V> V execute(Operation<V> operation, Stage stage, Class<?> executingClass) { +public <V> V execute(ProcessingContext<?> context, Operation<V> operation, Stage stage, Class<?> executingClass) { Review Comment: I know this isn't your fault but if you have time, could we add a `throws` clause to the Javadocs stating that an exception will be thrown if a non-tolerable error is encountered? I always get tripped up reading interactions with this class and a big part of it is trying to understand the conditions where exceptions are thrown, the context is marked as failed, or `null` is returned. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
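For reference, the requested `throws` documentation might read roughly like the sketch below; the wording is a suggestion based on the behaviors named in the comment above, not the final javadoc.

```java
/**
 * Attempt to execute an operation, retrying within the configured tolerance.
 *
 * @throws ConnectException wrapping the original error, if the operation fails
 *         and the failure cannot be tolerated (errors.tolerance is none, or
 *         the configured tolerance limits have been exceeded); otherwise the
 *         processing context is marked failed and null is returned
 */
```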
[jira] [Updated] (KAFKA-15320) Document event queueing patterns
[ https://issues.apache.org/jira/browse/KAFKA-15320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15320: -- Priority: Minor (was: Major) > Document event queueing patterns > > > Key: KAFKA-15320 > URL: https://issues.apache.org/jira/browse/KAFKA-15320 > Project: Kafka > Issue Type: Task > Components: clients, consumer, documentation >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > We need to first document the event enqueuing patterns in the > PrototypeAsyncConsumer. As part of this task, determine if it’s > necessary/beneficial to _conditionally_ add events and/or coalesce any > duplicate events in the queue. > _Don’t forget to include diagrams for clarity!_ > This should be documented on the AK wiki. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15283) Client support for OffsetFetch and OffsetCommit with topic ID
[ https://issues.apache.org/jira/browse/KAFKA-15283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15283: -- Priority: Minor (was: Major) > Client support for OffsetFetch and OffsetCommit with topic ID > - > > Key: KAFKA-15283 > URL: https://issues.apache.org/jira/browse/KAFKA-15283 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Lianet Magrans >Priority: Minor > Labels: kip-848, kip-848-client-support > Fix For: 3.8.0 > > > Currently, {{KafkaConsumer}} keeps track of topic IDs in the in-memory > {{ConsumerMetadata}} object, and they are provided to the {{FETCH}} and > {{METADATA}} RPC calls. > With KIP-848 the OffsetFetch and OffsetCommit will start using topic IDs in > the same way, so the new client implementation will provide them when issuing > those requests. Topic names should continue to be supported as needed by the > {{AdminClient}}. > We should also review/clean-up the support for topic names in requests such > as the {{METADATA}} request (currently supporting topic names as well as > topic IDs on the client side). > Tasks include: > * Introduce Topic ID in existing OffsetFetch and OffsetCommit API that will > be upgraded on the server to support topic ID > * Check topic ID propagation internally in the client based on RPCs > including it. > * Review existing support for topic name for potential clean-up if not needed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15173) Consumer event queues should be bounded
[ https://issues.apache.org/jira/browse/KAFKA-15173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15173: -- Priority: Minor (was: Major) > Consumer event queues should be bounded > --- > > Key: KAFKA-15173 > URL: https://issues.apache.org/jira/browse/KAFKA-15173 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > The async consumer uses ApplicationEventQueue and BackgroundEventQueue to > facilitate message passing between the application thread and the background > thread. The current implementation is unbounded, which can potentially cause > OOM and other performance-related issues. > I think the queues need a finite bound, and we need to decide how to handle > the situation when the bound is reached. In particular, I would like to > answer these questions: > > # What should the upper limit be for both queues: can this be a > configurable, memory-based bound, or just an arbitrary number of events? > # What should happen when the application event queue fills up? It > seems like we should introduce a new exception type and notify the user that > the consumer is full. > # What should happen when the background event queue fills up? This > seems less likely to happen, but I imagine it could happen when the user > stops polling the consumer, causing the queue to fill up. > # Is it necessary to introduce a public configuration for the queue? I think > we would initially select an arbitrary constant and make a forward plan > based on community feedback. -- This message was sent by Atlassian Jira (v8.20.10#820010)
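One possible shape for questions 1 and 2 above, sketched with illustrative names, an arbitrary capacity, and a generic rejection exception: a fixed-size queue that refuses new events when full, surfacing back-pressure to the caller.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch only: capacity and exception type are illustrative assumptions,
// not a decision on the questions raised in the ticket.
final class BoundedEventQueue<E> {
    private final BlockingQueue<E> events;

    BoundedEventQueue(int capacity) {
        this.events = new ArrayBlockingQueue<>(capacity);
    }

    // Rejects instead of blocking, so the enqueuing thread never stalls.
    void add(E event) {
        if (!events.offer(event))
            throw new IllegalStateException("consumer event queue is full");
    }

    E poll() {
        return events.poll();  // null when empty
    }
}
```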
[jira] [Assigned] (KAFKA-16104) Enable additional PlaintextConsumerTest tests for new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16104: - Assignee: Kirk True > Enable additional PlaintextConsumerTest tests for new consumer > -- > > Key: KAFKA-16104 > URL: https://issues.apache.org/jira/browse/KAFKA-16104 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Andrew Schofield >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > It should be possible to enable: > * testAutoCommitOnClose > * testAutoCommitOnCloseAfterWakeup > * testExpandingTopicSubscriptions > * testShrinkingTopicSubscriptions > * testMultiConsumerSessionTimeoutOnClose (KAFKA-16011 has been fixed) > * testAutoCommitOnRebalance > * testPerPartitionLeadMetricsCleanUpWithSubscribe > * testPerPartitionLagMetricsCleanUpWithSubscribe > * testStaticConsumerDetectsNewPartitionCreatedAfterRestart -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16107) Ensure consumer does not start fetching from added partitions until onPartitionsAssigned completes
[ https://issues.apache.org/jira/browse/KAFKA-16107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16107: - Assignee: Lianet Magrans > Ensure consumer does not start fetching from added partitions until > onPartitionsAssigned completes > -- > > Key: KAFKA-16107 > URL: https://issues.apache.org/jira/browse/KAFKA-16107 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > In the new consumer implementation, when new partitions are assigned, the > subscription state is updated and then #onPartitionsAssigned is triggered. > This sequence seems sensible, but we need to ensure that no data is fetched > until onPartitionsAssigned completes (where the user could be setting the > committed offsets it wants to start fetching from). > We should pause the newly added partitions until > onPartitionsAssigned completes, similar to how it's done on revocation, to > avoid positions getting ahead of the committed offsets. -- This message was sent by Atlassian Jira (v8.20.10#820010)
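To see why the ordering matters, consider a user callback like the sketch below (the class name and offset source are illustrative): it rewinds positions for newly assigned partitions, so any records fetched before the callback completes could come from the wrong offsets.

```java
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Sketch only: seeks newly assigned partitions to externally stored offsets.
final class SeekOnAssign implements ConsumerRebalanceListener {
    private final KafkaConsumer<?, ?> consumer;

    SeekOnAssign(KafkaConsumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Records fetched before these seeks complete would be wrong.
        for (TopicPartition tp : partitions)
            consumer.seek(tp, loadCommittedOffset(tp));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }

    private long loadCommittedOffset(TopicPartition tp) {
        return 0L;  // placeholder: real code would consult its offset store
    }
}
```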
[jira] [Assigned] (KAFKA-16010) Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling
[ https://issues.apache.org/jira/browse/KAFKA-16010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16010: - Assignee: Kirk True > Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling > -- > > Key: KAFKA-16010 > URL: https://issues.apache.org/jira/browse/KAFKA-16010 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848 > Fix For: 3.8.0 > > > The integration test > {{PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling}} is > failing when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Did not get valid assignment for > partitions [topic1-2, topic1-4, topic-1, topic-0, topic1-5, topic1-1, > topic1-0, topic1-3] after one consumer left > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.validateGroupAssignment(AbstractConsumerTest.scala:286) > at > kafka.api.PlaintextConsumerTest.runMultiConsumerSessionTimeoutTest(PlaintextConsumerTest.scala:1883) > at > kafka.api.PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling(PlaintextConsumerTest.scala:1281) > {code} > The logs include these lines: > > {code} > [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16009) Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation
[ https://issues.apache.org/jira/browse/KAFKA-16009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16009: - Assignee: Kirk True > Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation > > > Key: KAFKA-16009 > URL: https://issues.apache.org/jira/browse/KAFKA-16009 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848 > Fix For: 3.8.0 > > > The integration test > {{PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation}} is failing > when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Timed out before expected rebalance > completed > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317) > at > kafka.api.PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation(PlaintextConsumerTest.scala:235) > {code} > The logs include this line: > > {code} > [2023-12-13 15:20:27,824] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16008) Fix PlaintextConsumerTest.testMaxPollIntervalMs
[ https://issues.apache.org/jira/browse/KAFKA-16008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16008: - Assignee: Kirk True > Fix PlaintextConsumerTest.testMaxPollIntervalMs > --- > > Key: KAFKA-16008 > URL: https://issues.apache.org/jira/browse/KAFKA-16008 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848 > Fix For: 3.8.0 > > > The integration test {{PlaintextConsumerTest.testMaxPollIntervalMs}} is > failing when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Timed out before expected rebalance > completed > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317) > at > kafka.api.PlaintextConsumerTest.testMaxPollIntervalMs(PlaintextConsumerTest.scala:194) > {code} > The logs include this line: > > {code} > [2023-12-13 15:11:16,134] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16023) PlaintextConsumerTest needs to wait for reconciliation to complete before proceeding
[ https://issues.apache.org/jira/browse/KAFKA-16023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16023: - Assignee: Kirk True > PlaintextConsumerTest needs to wait for reconciliation to complete before > proceeding > > > Key: KAFKA-16023 > URL: https://issues.apache.org/jira/browse/KAFKA-16023 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > Several tests in PlaintextConsumerTest.scala (such as > testPerPartitionLagMetricsCleanUpWithSubscribe) use: > assertEquals(1, listener.callsToAssigned, "should be assigned once") > However, the timing of reconciliation completion is not deterministic due > to asynchronous processing, so we actually need to wait for the condition to > happen. > Another issue is the timeout: some of these tasks might not > complete within the 600ms timeout, so the tests end up flaky. -- This message was sent by Atlassian Jira (v8.20.10#820010)
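A plain-Java sketch of the wait-for-condition approach the ticket suggests (Kafka's Scala tests would use TestUtils.waitUntilTrue; the timeout and poll interval here are illustrative):

```java
import java.util.function.BooleanSupplier;

final class TestWait {
    // Poll until the condition holds or the deadline passes, instead of
    // asserting immediately after subscribe().
    static void waitForCondition(BooleanSupplier condition, long timeoutMs, String message)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline)
                throw new AssertionError(message);
            Thread.sleep(100L);
        }
    }
}
```

Usage would then replace the single assertEquals with something like waitForCondition(() -> listener.callsToAssigned >= 1, 15_000L, "should be assigned once").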
[jira] [Assigned] (KAFKA-16100) Consistent handling of timeouts and responses for new consumer ApplicationEvents
[ https://issues.apache.org/jira/browse/KAFKA-16100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16100: - Assignee: Kirk True > Consistent handling of timeouts and responses for new consumer > ApplicationEvents > > > Key: KAFKA-16100 > URL: https://issues.apache.org/jira/browse/KAFKA-16100 > Project: Kafka > Issue Type: Sub-task > Components: clients >Reporter: Andrew Schofield >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > The handling of timeouts and responses for the various kinds of > ApplicationEvents in the new consumer is not consistent. A small amount of > refactoring would make the code more maintainable and give consistent > behaviour for the different requests. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16019) Some of the tests in PlaintextConsumer can't seem to deterministically invoke and verify the consumer callback
[ https://issues.apache.org/jira/browse/KAFKA-16019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16019: - Assignee: Kirk True > Some of the tests in PlaintextConsumer can't seem to deterministically invoke > and verify the consumer callback > -- > > Key: KAFKA-16019 > URL: https://issues.apache.org/jira/browse/KAFKA-16019 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > I was running the PlaintextConsumer to test the async consumer; however, a > few tests were failing because they could not verify that the listener was invoked > correctly > For example `testPerPartitionLeadMetricsCleanUpWithSubscribe` > Around 50% of the time, the listener's callsToAssigned was never incremented > correctly. Even changing it to awaitUntilTrue, it was still the same case > {code:java} > consumer.subscribe(List(topic, topic2).asJava, listener) > val records = awaitNonEmptyRecords(consumer, tp) > assertEquals(1, listener.callsToAssigned, "should be assigned once") {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16004) Review new consumer inflight offset commit logic
[ https://issues.apache.org/jira/browse/KAFKA-16004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16004: - Assignee: Lianet Magrans > Review new consumer inflight offset commit logic > > > Key: KAFKA-16004 > URL: https://issues.apache.org/jira/browse/KAFKA-16004 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support > Fix For: 3.8.0 > > > New consumer logic for committing offsets handles inflight requests, to > validate that no commit requests are sent if a previous one hasn't received a > response. Review how that logic is currently applied to both sync and async > commits, and validate against the legacy coordinator, which seems to apply it > only to async commits. The review should consider the behaviour of auto-commits too. -- This message was sent by Atlassian Jira (v8.20.10#820010)
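A minimal sketch of the inflight guard being reviewed, with hypothetical names (the real CommitRequestManager is more involved): a new commit is only sent once the previous one has completed, and concurrent callers are coalesced onto the pending future.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Sketch only: refuse to send a new commit while one is still outstanding.
final class CommitGuard {
    private CompletableFuture<Void> inflight;  // null when nothing is outstanding

    synchronized CompletableFuture<Void> maybeSend(
            Map<TopicPartition, OffsetAndMetadata> offsets, Runnable send) {
        if (inflight != null && !inflight.isDone())
            return inflight;  // a request is already in flight: do not send another
        inflight = new CompletableFuture<>();
        send.run();           // placeholder for building and enqueuing the real RPC
        return inflight;
    }

    // called when the broker response (or an error) arrives
    synchronized void complete(Throwable error) {
        if (inflight == null)
            return;
        if (error == null)
            inflight.complete(null);
        else
            inflight.completeExceptionally(error);
    }
}
```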
[jira] [Assigned] (KAFKA-16001) Migrate ConsumerNetworkThreadTest away from ConsumerTestBuilder
[ https://issues.apache.org/jira/browse/KAFKA-16001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16001: - Assignee: Kirk True > Migrate ConsumerNetworkThreadTest away from ConsumerTestBuilder > --- > > Key: KAFKA-16001 > URL: https://issues.apache.org/jira/browse/KAFKA-16001 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Lucas Brutschy >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15993) Enable max poll integration tests that depend on callback invocation
[ https://issues.apache.org/jira/browse/KAFKA-15993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-15993: - Assignee: Kirk True > Enable max poll integration tests that depend on callback invocation > > > Key: KAFKA-15993 > URL: https://issues.apache.org/jira/browse/KAFKA-15993 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor, kip-848-preview > Fix For: 3.8.0 > > > We will enable integration tests using the async consumer in KAFKA-15971. > However, we should also enable tests that rely on rebalance listeners after > KAFKA-15628 is closed. One example would be testMaxPollIntervalMs, which > relies on the listener to verify correctness. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15999) Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder
[ https://issues.apache.org/jira/browse/KAFKA-15999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-15999: - Assignee: Kirk True > Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder > - > > Key: KAFKA-15999 > URL: https://issues.apache.org/jira/browse/KAFKA-15999 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Lucas Brutschy >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]
C0urante commented on code in PR #15154: URL: https://github.com/apache/kafka/pull/15154#discussion_r1449180005 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java: ## @@ -80,59 +83,57 @@ public class RetryWithToleranceOperator implements AutoCloseable { private final ErrorHandlingMetrics errorHandlingMetrics; private final CountDownLatch stopRequestedLatch; private volatile boolean stopping; // indicates whether the operator has been asked to stop retrying - -protected final ProcessingContext context; +private List<ErrorReporter> reporters; public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis, ToleranceType toleranceType, Time time, ErrorHandlingMetrics errorHandlingMetrics) { -this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, errorHandlingMetrics, new ProcessingContext(), new CountDownLatch(1)); +this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, errorHandlingMetrics, new CountDownLatch(1)); } RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis, ToleranceType toleranceType, Time time, ErrorHandlingMetrics errorHandlingMetrics, - ProcessingContext context, CountDownLatch stopRequestedLatch) { + CountDownLatch stopRequestedLatch) { this.errorRetryTimeout = errorRetryTimeout; this.errorMaxDelayInMillis = errorMaxDelayInMillis; this.errorToleranceType = toleranceType; this.time = time; this.errorHandlingMetrics = errorHandlingMetrics; -this.context = context; this.stopRequestedLatch = stopRequestedLatch; this.stopping = false; +this.reporters = Collections.emptyList(); } -public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass, - ConsumerRecord<byte[], byte[]> consumerRecord, - Throwable error) { - +public Future<Void> executeFailed(ProcessingContext<?> context, Stage stage, Class<?> executingClass, Throwable error) { markAsFailed(); -context.consumerRecord(consumerRecord); context.currentContext(stage, executingClass); context.error(error); errorHandlingMetrics.recordFailure(); -Future<Void> errantRecordFuture = context.report(); +Future<Void> errantRecordFuture = report(context); if (!withinToleranceLimits()) { errorHandlingMetrics.recordError(); throw new ConnectException("Tolerance exceeded in error handler", error); } return errantRecordFuture; } -public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass, - SourceRecord sourceRecord, - Throwable error) { - -markAsFailed(); -context.sourceRecord(sourceRecord); -context.currentContext(stage, executingClass); -context.error(error); -errorHandlingMetrics.recordFailure(); -Future<Void> errantRecordFuture = context.report(); -if (!withinToleranceLimits()) { -errorHandlingMetrics.recordError(); -throw new ConnectException("Tolerance exceeded in Source Worker error handler", error); +/** + * Report errors. Should be called only if an error was encountered while executing the operation. + * + * @return a errant record future that potentially aggregates the producer futures Review Comment: Nit (I know this is just moved as-is from the `ProcessingContext` class but we might as well fix it up while we're in the neighborhood): ```suggestion * @return an errant record future that potentially aggregates the producer futures ``` ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java: ## @@ -17,82 +17,36 @@ package org.apache.kafka.connect.runtime.errors; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.record.TimestampType; -import org.apache.kafka.connect.errors.ConnectException; -import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter.ErrantRecordFuture; import org.apache.kafka.connect.source.SourceRecord; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Future; -import java.util.stream.Collectors; - /** - * Contains all the metadata related to the currently evaluating operation. Only one instance of this class is meant - * to exist per task in a JVM. + * Contains all the metadata related to the currently evaluating operation, and associated with a particular + * sink or source record from the consumer or task, respectively.
[jira] [Assigned] (KAFKA-15843) Review consumer onPartitionsAssigned called with empty partitions
[ https://issues.apache.org/jira/browse/KAFKA-15843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-15843: - Assignee: Lianet Magrans > Review consumer onPartitionsAssigned called with empty partitions > - > > Key: KAFKA-15843 > URL: https://issues.apache.org/jira/browse/KAFKA-15843 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > Fix For: 3.8.0 > > > Legacy coordinator triggers onPartitionsAssigned with empty assignment (which > is not the case when triggering onPartitionsRevoked or Lost). This is the > behaviour of the legacy coordinator, and the new consumer implementation > maintains the same principle. We should review this to fully understand if it > is really needed to call onPartitionsAssigned with empty assignment (or if it > should behave consistently with the onRevoke/Lost) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15847) Consider partial metadata requests for client reconciliation
[ https://issues.apache.org/jira/browse/KAFKA-15847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-15847: - Assignee: Lianet Magrans > Consider partial metadata requests for client reconciliation > > > Key: KAFKA-15847 > URL: https://issues.apache.org/jira/browse/KAFKA-15847 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > New consumer implementing KIP-848 protocol needs to resolve metadata for the > topics received in the assignment. It does so by relying on the centralized > metadata object. Currently metadata updates requested through the metadata > object, request metadata for all topics. Consider allowing the partial > updates that are already expressed as an intention in the Metadata class but > not fully supported (investigate background in case there were some specifics > that led to this intention not being fully implemented) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15652) Add tests to verify OffsetFetcherUtils.getOffsetResetTimestamp()
[ https://issues.apache.org/jira/browse/KAFKA-15652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-15652: - Assignee: Philip Nee > Add tests to verify OffsetFetcherUtils.getOffsetResetTimestamp() > > > Key: KAFKA-15652 > URL: https://issues.apache.org/jira/browse/KAFKA-15652 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > In the {{updateFetchPositions()}} method implementation, both > {{KafkaConsumer}} and {{PrototypeAsyncConsumer}} reset positions > asynchronously. [~junrao] stated the following in a [recent PR > review|https://github.com/apache/kafka/pull/14406#discussion_r1349173413]: > {quote}There is a subtle difference between transitioning to reset from > initializing and transitioning to reset from {{OffsetOutOfRangeException}} > during fetch. In the latter, the application thread will call > {{FetchCollector.handleInitializeErrors()}}. If there is no default > offset reset policy, an {{OffsetOutOfRangeException}} will be thrown to the > application thread during {{poll}}, which is what we want. > However, for the former, if there is no default offset reset policy, we > simply ignore that partition through > {{OffsetFetcherUtils.getOffsetResetTimestamp}}. It seems in that case, > the partition will be forever in the reset state and the application thread > won't get the {{OffsetOutOfRangeException}}. > {quote} > I intentionally changed the code so that no exceptions were thrown in > {{OffsetFetcherUtils.getOffsetResetTimestamp()}} and it would simply return an > empty map. When I ran the unit tests and integration tests, there were no > failures, strongly suggesting that there is no coverage of this particular > edge case. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15691) Add new system tests to use new consumer
[ https://issues.apache.org/jira/browse/KAFKA-15691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-15691: - Assignee: Kirk True > Add new system tests to use new consumer > > > Key: KAFKA-15691 > URL: https://issues.apache.org/jira/browse/KAFKA-15691 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15638) Investigate ConsumerNetworkThreadTest's testPollResultTimer
[ https://issues.apache.org/jira/browse/KAFKA-15638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-15638: - Assignee: Kirk True (was: Philip Nee) > Investigate ConsumerNetworkThreadTest's testPollResultTimer > --- > > Key: KAFKA-15638 > URL: https://issues.apache.org/jira/browse/KAFKA-15638 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > Regarding this comment in {{testPollResultTimer}}... > {code:java} > // purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE > upon success > {code} > [~junrao] asked: > {quote}Which call is returning Long.MAX_VALUE? > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15551) Evaluate conditions for short circuiting consumer API calls
[ https://issues.apache.org/jira/browse/KAFKA-15551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-15551: - Assignee: Lianet Magrans (was: Philip Nee) > Evaluate conditions for short circuiting consumer API calls > --- > > Key: KAFKA-15551 > URL: https://issues.apache.org/jira/browse/KAFKA-15551 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Philip Nee >Assignee: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > For conditions like: > * Committing empty offset > * Fetching offsets for empty partitions > * Getting empty topic partition position > These should be short-circuited, possibly at the API level. > As a bonus, we should double-check whether the existing {{KafkaConsumer}} > implementation suffers from this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
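A sketch of what the proposed short circuit could look like at the API layer (the method and helper names are hypothetical): an empty commit completes immediately without ever reaching the background thread.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Sketch only: the real consumer API differs; this just shows the check.
final class CommitShortCircuit {
    CompletableFuture<Void> commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets) {
        if (offsets.isEmpty())
            return CompletableFuture.completedFuture(null);  // nothing to send
        return sendCommitEvent(offsets);                     // hypothetical helper
    }

    private CompletableFuture<Void> sendCommitEvent(
            Map<TopicPartition, OffsetAndMetadata> offsets) {
        return new CompletableFuture<>();  // placeholder for the real event enqueue
    }
}
```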
[jira] [Assigned] (KAFKA-15639) Investigate ConsumerNetworkThreadTest's testResetPositionsProcessFailureIsIgnored
[ https://issues.apache.org/jira/browse/KAFKA-15639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True reassigned KAFKA-15639:
---------------------------------

    Assignee: Kirk True  (was: Philip Nee)

> Investigate ConsumerNetworkThreadTest's testResetPositionsProcessFailureIsIgnored
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-15639
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15639
>             Project: Kafka
>          Issue Type: Test
>          Components: clients, consumer, unit tests
>            Reporter: Kirk True
>            Assignee: Kirk True
>            Priority: Minor
>              Labels: consumer-threading-refactor
>             Fix For: 3.8.0
>
> The {{testResetPositionsProcessFailureIsIgnored}} test looks like this:
>
> {code:java}
> @Test
> public void testResetPositionsProcessFailureIsIgnored() {
>     doThrow(new NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded();
>
>     ResetPositionsApplicationEvent event = new ResetPositionsApplicationEvent();
>     applicationEventsQueue.add(event);
>     assertDoesNotThrow(() -> consumerNetworkThread.runOnce());
>
>     verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
> }
> {code}
>
> [~junrao] asks:
>
> {quote}Not sure if this is a useful test since {{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly throw an exception?
> {quote}
>
> I commented out the {{doThrow}} line and it did not impact the test.
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
dajac commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1449270020

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
@@ -1963,6 +2080,75 @@ public void testFetchAllOffsetsAtDifferentCommittedOffset() {
         ), context.fetchAllOffsets("group", Long.MAX_VALUE));
     }

+    @Test
+    public void testFetchAllOffsetsWithPendingTransactionalOffsets() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+
+        context.commitOffset("group", "foo", 0, 100L, 1);
+        context.commitOffset("group", "foo", 1, 110L, 1);
+        context.commitOffset("group", "bar", 0, 200L, 1);
+
+        context.commit();
+
+        assertEquals(3, context.lastWrittenOffset);
+        assertEquals(3, context.lastCommittedOffset);
+
+        context.commitOffset(10L, "group", "foo", 1, 111L, 1, context.time.milliseconds());
+        context.commitOffset(10L, "group", "bar", 0, 201L, 1, context.time.milliseconds());

Review Comment:
    That’s fair. Let me add it tomorrow.
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1449268767

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
@@ -1963,6 +2080,75 @@ public void testFetchAllOffsetsAtDifferentCommittedOffset() {
         ), context.fetchAllOffsets("group", Long.MAX_VALUE));
     }

+    @Test
+    public void testFetchAllOffsetsWithPendingTransactionalOffsets() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+
+        context.commitOffset("group", "foo", 0, 100L, 1);
+        context.commitOffset("group", "foo", 1, 110L, 1);
+        context.commitOffset("group", "bar", 0, 200L, 1);
+
+        context.commit();
+
+        assertEquals(3, context.lastWrittenOffset);
+        assertEquals(3, context.lastCommittedOffset);
+
+        context.commitOffset(10L, "group", "foo", 1, 111L, 1, context.time.milliseconds());
+        context.commitOffset(10L, "group", "bar", 0, 201L, 1, context.time.milliseconds());

Review Comment:
    I would think it is just codifying the state of the protocol, and it will flag us if it changes.
[jira] [Assigned] (KAFKA-15475) Timeout request might retry forever even if the user API times out in PrototypeAsyncConsumer
[ https://issues.apache.org/jira/browse/KAFKA-15475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True reassigned KAFKA-15475:
---------------------------------

    Assignee: Kirk True  (was: Philip Nee)

> Timeout request might retry forever even if the user API times out in PrototypeAsyncConsumer
> ----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15475
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15475
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: Philip Nee
>            Assignee: Kirk True
>            Priority: Minor
>              Labels: consumer-threading-refactor, kip-848-preview
>             Fix For: 3.8.0
>
> If a request times out in the background thread, it will be completed with a {{TimeoutException}}, which is retriable. In the {{TopicMetadataRequestManager}}, and possibly other managers, the request might then continue to be retried forever.
>
> There are two ways to fix this:
> # Pass a timer to the manager so it can remove inflight requests once they expire.
> # Pass the future to the application layer and continue to retry there.
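Fix option 1 could look roughly like the sketch below. It tracks a deadline per inflight request and fails the future permanently once the deadline passes, so a retriable timeout cannot loop forever. All names are illustrative rather than the actual {{TopicMetadataRequestManager}} internals, and the real code would use Kafka's own {{org.apache.kafka.common.errors.TimeoutException}}.

{code:java}
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Sketch of fix option 1 (all names illustrative): track a deadline per
// inflight request and fail the future permanently once it passes, so a
// retriable timeout cannot be retried forever.
public final class ExpiringRequestManager {
    private final Map<CompletableFuture<Void>, Long> inflightDeadlines = new LinkedHashMap<>();

    CompletableFuture<Void> enqueue(long nowMs, long timeoutMs) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        inflightDeadlines.put(future, nowMs + timeoutMs);
        return future;
    }

    // Called from the background thread's poll loop.
    void expireStaleRequests(long nowMs) {
        Iterator<Map.Entry<CompletableFuture<Void>, Long>> it = inflightDeadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<CompletableFuture<Void>, Long> entry = it.next();
            if (nowMs >= entry.getValue()) {
                // Permanent failure: the caller sees the timeout, and the
                // manager stops retrying this request.
                entry.getKey().completeExceptionally(new TimeoutException("API timeout elapsed"));
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        ExpiringRequestManager manager = new ExpiringRequestManager();
        CompletableFuture<Void> future = manager.enqueue(0L, 100L);
        manager.expireStaleRequests(250L); // deadline has passed
        System.out.println(future.isCompletedExceptionally()); // true
    }
}
{code}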
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
dajac commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1449266480

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
@@ -1963,6 +2080,75 @@ public void testFetchAllOffsetsAtDifferentCommittedOffset() {
         ), context.fetchAllOffsets("group", Long.MAX_VALUE));
     }

+    @Test
+    public void testFetchAllOffsetsWithPendingTransactionalOffsets() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+
+        context.commitOffset("group", "foo", 0, 100L, 1);
+        context.commitOffset("group", "foo", 1, 110L, 1);
+        context.commitOffset("group", "bar", 0, 200L, 1);
+
+        context.commit();
+
+        assertEquals(3, context.lastWrittenOffset);
+        assertEquals(3, context.lastCommittedOffset);
+
+        context.commitOffset(10L, "group", "foo", 1, 111L, 1, context.time.milliseconds());
+        context.commitOffset(10L, "group", "bar", 0, 201L, 1, context.time.milliseconds());

Review Comment:
    I could add it, but it does not bring much value in my opinion.
[jira] [Assigned] (KAFKA-16110) Implement consumer performance tests
[ https://issues.apache.org/jira/browse/KAFKA-16110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True reassigned KAFKA-16110:
---------------------------------

    Assignee: Philip Nee  (was: Kirk True)

> Implement consumer performance tests
> -------------------------------------
>
>                 Key: KAFKA-16110
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16110
>             Project: Kafka
>          Issue Type: New Feature
>          Components: clients, consumer
>            Reporter: Kirk True
>            Assignee: Philip Nee
>            Priority: Major
>              Labels: consumer-threading-refactor
>             Fix For: 3.8.0
[jira] [Assigned] (KAFKA-15283) Client support for OffsetFetch and OffsetCommit with topic ID
[ https://issues.apache.org/jira/browse/KAFKA-15283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True reassigned KAFKA-15283:
---------------------------------

    Assignee: Lianet Magrans

> Client support for OffsetFetch and OffsetCommit with topic ID
> ---------------------------------------------------------------
>
>                 Key: KAFKA-15283
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15283
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: clients, consumer
>            Reporter: Kirk True
>            Assignee: Lianet Magrans
>            Priority: Major
>              Labels: kip-848, kip-848-client-support
>             Fix For: 3.8.0
>
> Currently, {{KafkaConsumer}} keeps track of topic IDs in the in-memory {{ConsumerMetadata}} object, and they are provided to the {{FETCH}} and {{METADATA}} RPC calls.
> With KIP-848, OffsetFetch and OffsetCommit will start using topic IDs in the same way, so the new client implementation will provide them when issuing those requests. Topic names should continue to be supported as needed by the {{AdminClient}}.
> We should also review/clean up the support for topic names in requests such as the {{METADATA}} request (currently supporting topic names as well as topic IDs on the client side).
> Tasks include:
> * Introduce topic IDs in the existing OffsetFetch and OffsetCommit APIs, which will be upgraded on the server to support topic IDs.
> * Check topic ID propagation internally in the client based on RPCs including it.
> * Review existing support for topic names for potential clean-up if not needed.
[jira] [Assigned] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired
[ https://issues.apache.org/jira/browse/KAFKA-15305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True reassigned KAFKA-15305:
---------------------------------

    Assignee: Kirk True  (was: Philip Nee)

> The background thread should try to process the remaining task until the shutdown timer is expired
> ----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15305
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15305
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients, consumer
>            Reporter: Philip Nee
>            Assignee: Kirk True
>            Priority: Major
>              Labels: consumer-threading-refactor
>             Fix For: 3.8.0
>
> Noticed while working on https://issues.apache.org/jira/browse/KAFKA-15304:
> The close() API supplies a timeout parameter so that the consumer has a grace period to process things before shutting down. The background thread currently doesn't honor it: when close() is initiated, it immediately closes all of its dependencies.
> This might not be desirable, because there could be remaining tasks to process before closing. The correct approach is probably to first stop accepting API requests; second, let runOnce() continue to run until the shutdown timer expires; and only then force-close all of the dependencies.
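As a rough sketch of that sequence (all names illustrative; the real background thread drains application events and network requests rather than a plain task queue), the close path could look like this:

{code:java}
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch of the proposed close sequence (all names illustrative): stop
// accepting new API requests, drain remaining tasks until the shutdown
// timer expires, then force-close dependencies.
public final class BackgroundThreadSketch {
    private volatile boolean acceptingRequests = true;
    private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();

    boolean submit(Runnable task) {
        if (!acceptingRequests) return false; // 1. new API requests are rejected
        return tasks.add(task);
    }

    void close(long timeoutMs) {
        acceptingRequests = false;
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!tasks.isEmpty() && System.currentTimeMillis() < deadline) {
            runOnce(); // 2. keep draining remaining work
        }
        closeDependencies(); // 3. force-close whatever is left
    }

    private void runOnce() {
        Runnable task = tasks.poll();
        if (task != null) task.run();
    }

    private void closeDependencies() { /* release network client, etc. */ }

    public static void main(String[] args) {
        BackgroundThreadSketch thread = new BackgroundThreadSketch();
        thread.submit(() -> System.out.println("draining last task"));
        thread.close(1_000L);
    }
}
{code}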
Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]
AndrewJSchofield commented on PR #15000:
URL: https://github.com/apache/kafka/pull/15000#issuecomment-1887735105

I agree with @lianetm that the second option seems best. The invoker mechanism already exists for `commitAsync()` and submitting to the invoker is already using a thread-safe object.
Re: [PR] KAFKA-16085: Add metric value consolidated for topics on a broker for tiered storage. [kafka]
kamalcph commented on code in PR #15133:
URL: https://github.com/apache/kafka/pull/15133#discussion_r1449241531

## core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
@@ -473,6 +498,11 @@ class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[Kaf
     brokerTopicAggregatedMetric.setPartitionMetricValue(partition, segmentsLag)
   }

+  def recordRemoteDeleteLagBytes(topic: String, segmentsLag: Long): Unit = {

Review Comment:
    Can we rename `segmentsLag` to `bytesLag`?
[jira] [Updated] (KAFKA-16113) AsyncKafkaConsumer: Add missing offset commit metrics
[ https://issues.apache.org/jira/browse/KAFKA-16113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-16113:
------------------------------
    Fix Version/s: 3.8.0

> AsyncKafkaConsumer: Add missing offset commit metrics
> ------------------------------------------------------
>
>                 Key: KAFKA-16113
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16113
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: clients, consumer, metrics
>            Reporter: Philip Nee
>            Assignee: Philip Nee
>            Priority: Major
>              Labels: consumer-threading-refactor
>             Fix For: 3.8.0
>
> The following metrics are missing from the AsyncKafkaConsumer:
> commit-latency-avg
> commit-latency-max
> commit-rate
> commit-total
> committed-time-ns-total
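Wiring these sensors up with Kafka's metrics library could look like the sketch below. The metric group name here is an assumption for illustration, and the real implementation would record from the consumer's commit paths rather than from {{main}}:

{code:java}
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;

// Sketch: registering the missing commit metrics with Kafka's metrics library.
public class CommitMetricsSketch {
    public static void main(String[] args) {
        try (Metrics metrics = new Metrics()) {
            String group = "consumer-coordinator-metrics"; // assumed group name
            Sensor commitSensor = metrics.sensor("commit-latency");
            commitSensor.add(metrics.metricName("commit-latency-avg", group,
                "Average time taken for a commit request"), new Avg());
            commitSensor.add(metrics.metricName("commit-latency-max", group,
                "Max time taken for a commit request"), new Max());
            commitSensor.add(new Meter(
                metrics.metricName("commit-rate", group, "Commit calls per second"),
                metrics.metricName("commit-total", group, "Total number of commit calls")));

            Sensor committedTime = metrics.sensor("committed-time");
            committedTime.add(metrics.metricName("committed-time-ns-total", group,
                "Total time spent in committed(), in nanoseconds"), new CumulativeSum());

            commitSensor.record(42.0);       // one commit taking 42 ms
            committedTime.record(1_500_000); // 1.5 ms spent in committed()
        }
    }
}
{code}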
[jira] [Updated] (KAFKA-16113) AsyncKafkaConsumer: Add missing offset commit metrics
[ https://issues.apache.org/jira/browse/KAFKA-16113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-16113:
------------------------------
    Labels: consumer-threading-refactor  (was: )

> AsyncKafkaConsumer: Add missing offset commit metrics
> ------------------------------------------------------
>
>                 Key: KAFKA-16113
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16113
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: consumer
>            Reporter: Philip Nee
>            Assignee: Philip Nee
>            Priority: Major
>              Labels: consumer-threading-refactor
>
> The following metrics are missing from the AsyncKafkaConsumer:
> commit-latency-avg
> commit-latency-max
> commit-rate
> commit-total
> committed-time-ns-total
[jira] [Updated] (KAFKA-16113) AsyncKafkaConsumer: Add missing offset commit metrics
[ https://issues.apache.org/jira/browse/KAFKA-16113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-16113:
------------------------------
    Component/s: clients
                 metrics

> AsyncKafkaConsumer: Add missing offset commit metrics
> ------------------------------------------------------
>
>                 Key: KAFKA-16113
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16113
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: clients, consumer, metrics
>            Reporter: Philip Nee
>            Assignee: Philip Nee
>            Priority: Major
>              Labels: consumer-threading-refactor
>
> The following metrics are missing from the AsyncKafkaConsumer:
> commit-latency-avg
> commit-latency-max
> commit-rate
> commit-total
> committed-time-ns-total
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1449238660

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
@@ -1963,6 +2080,75 @@ public void testFetchAllOffsetsAtDifferentCommittedOffset() {
         ), context.fetchAllOffsets("group", Long.MAX_VALUE));
     }

+    @Test
+    public void testFetchAllOffsetsWithPendingTransactionalOffsets() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+
+        context.commitOffset("group", "foo", 0, 100L, 1);
+        context.commitOffset("group", "foo", 1, 110L, 1);
+        context.commitOffset("group", "bar", 0, 200L, 1);
+
+        context.commit();
+
+        assertEquals(3, context.lastWrittenOffset);
+        assertEquals(3, context.lastCommittedOffset);
+
+        context.commitOffset(10L, "group", "foo", 1, 111L, 1, context.time.milliseconds());
+        context.commitOffset(10L, "group", "bar", 0, 201L, 1, context.time.milliseconds());

Review Comment:
    Oh interesting. Is it worth testing that we don't return it for now, and updating it if we plan to change the behavior?
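The invariant being discussed can be stated in miniature as follows. This is a standalone model with hypothetical names, not the test context above: offsets committed inside an open transaction stay invisible to offset fetches until the transaction commits.

```java
import java.util.HashMap;
import java.util.Map;

// Standalone model (names hypothetical): pending transactional offsets are
// never returned by a fetch; only the last stable committed offset is.
public class PendingTxnOffsetsSketch {
    private final Map<String, Long> committed = new HashMap<>();
    private final Map<String, Long> pendingTxn = new HashMap<>();

    void commitOffset(String tp, long offset) { committed.put(tp, offset); }
    void commitTransactionalOffset(String tp, long offset) { pendingTxn.put(tp, offset); }
    void commitTransaction() { committed.putAll(pendingTxn); pendingTxn.clear(); }

    long fetchOffset(String tp) {
        return committed.getOrDefault(tp, -1L);
    }

    public static void main(String[] args) {
        PendingTxnOffsetsSketch ctx = new PendingTxnOffsetsSketch();
        ctx.commitOffset("foo-1", 110L);
        ctx.commitTransactionalOffset("foo-1", 111L); // transaction still open
        System.out.println(ctx.fetchOffset("foo-1")); // 110, not 111
        ctx.commitTransaction();
        System.out.println(ctx.fetchOffset("foo-1")); // 111
    }
}
```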
[jira] [Updated] (KAFKA-16111) Implement tests for tricky rebalance callback scenarios
[ https://issues.apache.org/jira/browse/KAFKA-16111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-16111:
------------------------------
    Summary: Implement tests for tricky rebalance callback scenarios  (was: Implement tests for tricky rebalance callbacks scenarios)

> Implement tests for tricky rebalance callback scenarios
> ---------------------------------------------------------
>
>                 Key: KAFKA-16111
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16111
>             Project: Kafka
>          Issue Type: Test
>          Components: clients, consumer
>            Reporter: Kirk True
>            Assignee: Kirk True
>            Priority: Major
>              Labels: consumer-threading-refactor, kip-848-client-support
>             Fix For: 3.8.0
Re: [PR] KAFKA-15876: Introduce RemoteStorageNotReadyException retryable error [kafka]
kamalcph commented on code in PR #14822:
URL: https://github.com/apache/kafka/pull/14822#discussion_r1449228856

## clients/src/main/java/org/apache/kafka/common/errors/RemoteStorageNotReadyException.java:
@@ -0,0 +1,38 @@
+/* ... standard Apache license header ... */
+package org.apache.kafka.common.errors;
+
+/**
+ * An exception that indicates remote storage is not ready to receive the requests yet.

Review Comment:
    If there are no alternatives, we may have to consider dropping the KIP or patch. This is something I should have brought up during the KIP discussion; I overlooked the implications for the client code.
[jira] [Updated] (KAFKA-16019) Some of the tests in PlaintextConsumer can't seem to deterministically invoke and verify the consumer callback
[ https://issues.apache.org/jira/browse/KAFKA-16019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-16019:
------------------------------
    Summary: Some of the tests in PlaintextConsumer can't seem to deterministically invoke and verify the consumer callback  (was: Some of the tests in PlaintextConsumer can't seem to deterministically invokes and verify the consumer callback)

> Some of the tests in PlaintextConsumer can't seem to deterministically invoke and verify the consumer callback
> ----------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16019
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16019
>             Project: Kafka
>          Issue Type: Test
>          Components: clients, consumer
>            Reporter: Philip Nee
>            Priority: Major
>              Labels: consumer-threading-refactor
>             Fix For: 3.8.0
>
> I was running the PlaintextConsumer tests to test the async consumer; however, a few tests were failing because they could not verify that the listener was invoked correctly.
> For example, in `testPerPartitionLeadMetricsCleanUpWithSubscribe`, around 50% of the time the listener's callsToAssigned was never incremented correctly. Even changing it to awaitUntilTrue, it was still the same case:
> {code:java}
> consumer.subscribe(List(topic, topic2).asJava, listener)
> val records = awaitNonEmptyRecords(consumer, tp)
> assertEquals(1, listener.callsToAssigned, "should be assigned once") {code}
Re: [PR] KAFKA-15876: Introduce RemoteStorageNotReadyException retryable error [kafka]
kamalcph commented on code in PR #14822:
URL: https://github.com/apache/kafka/pull/14822#discussion_r1449218105

## clients/src/main/java/org/apache/kafka/common/errors/RemoteStorageNotReadyException.java:
@@ -0,0 +1,38 @@
+/* ... standard Apache license header ... */
+package org.apache.kafka.common.errors;
+
+/**
+ * An exception that indicates remote storage is not ready to receive the requests yet.

Review Comment:
    Updated the Javadoc.

    We cannot introduce a new error code, as the clients will throw `IllegalStateException` back to the caller when they encounter any new error code. See [FetchCollector#handleInitializeErrors](https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java?L309).

    What options do we have? With the existing `REPLICA_NOT_AVAILABLE` error, the client will request a metadata update, which we want to avoid. It seems there is no other option left for us except `UNKNOWN_SERVER_ERROR`, which is generic.
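The compatibility concern can be seen in miniature below: `Errors.forCode` maps any code the client's enum does not contain to `UNKNOWN_SERVER_ERROR`, and the fetch path's default error branch then fails rather than retrying, per the comment above. This sketch only exercises the mapping; it assumes kafka-clients on the classpath, and the code value 999 is made up.

```java
import org.apache.kafka.common.protocol.Errors;

// Illustrates why minting a new error code breaks old clients: a code the
// client's Errors enum does not contain cannot be handled as retriable.
public class UnknownErrorCodeSketch {
    public static void main(String[] args) {
        short unknownCode = (short) 999; // hypothetical new error code
        Errors error = Errors.forCode(unknownCode);
        // On a client that lacks the code, this prints UNKNOWN_SERVER_ERROR;
        // the fetch error handler then surfaces a failure instead of retrying.
        System.out.println(error);
    }
}
```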
Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]
lianetm commented on PR #15000:
URL: https://github.com/apache/kafka/pull/15000#issuecomment-1887680132

My take would be option 2: using the invoker in the background thread and submitting a task for the interceptor. It seems like a clean way that reuses the invoker mechanism already in place, without any major refactoring.

I would definitely leave the auto-commit logic in the background thread where it is, as it's truly an internal operation/request that we want to perform without any direct relation to the app layer. It is even needed from multiple places in the background: auto-commit on the interval, but also auto-commit as part of the reconciliation process.
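For context, a minimal sketch of what that submission path could look like, assuming a queue-backed invoker drained by the application thread (the class and method names below are illustrative, not the exact consumer internals):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of option 2: the background thread enqueues a task on the invoker's
// queue, and the application thread drains it inside poll(), so
// ConsumerInterceptor#onCommit always runs on the application thread.
public final class OffsetCommitCallbackInvokerSketch {
    private final BlockingQueue<Runnable> pending = new LinkedBlockingQueue<>();

    // Called from the background thread after a successful commit.
    void enqueueInterceptorInvocation(Runnable onCommit) {
        pending.add(onCommit);
    }

    // Called from the application thread, e.g. at the top of poll().
    void executeInvocations() {
        Runnable task;
        while ((task = pending.poll()) != null) {
            task.run();
        }
    }

    public static void main(String[] args) {
        OffsetCommitCallbackInvokerSketch invoker = new OffsetCommitCallbackInvokerSketch();
        invoker.enqueueInterceptorInvocation(() -> System.out.println("interceptor onCommit"));
        invoker.executeInvocations(); // runs on the "application thread"
    }
}
```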
Re: [PR] KAFKA-14683 Migrate #testStartPaused to Mockito [kafka]
hgeraldino commented on PR #14663:
URL: https://github.com/apache/kafka/pull/14663#issuecomment-1887674876

> Hi @hgeraldino Thanks for taking on the migration!
>
> I understand the idea behind your refactor-then-deduplicate strategy, but I think the excessive duplication is making it difficult (at least for me) to review the change.
>
> What do you think about starting a new test class, and moving the migrated tests into that new class? This would allow you to use synonymous variable names, annotation mocks, and method names. At the end we can delete the original class and move the new class back to the original class name.
>
> This will separate the added and removed parts in the diff, where they are currently inline. But the mocking libraries are so substantially different that the inline parts of the diff are not very helpful anyway.

Sure thing @gharris1727, if you think this will make the review process easier I'm happy to do that. I reckon these reviews are a bit painful - especially for this test class, which contains dozens of tests.
[jira] [Commented] (KAFKA-15147) Measure pending and outstanding Remote Segment operations
[ https://issues.apache.org/jira/browse/KAFKA-15147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805734#comment-17805734 ]

Christo Lolov commented on KAFKA-15147:
---------------------------------------

Heya [~fvisconte], a couple of days ago [~showuon] also noticed discrepancies, and they should hopefully be addressed as part of https://github.com/apache/kafka/pull/15133!

> Measure pending and outstanding Remote Segment operations
> ----------------------------------------------------------
>
>                 Key: KAFKA-15147
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15147
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>            Reporter: Jorge Esteban Quilcate Otoya
>            Assignee: Christo Lolov
>            Priority: Major
>              Labels: tiered-storage
>             Fix For: 3.7.0
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-963%3A+Upload+and+delete+lag+metrics+in+Tiered+Storage
>
> KAFKA-15833: RemoteCopyLagBytes
> KAFKA-16002: RemoteCopyLagSegments, RemoteDeleteLagBytes, RemoteDeleteLagSegments
> KAFKA-16013: ExpiresPerSec
> KAFKA-16014: RemoteLogSizeComputationTime, RemoteLogSizeBytes, RemoteLogMetadataCount
> KAFKA-15158: RemoteDeleteRequestsPerSec, RemoteDeleteErrorsPerSec, BuildRemoteLogAuxStateRequestsPerSec, BuildRemoteLogAuxStateErrorsPerSec
>
> Remote log segment operations (copy/delete) are executed by the Remote Storage Manager and recorded by the Remote Log Metadata Manager (e.g. the default TopicBasedRLMM writes remote log segment state changes to an internal Kafka topic).
> As executions run, fail, and retry, it will be important to know how many operations are pending and outstanding over time so that operators can be alerted.
> Pending operations alone are not enough to alert on, as values can oscillate close to zero. An additional condition needs to apply (running time > threshold) to consider an operation outstanding.
> Proposal:
> RemoteLogManager could be extended with two concurrent maps (pendingSegmentCopies, pendingSegmentDeletes) of type `Map[Uuid, Long]` recording, per segmentId, the time its operation started, and based on this expose two metrics per operation:
> * pendingSegmentCopies: gauge over the pendingSegmentCopies map
> * outstandingSegmentCopies: loop over pending ops, and if now - startedTime > timeout, then outstanding++ (maybe at debug level?)
> Is this a valuable metric to add to Tiered Storage, or better solved in a custom RLMM implementation?
> Also, does it require a KIP?
> Thanks!
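A compact sketch of that proposal follows, with String standing in for Uuid so the example is self-contained; the real values would be exposed through the broker's metrics framework rather than plain methods:

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the quoted proposal: pending = size of the map; outstanding =
// entries whose operation has been running longer than a threshold.
public final class PendingSegmentOpsSketch {
    private final Map<String, Long> pendingSegmentCopies = new ConcurrentHashMap<>();

    void copyStarted(String segmentId, long nowMs) { pendingSegmentCopies.put(segmentId, nowMs); }
    void copyFinished(String segmentId) { pendingSegmentCopies.remove(segmentId); }

    int pendingCount() { return pendingSegmentCopies.size(); }

    long outstandingCount(long nowMs, long timeoutMs) {
        return pendingSegmentCopies.values().stream()
            .filter(startedMs -> nowMs - startedMs > timeoutMs)
            .count();
    }

    public static void main(String[] args) {
        PendingSegmentOpsSketch ops = new PendingSegmentOpsSketch();
        ops.copyStarted("segment-1", 0L);
        ops.copyStarted("segment-2", 900L);
        System.out.println(ops.pendingCount());                // 2
        System.out.println(ops.outstandingCount(1000L, 500L)); // 1 (segment-1)
    }
}
{code}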
Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]
dajac commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1449170060

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
@@ -453,19 +454,31 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundE
     /**
      * Get the Group List.
      *
-     * @param statesFilter The states of the groups we want to list.
-     *                     If empty all groups are returned with their state.
-     * @param committedOffset A specified committed offset corresponding to this shard
+     * @param statesFilter The states of the groups we want to list.
+     *                     If empty, all groups are returned with their state.
+     * @param typesFilter The types of the groups we want to list.
+     *                    If empty, all groups are returned with their type.
+     * @param committedOffset A specified committed offset corresponding to this shard.
      *
      * @return A list containing the ListGroupsResponseData.ListedGroup
      */
+    public List listGroups(
+        List statesFilter,
+        List typesFilter,
+        long committedOffset
+    ) {
+        Predicate combinedFilter = group -> {
+            boolean stateCheck = statesFilter.isEmpty() || statesFilter.contains(group.stateAsString(committedOffset));
+            boolean typeCheck = typesFilter.isEmpty() || typesFilter.contains(group.type().toString());

Review Comment:
    As discussed offline, we cannot rely on the client side to do the right thing. We should rather lower-case them on the server side.
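The suggested server-side normalization could look like the sketch below, with `Group` as a stand-in interface rather than the coordinator's real group type:

```java
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Sketch of the suggested fix: normalize the requested filters to lower case
// on the server side instead of trusting clients to send canonical casing.
public final class GroupFilterSketch {
    interface Group { String stateAsString(); String typeAsString(); }

    static Predicate<Group> combinedFilter(List<String> statesFilter, List<String> typesFilter) {
        Set<String> states = statesFilter.stream().map(String::toLowerCase).collect(Collectors.toSet());
        Set<String> types = typesFilter.stream().map(String::toLowerCase).collect(Collectors.toSet());
        return group ->
            (states.isEmpty() || states.contains(group.stateAsString().toLowerCase())) &&
            (types.isEmpty() || types.contains(group.typeAsString().toLowerCase()));
    }

    public static void main(String[] args) {
        Group group = new Group() {
            public String stateAsString() { return "Stable"; }
            public String typeAsString() { return "consumer"; }
        };
        // Mixed-case client input still matches after server-side normalization.
        System.out.println(combinedFilter(List.of("STABLE"), List.of()).test(group)); // true
    }
}
```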
Re: [PR] KAFKA-16089: Fix memory leak in RocksDBStore [kafka]
lucasbru merged PR #15174:
URL: https://github.com/apache/kafka/pull/15174
Re: [PR] KAFKA-15351: Ensure log-start-offset not updated to local-log-start-offset when remote storage enabled [kafka]
kamalcph commented on code in PR #14301:
URL: https://github.com/apache/kafka/pull/14301#discussion_r1449147426

## core/src/main/scala/kafka/log/LogLoader.scala:
@@ -78,7 +78,8 @@ class LogLoader(
   recoveryPointCheckpoint: Long,
   leaderEpochCache: Option[LeaderEpochFileCache],
   producerStateManager: ProducerStateManager,
-  numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int]
+  numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int],
+  isRemoteLogEnabled: Boolean = false,

Review Comment:
    Opened a PR to address it: https://github.com/apache/kafka/pull/15179. PTAL
[PR] MINOR: Add isRemoteLogEnabled parameter to the Log Loader Javadoc. [kafka]
kamalcph opened a new pull request, #15179:
URL: https://github.com/apache/kafka/pull/15179

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)