Re: [PR] KAFKA-15876: Introduce RemoteStorageNotReadyException retryable error [kafka]

2024-01-11 Thread via GitHub


showuon commented on code in PR #14822:
URL: https://github.com/apache/kafka/pull/14822#discussion_r1449931227


##
clients/src/main/java/org/apache/kafka/common/errors/RemoteStorageNotReadyException.java:
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * This retryable exception indicates that remote storage is not ready to 
receive the requests yet.

Review Comment:
   nit: Below we extends `RetriableException`, so maybe this should spell as 
`retriable`?



##
clients/src/main/java/org/apache/kafka/common/errors/RemoteStorageNotReadyException.java:
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * This retryable exception indicates that remote storage is not ready to 
receive the requests yet.

Review Comment:
   Should we mention this will only be used when using 
`TopicBasedRemoteLogMetadataManager`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-11 Thread via GitHub


artemlivshits commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1449897783


##
core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java:
##
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.handlers;
+
+import kafka.network.RequestChannel;
+import kafka.server.AuthHelper;
+import kafka.server.KafkaConfig;
+import kafka.server.MetadataCache;
+import kafka.server.metadata.KRaftMetadataCache;
+import kafka.server.metadata.ZkMetadataCache;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest;
+import org.apache.kafka.common.resource.Resource;
+import scala.collection.JavaConverters;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.resource.ResourceType.TOPIC;
+
+public class DescribeTopicPartitionsRequestHandler {
+MetadataCache metadataCache;
+AuthHelper authHelper;
+KafkaConfig config;
+
+public DescribeTopicPartitionsRequestHandler(
+MetadataCache metadataCache,
+AuthHelper authHelper,
+KafkaConfig config
+) {
+this.metadataCache = metadataCache;
+this.authHelper = authHelper;
+this.config = config;
+}
+
+public DescribeTopicPartitionsResponseData 
handleDescribeTopicPartitionsRequest(RequestChannel.Request abstractRequest) {
+if (metadataCache instanceof ZkMetadataCache) {
+throw new InvalidRequestException("ZK cluster does not handle 
DescribeTopicPartitions request");
+}
+KRaftMetadataCache kRaftMetadataCache = (KRaftMetadataCache) 
metadataCache;
+
+DescribeTopicPartitionsRequestData request = 
((DescribeTopicPartitionsRequest) abstractRequest.loggableRequest()).data();
+Set topics = new HashSet<>();
+boolean fetchAllTopics = request.topics().isEmpty();
+DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor();
+if (fetchAllTopics) {
+if (cursor != null) {
+// Includes the cursor topic in case the cursor topic does not 
exist anymore.
+topics.add(cursor.topicName());

Review Comment:
   Instead of doing this, can we get all topics that are >= cursor and then 
check (after sorting) if the cursor topic is present and reset partition index 
to 0 if it's not?  This would also handle the unauthorized case.



##
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##
@@ -189,6 +255,86 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
 }
   }
 
+  /**
+   * Get the topic metadata for the given topics.
+   *
+   * The quota is used to limit the number of partitions to return. The 
NextTopicPartition field points to the first
+   * partition can't be returned due the limit.
+   * If a topic can't return any partition due to quota limit reached, this 
topic will not be included in the response.
+   *
+   * Note, the topics should be sorted in alphabetical order. The topics in 
the DescribeTopicPartitionsResponseData
+   * will also be sorted in alphabetical order.
+   *
+   * @param topicsThe set of topics and their 
corresponding first partition id to fetch.
+   * @param listenerName  The listener name.
+   * @param firstTopicPartitionStartIndex The start partition index for the 
first topic
+   * @param maximumNumberOfPartitions The max number of partitions to 
return.
+   */
+  def

Re: [PR] MINOR: Add test case for follower fetch [kafka]

2024-01-11 Thread via GitHub


showuon commented on PR #14212:
URL: https://github.com/apache/kafka/pull/14212#issuecomment-1888514023

   @divijvaidya , do you want to have another look at this PR? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [KAFKA-15749] Adding support for Kraft in test testClusterIdPresent [kafka]

2024-01-11 Thread via GitHub


chirag-wadhwa5 opened a new pull request, #15181:
URL: https://github.com/apache/kafka/pull/15181

   Adding the KRaft test for testClusterIdPresent() in 
KafkaMetricReporterClusterIdTest class
   Ref: 
[KAFKA-15749](https://issues.apache.org/jira/browse/KAFKA-15749?jql=labels%20%3D%20kraft-test)
   
   Testing:
   test passed successfully. Adding a screenshot below
   
   https://github.com/apache/kafka/assets/122860692/ce2e6b23-c9ff-456b-a968-4ca7477b77e2";>
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15561) Client support for new SubscriptionPattern based subscription

2024-01-11 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805883#comment-17805883
 ] 

Phuc Hong Tran commented on KAFKA-15561:


I figured those out, so they were auto-gerenated classes.

> Client support for new SubscriptionPattern based subscription
> -
>
> Key: KAFKA-15561
> URL: https://issues.apache.org/jira/browse/KAFKA-15561
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support
> Fix For: 3.8.0
>
>
> New consumer should support subscribe with the new SubscriptionPattern 
> introduced in the new consumer group protocol. When subscribing with this 
> regex, the client should provide the regex in the HB request on the 
> SubscribedTopicRegex field, delegating the resolution to the server.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (KAFKA-15561) Client support for new SubscriptionPattern based subscription

2024-01-11 Thread Phuc Hong Tran (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-15561 ]


Phuc Hong Tran deleted comment on KAFKA-15561:


was (Author: JIRAUSER301295):
[~lianetm], look like the problem is on my local machine.

> Client support for new SubscriptionPattern based subscription
> -
>
> Key: KAFKA-15561
> URL: https://issues.apache.org/jira/browse/KAFKA-15561
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support
> Fix For: 3.8.0
>
>
> New consumer should support subscribe with the new SubscriptionPattern 
> introduced in the new consumer group protocol. When subscribing with this 
> regex, the client should provide the regex in the HB request on the 
> SubscribedTopicRegex field, delegating the resolution to the server.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15561) Client support for new SubscriptionPattern based subscription

2024-01-11 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805880#comment-17805880
 ] 

Phuc Hong Tran commented on KAFKA-15561:


[~lianetm], look like the problem is on my local machine.

> Client support for new SubscriptionPattern based subscription
> -
>
> Key: KAFKA-15561
> URL: https://issues.apache.org/jira/browse/KAFKA-15561
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support
> Fix For: 3.8.0
>
>
> New consumer should support subscribe with the new SubscriptionPattern 
> introduced in the new consumer group protocol. When subscribing with this 
> regex, the client should provide the regex in the HB request on the 
> SubscribedTopicRegex field, delegating the resolution to the server.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16085: Add metric value consolidated for topics on a broker for tiered storage. [kafka]

2024-01-11 Thread via GitHub


showuon commented on PR #15133:
URL: https://github.com/apache/kafka/pull/15133#issuecomment-1888394290

   @satishd , call for review. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add isRemoteLogEnabled parameter to the Log Loader Javadoc [kafka]

2024-01-11 Thread via GitHub


showuon commented on PR #15179:
URL: https://github.com/apache/kafka/pull/15179#issuecomment-1888393499

   This is just a javadoc update. I'll merge it after the CI build completes if 
no other comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Enable kraft test in kafka.api [kafka]

2024-01-11 Thread via GitHub


dengziming merged PR #14595:
URL: https://github.com/apache/kafka/pull/14595


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Optimization of equals methods on implementations of Commands.Handler in shell.command package [kafka]

2024-01-11 Thread via GitHub


github-actions[bot] commented on PR #14018:
URL: https://github.com/apache/kafka/pull/14018#issuecomment-1888380284

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has  merge conflicts, please update it with the latest from 
trunk (or appropriate release branch)  If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add test case for follower fetch [kafka]

2024-01-11 Thread via GitHub


github-actions[bot] commented on PR #14212:
URL: https://github.com/apache/kafka/pull/14212#issuecomment-1888380213

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has  merge conflicts, please update it with the latest from 
trunk (or appropriate release branch)  If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15452: Access SslPrincipalMapper and kerberosShortNamer in Custom KafkaPrincipalBuilder(KIP-982) [kafka]

2024-01-11 Thread via GitHub


github-actions[bot] commented on PR #14491:
URL: https://github.com/apache/kafka/pull/14491#issuecomment-1888380144

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has  merge conflicts, please update it with the latest from 
trunk (or appropriate release branch)  If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16095) Update list group state type filter to include the states for the new consumer group type

2024-01-11 Thread Lan Ding (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805868#comment-17805868
 ] 

Lan Ding commented on KAFKA-16095:
--

[~rreddy22] I think the flag "–type" should be added to allow filtering by 
group type in the ConsumerGroupCommand tool, perhaps we should wait for 
[KAFKA-15460] to finish first.

> Update list group state type filter to include the states for the new 
> consumer group type
> -
>
> Key: KAFKA-16095
> URL: https://issues.apache.org/jira/browse/KAFKA-16095
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ritika Reddy
>Assignee: Lan Ding
>Priority: Minor
>
> # While using *—list —state* the current accepted values correspond to the 
> classic group type states. We need to include support for the new group type 
> states.
>  ## Consumer Group: Should list the state of the group. Accepted Values: 
>  ### _UNKNOWN(“unknown”)_
>  ### {_}EMPTY{_}("empty"),
>  ### *{_}ASSIGNING{_}("assigning"),*
>  ### *{_}RECONCILING{_}("reconciling"),*
>  ### {_}STABLE{_}("stable"),
>  ### {_}DEAD{_}("dead");
>  # 
>  ## Classic Group : Should list the state of the group. Accepted Values: 
>  ### {_}UNKNOWN{_}("Unknown"),
>  ### {_}EMPTY{_}("Empty");
>  ### *{_}PREPARING_REBALANCE{_}("PreparingRebalance"),*
>  ### *{_}COMPLETING_REBALANCE{_}("CompletingRebalance"),*
>  ### {_}STABLE{_}("Stable"),
>  ### {_}DEAD{_}("Dead")



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Log a warning when connectors generate greater than tasks.max task configs [kafka]

2024-01-11 Thread via GitHub


gharris1727 commented on PR #14694:
URL: https://github.com/apache/kafka/pull/14694#issuecomment-1888248086

   @C0urante Is this superseded by #15180 ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-8115) Flaky Test CoordinatorTest#testTaskRequestWithOldStartMsGetsUpdated

2024-01-11 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805849#comment-17805849
 ] 

Greg Harris commented on KAFKA-8115:


We doubled the timeout for this test in KAFKA-15760 after looking at some 
cpu-restricted profiles of the test showed the timeout could be exceeded by 
just loading classes for the REST server.

I'll keep monitoring this to see if the flakiness changes.

> Flaky Test CoordinatorTest#testTaskRequestWithOldStartMsGetsUpdated
> ---
>
> Key: KAFKA-8115
> URL: https://issues.apache.org/jira/browse/KAFKA-8115
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Greg Harris
>Priority: Critical
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3254/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/testTaskRequestWithOldStartMsGetsUpdated/]
> {quote}org.junit.runners.model.TestTimedOutException: test timed out after 
> 12 milliseconds at java.base@11.0.1/jdk.internal.misc.Unsafe.park(Native 
> Method) at 
> java.base@11.0.1/java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
>  at 
> java.base@11.0.1/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
>  at 
> java.base@11.0.1/java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1454)
>  at 
> java.base@11.0.1/java.util.concurrent.Executors$DelegatedExecutorService.awaitTermination(Executors.java:709)
>  at 
> app//org.apache.kafka.trogdor.rest.JsonRestServer.waitForShutdown(JsonRestServer.java:157)
>  at app//org.apache.kafka.trogdor.agent.Agent.waitForShutdown(Agent.java:123) 
> at 
> app//org.apache.kafka.trogdor.common.MiniTrogdorCluster.close(MiniTrogdorCluster.java:285)
>  at 
> app//org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated(CoordinatorTest.java:596)
>  at 
> java.base@11.0.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base@11.0.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> java.base@11.0.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base@11.0.1/java.lang.reflect.Method.invoke(Method.java:566) at 
> app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
>  at 
> app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
>  at java.base@11.0.1/java.util.concurrent.FutureTask.run(FutureTask.java:264) 
> at java.base@11.0.1/java.lang.Thread.run(Thread.java:834){quote}
> STDOUT
> {quote}[2019-03-15 09:23:41,364] INFO Creating MiniTrogdorCluster with 
> agents: node02 and coordinator: node01 
> (org.apache.kafka.trogdor.common.MiniTrogdorCluster:135) [2019-03-15 
> 09:23:41,595] INFO Logging initialized @13340ms to 
> org.eclipse.jetty.util.log.Slf4jLog (org.eclipse.jetty.util.log:193) 
> [2019-03-15 09:23:41,752] INFO Starting REST server 
> (org.apache.kafka.trogdor.rest.JsonRestServer:89) [2019-03-15 09:23:41,912] 
> INFO Registered resource 
> org.apache.kafka.trogdor.agent.AgentRestResource@3fa38ceb 
> (org.apache.kafka.trogdor.rest.JsonRestServer:94) [2019-03-15 09:23:42,178] 
> INFO jetty-9.4.14.v20181114; built: 2018-11-14T21:20:31.478Z; git: 
> c4550056e785fb5665914545889f21dc136ad9e6; jvm 11.0.1+13-LTS 
> (org.eclipse.jetty.server.Server:370) [2019-03-15 09:23:42,360] INFO 
> DefaultSessionIdManager workerName=node0 
> (org.eclipse.jetty.server.session:365) [2019-03-15 09:23:42,362] INFO No 
> SessionScavenger set, using defaults (org.eclipse.jetty.server.session:370) 
> [2019-03-15 09:23:42,370] INFO node0 Scavenging every 66ms 
> (org.eclipse.jetty.server.session:149) [2019-03-15 09:23:44,412] INFO Started 
> o.e.j.s.ServletContextHandler@335a5293\{/,null,AVAILABLE} 
> (org.eclipse.jetty.server.handler.ContextHandler:855) [2019-03-15 
> 09:23:44,473] INFO Started 
> ServerConnector@79a93bf1\{HTTP/1.1,[http/1.1]}{0.0.0.0:33477} 
> (org.eclipse.jetty.server.AbstractConnector:292) [2019-03-15 09:23:44,474] 
> INFO Started @16219ms (org.eclipse.jetty.server.Server:407) [2019-03-15 
> 09:23:44,475] INFO REST server 

[jira] [Resolved] (KAFKA-15760) org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated is flaky

2024-01-11 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-15760.
-
Fix Version/s: 3.8.0
 Assignee: David Mao
   Resolution: Fixed

> org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated
>  is flaky
> --
>
> Key: KAFKA-15760
> URL: https://issues.apache.org/jira/browse/KAFKA-15760
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Reporter: Calvin Liu
>Assignee: David Mao
>Priority: Major
>  Labels: flaky-test
> Fix For: 3.8.0
>
>
> Build / JDK 17 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – 
> org.apache.kafka.trogdor.coordinator.CoordinatorTest
> {code:java}
> java.util.concurrent.TimeoutException: 
> testTaskRequestWithOldStartMsGetsUpdated() timed out after 12 
> milliseconds at 
> org.junit.jupiter.engine.extension.TimeoutExceptionFactory.create(TimeoutExceptionFactory.java:29)
>at 
> org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:58)
>   at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
>  at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
>at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
> at 
> org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
>  at 
> org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
>   at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
> at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>  at 
> org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
>at 
> org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
>at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218)
> at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15760: Disable flaky test testTaskRequestWithOldStartMsGetsUpdated [kafka]

2024-01-11 Thread via GitHub


gharris1727 merged PR #14917:
URL: https://github.com/apache/kafka/pull/14917


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15760: Disable flaky test testTaskRequestWithOldStartMsGetsUpdated [kafka]

2024-01-11 Thread via GitHub


gharris1727 commented on PR #14917:
URL: https://github.com/apache/kafka/pull/14917#issuecomment-1888236729

   Test failures appear unrelated, the test which is being changed has no 
failures, and this is just a timeout increase. Merging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16072: JUnit 5 extension to detect thread leak [kafka]

2024-01-11 Thread via GitHub


gharris1727 commented on PR #15101:
URL: https://github.com/apache/kafka/pull/15101#issuecomment-1888225488

   > One reason that some stream tests are failing because they use the same 
server for the duration of the entire test suite and don't create a server per 
test, for example MetricsIntegrationTest.shouldAddMetricsOnAllLevels().
   > Should we change our logic to test for these leaked threads at the end of 
every test class, instead of at the end of every test?
   
   @divijvaidya The stateful strategy of diffing the threads before and after 
each test should exclude the test-suite threads which are created before the 
test method begins. What is probably happening here is that there are some 
threads being lazily created as a side-effect of the actions of the test but 
which are not cleaned up immediately, but are later cleaned up by the whole 
cluster shutting down.
   
   I wouldn't recommend immediately reducing the scope to the Class-only 
enforcement, as it makes it so much harder to blame specific tests which 
contain leaks. It's so much more helpful to get a "This test leaked a thread" 
warning than a "this suite leaked a test" warning, especially when the 
assertion doesn't tell you where the thread is being allocated.
   
   As far as what to do next:
   
   1. You could figure out how to disable the lazy threads with some 
development-only configuration 
   2. You could figure out how to clean up these lazy threads, either 
automatically or with a development-only hint method.
   3. You could give these particular threads special privileges (by adding to 
the expected thread names)
   4. You could give these particular tests special privileges (with an 
`Ignore` annotation)
   
   These are technically leaks, and the extension is correctly finding them. 
It's up to us to figure out if they're worth fixing or if they're generally 
harmless.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15575: Begin enforcing 'tasks.max' property for connectors [kafka]

2024-01-11 Thread via GitHub


gharris1727 commented on code in PR #15180:
URL: https://github.com/apache/kafka/pull/15180#discussion_r1449550586


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -620,6 +650,11 @@ private boolean startTask(
 
 try (LoaderSwap loaderSwap = 
plugins.withClassLoader(connectorLoader)) {
 final ConnectorConfig connConfig = new 
ConnectorConfig(plugins, connProps);
+
+int maxTasks = 
connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG);

Review Comment:
   nit: make a `tasksMax` getter?



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -391,7 +391,15 @@ public List> 
connectorTaskConfigs(String connName, Connector
 Connector connector = workerConnector.connector();

Review Comment:
   I didn't realize that this was the only place where Connector escapes the 
WorkerConnector, and a different thread interacts with the Connector object.
   
   I know the taskConfigs method is typically an instantaneous method, but 
maybe it would make sense for this to eventually move to the WorkerConnector 
thread instead of the herder tick thread.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##
@@ -131,9 +133,26 @@ public void run() {
 }
 }
 
+/**
+ * Fail the connector.
+ * @param cause the cause of the failure; if null, the connector will not 
be failed
+ */
+public void fail(Throwable cause) {
+synchronized (this) {
+if (this.externalFailure != null)
+return;
+this.externalFailure = cause;
+notify();
+}
+}
+
 void doRun() {
 initialize();
 while (!stopping) {
+Throwable failure = externalFailure;
+if (failure != null)
+onFailure(failure);

Review Comment:
   Is this in danger of being called more than once, particularly if the 
connector has this problem and then a pause/resume request comes in? Is that a 
bad thing?
   
   It looks like the connector thread just waits in this loop until something 
external calls shutdown(), so I would expect this to get called whenever 
someone notify()'s the worker connector thread. 



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##
@@ -131,9 +133,26 @@ public void run() {
 }
 }
 
+/**
+ * Fail the connector.
+ * @param cause the cause of the failure; if null, the connector will not 
be failed
+ */
+public void fail(Throwable cause) {
+synchronized (this) {
+if (this.externalFailure != null)
+return;
+this.externalFailure = cause;
+notify();
+}
+}
+
 void doRun() {
 initialize();
 while (!stopping) {
+Throwable failure = externalFailure;
+if (failure != null)

Review Comment:
   nit: curly braces



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -391,7 +391,15 @@ public List> 
connectorTaskConfigs(String connName, Connector
 Connector connector = workerConnector.connector();
 try (LoaderSwap loaderSwap = 
plugins.withClassLoader(workerConnector.loader())) {
 String taskClassName = connector.taskClass().getName();
-for (Map taskProps : 
connector.taskConfigs(maxTasks)) {
+List> taskConfigs = 
connector.taskConfigs(maxTasks);
+try {
+checkTasksMax(connName, taskConfigs.size(), maxTasks, 
connConfig.enforceTasksMax());
+} catch (TooManyTasksException e) {
+// TODO: This control flow is awkward. Push task config 
generation into WorkerConnector class?

Review Comment:
   This makes sense to me; checkTasksMax could be public static, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]

2024-01-11 Thread via GitHub


gharris1727 commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1449513344


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java:
##
@@ -168,11 +102,13 @@ public String toString(boolean includeMessage) {
 builder.append("' with class '");
 builder.append(executingClass() == null ? "null" : 
executingClass().getName());
 builder.append('\'');
-if (includeMessage && sourceRecord() != null) {
+T original = original();
+if (includeMessage && original instanceof SourceRecord) {

Review Comment:
   Ill expand on this, since it also provides context for the strange casting 
in DeadLetterQueueReporter.
   
   I think the strange "if source do [x], if sink do [y]" in 
`ProcessingContext#toString(boolean)` and `DeadLetterQueueReporter#report()` 
are both symptoms of the inadequate ErrorReporter signature. Since there's only 
one ErrorReporter signature used by both sources and sinks, there isn't 
anything in the type definitions to force the DeadLetterQueueReporter to only 
work with sinks, so it has to include a runtime check in the implementation.
   
   Similarly, there are two different "kinds" of LogReporter, the one for 
sources that hits the first branch in the toString, and one that hits the 
second. Each LogReporter instance only ever takes one of the branches, but that 
isn't clear from the implementation. If we make ErrorReporter generic, we can 
have the implementations give functionality for the specific record types when 
compiling or instantiating, instead of in the report() function.
   
   But pulling on this thread unravelled a bit more: To make ErrorReporter 
generic, we have to make RetryWithToleranceOperator generic, make LogReporter 
generic, change some RetryWithToleranceOperator instance reuse, and move the 
RetryWithToleranceOperator out of WorkerTask. All of which are just lateral 
refactors, and I think would be easier to address in a follow-up.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]

2024-01-11 Thread via GitHub


gharris1727 commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1449513344


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java:
##
@@ -168,11 +102,13 @@ public String toString(boolean includeMessage) {
 builder.append("' with class '");
 builder.append(executingClass() == null ? "null" : 
executingClass().getName());
 builder.append('\'');
-if (includeMessage && sourceRecord() != null) {
+T original = original();
+if (includeMessage && original instanceof SourceRecord) {

Review Comment:
   Ill expand on this, since it also provides context for the strange casting 
in DeadLetterQueueReporter.
   
   I think the strange "if source do , if sink do " in 
`ProcessingContext#toString(boolean)` and `DeadLetterQueueReporter#report()` 
are both symptoms of the inadequate ErrorReporter signature. Since there's only 
one ErrorReporter signature used by both sources and sinks, there isn't 
anything in the type definitions to force the DeadLetterQueueReporter to only 
work with sinks, so it has to include a runtime check in the implementation.
   
   Similarly, there are two different "kinds" of LogReporter, the one for 
sources that hits the first branch in the toString, and one that hits the 
second. Each LogReporter instance only ever takes one of the branches, but that 
isn't clear from the implementation. If we make ErrorReporter generic, we can 
have the implementations give functionality for the specific record types when 
compiling or instantiating, instead of in the report() function.
   
   But pulling on this thread unravelled a bit more: To make ErrorReporter 
generic, we have to make RetryWithToleranceOperator generic, make LogReporter 
generic, change some RetryWithToleranceOperator instance reuse, and move the 
RetryWithToleranceOperator out of WorkerTask. All of which are just lateral 
refactors, and I think would be easier to address in a follow-up.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16116) AsyncKafkaConsumer: Add missing rebalance metrics

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16116:
--
Fix Version/s: 3.8.0

> AsyncKafkaConsumer: Add missing rebalance metrics
> -
>
> Key: KAFKA-16116
> URL: https://issues.apache.org/jira/browse/KAFKA-16116
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, metrics
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> The following metrics are missing:
> |[rebalance-latency-avg|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-avg]|
> |[rebalance-latency-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-max]|
> |[rebalance-latency-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-total]|
> |[rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-rate-per-hour]|
> |[rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-total]|
> |[failed-rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-rate-per-hour]|
> |[failed-rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-total]|



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16115) AsyncKafkaConsumer: Add missing heartbeat metrics

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16115:
--
Component/s: clients

> AsyncKafkaConsumer: Add missing heartbeat metrics
> -
>
> Key: KAFKA-16115
> URL: https://issues.apache.org/jira/browse/KAFKA-16115
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, metrics
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> The following metrics are missing:
> |[heartbeat-rate|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-rate]|
> |[heartbeat-response-time-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-response-time-max]|
> |[heartbeat-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-total]|
> |[last-heartbeat-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-heartbeat-seconds-ago]|
> |[last-rebalance-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-rebalance-seconds-ago]|



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16116) AsyncKafkaConsumer: Add missing rebalance metrics

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16116:
--
Component/s: clients

> AsyncKafkaConsumer: Add missing rebalance metrics
> -
>
> Key: KAFKA-16116
> URL: https://issues.apache.org/jira/browse/KAFKA-16116
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, metrics
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
>
> The following metrics are missing:
> |[rebalance-latency-avg|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-avg]|
> |[rebalance-latency-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-max]|
> |[rebalance-latency-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-total]|
> |[rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-rate-per-hour]|
> |[rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-total]|
> |[failed-rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-rate-per-hour]|
> |[failed-rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-total]|



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16115) AsyncKafkaConsumer: Add missing heartbeat metrics

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16115:
--
Labels: consumer-threading-refactor  (was: )

> AsyncKafkaConsumer: Add missing heartbeat metrics
> -
>
> Key: KAFKA-16115
> URL: https://issues.apache.org/jira/browse/KAFKA-16115
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer, metrics
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> The following metrics are missing:
> |[heartbeat-rate|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-rate]|
> |[heartbeat-response-time-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-response-time-max]|
> |[heartbeat-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-total]|
> |[last-heartbeat-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-heartbeat-seconds-ago]|
> |[last-rebalance-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-rebalance-seconds-ago]|



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16095) Update list group state type filter to include the states for the new consumer group type

2024-01-11 Thread Ritika Reddy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805828#comment-17805828
 ] 

Ritika Reddy commented on KAFKA-16095:
--

[~isding_l] When would you be taking up this task? We do need it in production 
fairly soon

> Update list group state type filter to include the states for the new 
> consumer group type
> -
>
> Key: KAFKA-16095
> URL: https://issues.apache.org/jira/browse/KAFKA-16095
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ritika Reddy
>Assignee: Lan Ding
>Priority: Minor
>
> # While using *—list —state* the current accepted values correspond to the 
> classic group type states. We need to include support for the new group type 
> states.
>  ## Consumer Group: Should list the state of the group. Accepted Values: 
>  ### _UNKNOWN(“unknown”)_
>  ### {_}EMPTY{_}("empty"),
>  ### *{_}ASSIGNING{_}("assigning"),*
>  ### *{_}RECONCILING{_}("reconciling"),*
>  ### {_}STABLE{_}("stable"),
>  ### {_}DEAD{_}("dead");
>  # 
>  ## Classic Group : Should list the state of the group. Accepted Values: 
>  ### {_}UNKNOWN{_}("Unknown"),
>  ### {_}EMPTY{_}("Empty");
>  ### *{_}PREPARING_REBALANCE{_}("PreparingRebalance"),*
>  ### *{_}COMPLETING_REBALANCE{_}("CompletingRebalance"),*
>  ### {_}STABLE{_}("Stable"),
>  ### {_}DEAD{_}("Dead")



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16115) AsyncKafkaConsumer: Add missing heartbeat metrics

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16115:
--
Fix Version/s: 3.8.0

> AsyncKafkaConsumer: Add missing heartbeat metrics
> -
>
> Key: KAFKA-16115
> URL: https://issues.apache.org/jira/browse/KAFKA-16115
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer, metrics
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
> Fix For: 3.8.0
>
>
> The following metrics are missing:
> |[heartbeat-rate|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-rate]|
> |[heartbeat-response-time-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-response-time-max]|
> |[heartbeat-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-total]|
> |[last-heartbeat-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-heartbeat-seconds-ago]|
> |[last-rebalance-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-rebalance-seconds-ago]|



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16117) Add Integration test for checking if the correct assignor is chosen

2024-01-11 Thread Ritika Reddy (Jira)
Ritika Reddy created KAFKA-16117:


 Summary: Add Integration test for checking if the correct assignor 
is chosen
 Key: KAFKA-16117
 URL: https://issues.apache.org/jira/browse/KAFKA-16117
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ritika Reddy


h4.  We are trying to test this section of the KIP-848
h4. Assignor Selection

The group coordinator has to determine which assignment strategy must be used 
for the group. The group's members may not have exactly the same assignors at 
any given point in time - e.g. they may migrate from an assignor to another one 
for instance. The group coordinator will chose the assignor as follow:
 * A client side assignor is used if possible. This means that a client side 
assignor must be supported by all the members. If multiple are, it will respect 
the precedence defined by the members when they advertise their supported 
client side assignors.
 * A server side assignor is used otherwise. If multiple server side assignors 
are specified in the group, the group coordinator uses the most common one. If 
a member does not provide an assignor, the group coordinator will default to 
the first one in {{{}group.consumer.assignors{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16022) AsyncKafkaConsumer sometimes complains “No current assignment for partition {}”

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16022:
--
Summary: AsyncKafkaConsumer sometimes complains “No current assignment for 
partition {}”  (was: AsyncKafkaConsumer sometimes complains "No current 
assignment for partition {}")

> AsyncKafkaConsumer sometimes complains “No current assignment for partition 
> {}”
> ---
>
> Key: KAFKA-16022
> URL: https://issues.apache.org/jira/browse/KAFKA-16022
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Phuc Hong Tran
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> This seems to be a timing issue that before the member receives any 
> assignment from the coordinator, the fetcher will try to find the current 
> position causing "No current assignment for partition {}".  This creates a 
> small amount of noise to the log.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-11 Thread via GitHub


apoorvmittal10 commented on PR #15148:
URL: https://github.com/apache/kafka/pull/15148#issuecomment-1888052029

   @mjsax Can you please take a re-look, I have addressed the comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16116) AsyncKafkaConsumer: Add missing rebalance metrics

2024-01-11 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-16116:
---
Labels: consumer-threading-refactor  (was: )

> AsyncKafkaConsumer: Add missing rebalance metrics
> -
>
> Key: KAFKA-16116
> URL: https://issues.apache.org/jira/browse/KAFKA-16116
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer, metrics
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
>
> The following metrics are missing:
> |[rebalance-latency-avg|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-avg]|
> |[rebalance-latency-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-max]|
> |[rebalance-latency-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-total]|
> |[rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-rate-per-hour]|
> |[rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-total]|
> |[failed-rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-rate-per-hour]|
> |[failed-rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-total]|



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16116) AsyncKafkaConsumer: Add missing rebalance metrics

2024-01-11 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-16116:
---
Parent: KAFKA-14246
Issue Type: Sub-task  (was: Improvement)

> AsyncKafkaConsumer: Add missing rebalance metrics
> -
>
> Key: KAFKA-16116
> URL: https://issues.apache.org/jira/browse/KAFKA-16116
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer, metrics
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>
> The following metrics are missing:
> |[rebalance-latency-avg|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-avg]|
> |[rebalance-latency-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-max]|
> |[rebalance-latency-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-total]|
> |[rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-rate-per-hour]|
> |[rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-total]|
> |[failed-rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-rate-per-hour]|
> |[failed-rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-total]|



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16116) AsyncKafkaConsumer: Add missing rebalance metrics

2024-01-11 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-16116:
---
Component/s: consumer
 metrics

> AsyncKafkaConsumer: Add missing rebalance metrics
> -
>
> Key: KAFKA-16116
> URL: https://issues.apache.org/jira/browse/KAFKA-16116
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, metrics
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>
> The following metrics are missing:
> |[rebalance-latency-avg|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-avg]|
> |[rebalance-latency-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-max]|
> |[rebalance-latency-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-total]|
> |[rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-rate-per-hour]|
> |[rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-total]|
> |[failed-rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-rate-per-hour]|
> |[failed-rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-total]|



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16116) AsyncKafkaConsumer: Add missing rebalance metrics

2024-01-11 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16116:
--

 Summary: AsyncKafkaConsumer: Add missing rebalance metrics
 Key: KAFKA-16116
 URL: https://issues.apache.org/jira/browse/KAFKA-16116
 Project: Kafka
  Issue Type: Improvement
Reporter: Philip Nee
Assignee: Philip Nee


The following metrics are missing:
|[rebalance-latency-avg|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-avg]|
|[rebalance-latency-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-max]|
|[rebalance-latency-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-total]|
|[rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-rate-per-hour]|
|[rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-total]|
|[failed-rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-rate-per-hour]|
|[failed-rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-total]|



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16115) AsyncKafkaConsumer: Add missing heartbeat metrics

2024-01-11 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16115:
--

 Summary: AsyncKafkaConsumer: Add missing heartbeat metrics
 Key: KAFKA-16115
 URL: https://issues.apache.org/jira/browse/KAFKA-16115
 Project: Kafka
  Issue Type: Improvement
  Components: consumer, metrics
Reporter: Philip Nee
Assignee: Philip Nee


The following metrics are missing:
|[heartbeat-rate|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-rate]|
|[heartbeat-response-time-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-response-time-max]|
|[heartbeat-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-total]|
|[last-heartbeat-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-heartbeat-seconds-ago]|
|[last-rebalance-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-rebalance-seconds-ago]|



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16115) AsyncKafkaConsumer: Add missing heartbeat metrics

2024-01-11 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-16115:
---
Parent: KAFKA-14246
Issue Type: Sub-task  (was: Improvement)

> AsyncKafkaConsumer: Add missing heartbeat metrics
> -
>
> Key: KAFKA-16115
> URL: https://issues.apache.org/jira/browse/KAFKA-16115
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer, metrics
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>
> The following metrics are missing:
> |[heartbeat-rate|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-rate]|
> |[heartbeat-response-time-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-response-time-max]|
> |[heartbeat-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-total]|
> |[last-heartbeat-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-heartbeat-seconds-ago]|
> |[last-rebalance-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-rebalance-seconds-ago]|



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15561) Client support for new SubscriptionPattern based subscription

2024-01-11 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805806#comment-17805806
 ] 

Phuc Hong Tran commented on KAFKA-15561:


[~lianetm] are the classes that implements ApiMessage have some special 
settings to them so that peolple can't make change to them? Every times I make 
a change and them compile the code those changes are auto removed.

> Client support for new SubscriptionPattern based subscription
> -
>
> Key: KAFKA-15561
> URL: https://issues.apache.org/jira/browse/KAFKA-15561
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support
> Fix For: 3.8.0
>
>
> New consumer should support subscribe with the new SubscriptionPattern 
> introduced in the new consumer group protocol. When subscribing with this 
> regex, the client should provide the regex in the HB request on the 
> SubscribedTopicRegex field, delegating the resolution to the server.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]

2024-01-11 Thread via GitHub


gharris1727 commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1449403859


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##
@@ -303,48 +303,14 @@ public String toString() {
  * @param reporters the error reporters (should not be null).
  */
 public synchronized void reporters(List reporters) {
-this.context.reporters(reporters);
-}
-
-/**
- * Set the source record being processed in the connect pipeline.
- *
- * @param preTransformRecord the source record
- */
-public synchronized void sourceRecord(SourceRecord preTransformRecord) {
-this.context.sourceRecord(preTransformRecord);
-}
-
-/**
- * Set the record consumed from Kafka in a sink connector.
- *
- * @param consumedMessage the record
- */
-public synchronized void consumerRecord(ConsumerRecord 
consumedMessage) {
-this.context.consumerRecord(consumedMessage);
-}
-
-/**
- * @return true, if the last operation encountered an error; false 
otherwise
- */
-public synchronized boolean failed() {
-return this.context.failed();
-}
-
-/**
- * Returns the error encountered when processing the current stage.
- *
- * @return the error encountered when processing the current stage
- */
-public synchronized Throwable error() {
-return this.context.error();
+this.reporters = Objects.requireNonNull(reporters, "reporters");

Review Comment:
   While I like the CachedSupplier(Supplier supplier) signature, I think it 
is probably not a good idea to use it with AutoCloseables, as the ownership of 
the object is ambiguous from the signature.
   
   Currently I know that Supplier> puts the responsibility 
on the caller of get() to close the returned error reporters, but if 
CachedSupplier exists, some call-sites of get() will close the error reporters, 
and some won't.
   
   I guess the SharedTopicAdmin is the most similar thing I can think of. When 
it's a SharedTopicAdmin object, it needs to be closed. When it is cast to a 
Supplier, it doesn't need to be closed. Perhaps we could do 
something similar here.
   
   Perhaps the following (a little lighter weight than SharedTopicAdmin)?
   ```
   public class AutoClosableSupplier implements 
AutoCloseable, Supplier {
   public AutoCloseableSupplier(Supplier s, String closeMessage) {
   ...
   }
   }
   ```
   
   edit: List is itself not closeable, maybe stuff would need to 
change to be List> or similar...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]

2024-01-11 Thread via GitHub


gharris1727 commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1449403859


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##
@@ -303,48 +303,14 @@ public String toString() {
  * @param reporters the error reporters (should not be null).
  */
 public synchronized void reporters(List reporters) {
-this.context.reporters(reporters);
-}
-
-/**
- * Set the source record being processed in the connect pipeline.
- *
- * @param preTransformRecord the source record
- */
-public synchronized void sourceRecord(SourceRecord preTransformRecord) {
-this.context.sourceRecord(preTransformRecord);
-}
-
-/**
- * Set the record consumed from Kafka in a sink connector.
- *
- * @param consumedMessage the record
- */
-public synchronized void consumerRecord(ConsumerRecord 
consumedMessage) {
-this.context.consumerRecord(consumedMessage);
-}
-
-/**
- * @return true, if the last operation encountered an error; false 
otherwise
- */
-public synchronized boolean failed() {
-return this.context.failed();
-}
-
-/**
- * Returns the error encountered when processing the current stage.
- *
- * @return the error encountered when processing the current stage
- */
-public synchronized Throwable error() {
-return this.context.error();
+this.reporters = Objects.requireNonNull(reporters, "reporters");

Review Comment:
   While I like the CachedSupplier(Supplier supplier) signature, I think it 
is probably not a good idea to use it with AutoCloseables, as the ownership of 
the object is ambiguous from the signature.
   
   Currently I know that Supplier> puts the responsibility 
on the caller of get() to close the returned error reporters, but if 
CachedSupplier exists, some call-sites of get() will close the error reporters, 
and some won't.
   
   I guess the SharedTopicAdmin is the most similar thing I can think of. When 
it's a SharedTopicAdmin object, it needs to be closed. When it is cast to a 
Supplier, it doesn't need to be closed. Perhaps we could do 
something similar here.
   
   Perhaps the following (a little lighter weight than SharedTopicAdmin)?
   ```
   public class AutoClosableSupplier implements 
AutoCloseable, Supplier {
   public AutoCloseableSupplier(Supplier s, String closeMessage) {
   ...
   }
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16089) Kafka Streams still leaking memory

2024-01-11 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-16089:
---
Fix Version/s: 3.8.0

> Kafka Streams still leaking memory
> --
>
> Key: KAFKA-16089
> URL: https://issues.apache.org/jira/browse/KAFKA-16089
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.8.0
>Reporter: Lucas Brutschy
>Assignee: Nicholas Telford
>Priority: Critical
> Fix For: 3.8.0
>
> Attachments: fix.png, graphviz (1).svg, unfix.png
>
>
> In 
> [https://github.com/apache/kafka/commit/58d6d2e5922df60cdc5c5c1bcd1c2d82dd2b71e2]
>  a leak was fixed in the release candidate for 3.7.
>  
> However, Kafka Streams still seems to be leaking memory (just slower) after 
> the fix.
>  
> Attached is the `jeprof` output right before a crash after ~11 hours.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16089) Kafka Streams still leaking memory

2024-01-11 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805801#comment-17805801
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16089:


Yeah, nice investigation! That was a tricky one

> Kafka Streams still leaking memory
> --
>
> Key: KAFKA-16089
> URL: https://issues.apache.org/jira/browse/KAFKA-16089
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.8.0
>Reporter: Lucas Brutschy
>Assignee: Nicholas Telford
>Priority: Critical
> Attachments: fix.png, graphviz (1).svg, unfix.png
>
>
> In 
> [https://github.com/apache/kafka/commit/58d6d2e5922df60cdc5c5c1bcd1c2d82dd2b71e2]
>  a leak was fixed in the release candidate for 3.7.
>  
> However, Kafka Streams still seems to be leaking memory (just slower) after 
> the fix.
>  
> Attached is the `jeprof` output right before a crash after ~11 hours.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]

2024-01-11 Thread via GitHub


philipnee commented on PR #15000:
URL: https://github.com/apache/kafka/pull/15000#issuecomment-1887951845

   hey @lucasbru - i assume invoker queue here you meant by 
`OffsetCommitCallbackInvoker`.  It is not shared with the background thread.  
There are two invokers used by the async consumer, one is the offset commit 
callback invoker, the other one is the rebalance callback.   The rebalance 
callback is shared by the two threads but the commit callback invoker is not.
   
   my original thought was to use the background event to pass the interceptor 
onCommit event.
   
   i think we are all on the same page about using the invoker to invoke the 
events.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]

2024-01-11 Thread via GitHub


gharris1727 commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1449351768


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java:
##
@@ -121,20 +121,22 @@ public static DeadLetterQueueReporter 
createAndSetup(Map adminPr
 /**
  * Write the raw records into a Kafka topic and return the producer future.
  *
- * @param context processing context containing the raw record at {@link 
ProcessingContext#consumerRecord()}.
+ * @param context processing context containing the raw record at {@link 
ProcessingContext#original()}.
  * @return the future associated with the writing of this record; never 
null
  */
-public Future report(ProcessingContext context) {
+@SuppressWarnings("unchecked")
+public Future report(ProcessingContext context) {
 if (dlqTopicName.isEmpty()) {
 return CompletableFuture.completedFuture(null);
 }
 errorHandlingMetrics.recordDeadLetterQueueProduceRequest();
 
-ConsumerRecord originalMessage = 
context.consumerRecord();
-if (originalMessage == null) {
+if (!(context.original() instanceof ConsumerRecord)) {
 errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
 return CompletableFuture.completedFuture(null);
 }
+ProcessingContext> sinkContext = 
(ProcessingContext>) context;

Review Comment:
   Yes, but ConsumerRecord has no subclasses, and we never create a 
ProcessingContext for anything other than SourceRecord and 
ConsumerRecord.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]

2024-01-11 Thread via GitHub


gharris1727 commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1449349792


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java:
##
@@ -121,20 +121,22 @@ public static DeadLetterQueueReporter 
createAndSetup(Map adminPr
 /**
  * Write the raw records into a Kafka topic and return the producer future.
  *
- * @param context processing context containing the raw record at {@link 
ProcessingContext#consumerRecord()}.
+ * @param context processing context containing the raw record at {@link 
ProcessingContext#original()}.
  * @return the future associated with the writing of this record; never 
null
  */
-public Future report(ProcessingContext context) {
+@SuppressWarnings("unchecked")
+public Future report(ProcessingContext context) {
 if (dlqTopicName.isEmpty()) {
 return CompletableFuture.completedFuture(null);
 }
 errorHandlingMetrics.recordDeadLetterQueueProduceRequest();
 
-ConsumerRecord originalMessage = 
context.consumerRecord();
-if (originalMessage == null) {
+if (!(context.original() instanceof ConsumerRecord)) {

Review Comment:
   This is another generic-oddity that I fixed with a follow-up refactor that 
makes the DeadLetterQueueReporter#report accept a 
ProcessingContext> and eliminates all of these 
runtime checks which don't ever actually fail.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]

2024-01-11 Thread via GitHub


gharris1727 commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1449347346


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##
@@ -236,7 +237,7 @@ protected  V execAndHandleError(Operation operation, 
Class

Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]

2024-01-11 Thread via GitHub


gharris1727 commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1449326220


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##
@@ -80,59 +83,57 @@ public class RetryWithToleranceOperator implements 
AutoCloseable {
 private final ErrorHandlingMetrics errorHandlingMetrics;
 private final CountDownLatch stopRequestedLatch;
 private volatile boolean stopping;   // indicates whether the operator has 
been asked to stop retrying
-
-protected final ProcessingContext context;
+private List reporters;
 
 public RetryWithToleranceOperator(long errorRetryTimeout, long 
errorMaxDelayInMillis,
   ToleranceType toleranceType, Time time, 
ErrorHandlingMetrics errorHandlingMetrics) {
-this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, 
errorHandlingMetrics, new ProcessingContext(), new CountDownLatch(1));
+this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, 
errorHandlingMetrics, new CountDownLatch(1));
 }
 
 RetryWithToleranceOperator(long errorRetryTimeout, long 
errorMaxDelayInMillis,
ToleranceType toleranceType, Time time, 
ErrorHandlingMetrics errorHandlingMetrics,
-   ProcessingContext context, CountDownLatch 
stopRequestedLatch) {
+   CountDownLatch stopRequestedLatch) {
 this.errorRetryTimeout = errorRetryTimeout;
 this.errorMaxDelayInMillis = errorMaxDelayInMillis;
 this.errorToleranceType = toleranceType;
 this.time = time;
 this.errorHandlingMetrics = errorHandlingMetrics;
-this.context = context;
 this.stopRequestedLatch = stopRequestedLatch;
 this.stopping = false;
+this.reporters = Collections.emptyList();
 }
 
-public synchronized Future executeFailed(Stage stage, Class 
executingClass,
-  ConsumerRecord 
consumerRecord,
-  Throwable error) {
-
+public Future executeFailed(ProcessingContext context, Stage 
stage, Class executingClass, Throwable error) {
 markAsFailed();
-context.consumerRecord(consumerRecord);
 context.currentContext(stage, executingClass);
 context.error(error);
 errorHandlingMetrics.recordFailure();
-Future errantRecordFuture = context.report();
+Future errantRecordFuture = report(context);
 if (!withinToleranceLimits()) {
 errorHandlingMetrics.recordError();
 throw new ConnectException("Tolerance exceeded in error handler", 
error);
 }
 return errantRecordFuture;
 }
 
-public synchronized Future executeFailed(Stage stage, Class 
executingClass,
-   SourceRecord sourceRecord,
-   Throwable error) {

Review Comment:
   Yep, there were two nearly-identical implementations that differed only by 
the type of record they accepted.
   
   They differed in the ConnectException message, and when merging them I just 
kept the more generic message.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]

2024-01-11 Thread via GitHub


gharris1727 commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1449313479


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java:
##
@@ -17,82 +17,36 @@
 package org.apache.kafka.connect.runtime.errors;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.connect.errors.ConnectException;
-import 
org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter.ErrantRecordFuture;
 import org.apache.kafka.connect.source.SourceRecord;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
-
 /**
- * Contains all the metadata related to the currently evaluating operation. 
Only one instance of this class is meant
- * to exist per task in a JVM.
+ * Contains all the metadata related to the currently evaluating operation, 
and associated with a particular
+ * sink or source record from the consumer or task, respectively. This class 
is not thread safe, and so once an
+ * instance is passed to a new thread, it should no longer be accessed by the 
previous thread.

Review Comment:
   1. I believe so, as long as the "passing between the threads" itself is 
thread safe. So for example, if one thread writes to context, writes the 
context to a volatile field, then a second thread reads from that volatile 
field and then reads from the context, the memory model should ensure that the 
read sees the writes.
   
   Producer::send appears to have the synchronization when allocating the write 
to a batch. And the InternalSinkRecord#context field is marked final, which 
according to [this 
guidance](https://www.cs.umd.edu/~pugh/java/memoryModel/jsr-133-faq.html#finalRight)
 regarding final fields (emphasis is mine) should be sufficient:
   
   > The values for an object's final fields are set in its constructor. 
Assuming the object is constructed "correctly", once an object is constructed, 
the values assigned to the final fields in the constructor will be visible to 
all other threads without synchronization. In addition, the visible values for 
any other object or array referenced by those final fields **will be at least 
as up-to-date as the final fields.**
   
   2. The fields themselves are not synchronized, as the object is not 
thread-safe. I don't think volatile buys any thread safety in this situation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15575: Begin enforcing 'tasks.max' property for connectors [kafka]

2024-01-11 Thread via GitHub


C0urante commented on PR #15180:
URL: https://github.com/apache/kafka/pull/15180#issuecomment-1887808692

   @gharris1727 @yashmayya @mimaison would any of you mind taking a look when 
you have a moment?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15561) Client support for new SubscriptionPattern based subscription

2024-01-11 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805779#comment-17805779
 ] 

Lianet Magrans commented on KAFKA-15561:


Hey [~phuctran], this ticket is only for the client support of the regex, so 
only changes in the consumer side to make sure that we pass on the regex in the 
request after calls to #subscribe(SubscriptionPattern...). But regex are not 
supported by the server yet (that's why it's not included in the 
ConsumerGroupHeartbeatRequestData). Once the server supports it, we can take on 
this one to have the client use it. Thanks for your interest! Happy to answer 
any other question.

> Client support for new SubscriptionPattern based subscription
> -
>
> Key: KAFKA-15561
> URL: https://issues.apache.org/jira/browse/KAFKA-15561
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support
> Fix For: 3.8.0
>
>
> New consumer should support subscribe with the new SubscriptionPattern 
> introduced in the new consumer group protocol. When subscribing with this 
> regex, the client should provide the regex in the HB request on the 
> SubscribedTopicRegex field, delegating the resolution to the server.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15475) Timeout request might retry forever even if the user API times out in PrototypeAsyncConsumer

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15475:
--
Priority: Critical  (was: Minor)

> Timeout request might retry forever even if the user API times out in 
> PrototypeAsyncConsumer
> 
>
> Key: KAFKA-15475
> URL: https://issues.apache.org/jira/browse/KAFKA-15475
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848-preview
> Fix For: 3.8.0
>
>
> If the request timeout in the background thread, it will be completed with 
> TimeoutException, which is Retriable.  In the TopicMetadataRequestManager and 
> possibly other managers, the request might continue to be retried forever.
>  
> There are two ways to fix this
>  # Pass a timer to the manager to remove the inflight requests when it is 
> expired.
>  # Pass the future to the application layer and continue to retry.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]

2024-01-11 Thread via GitHub


C0urante commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1449271258


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##
@@ -143,7 +144,7 @@ public synchronized Future executeFailed(Stage stage, 
Class executingCl
  * @param  return type of the result of the operation.
  * @return result of the operation
  */
-public synchronized  V execute(Operation operation, Stage stage, 
Class executingClass) {
+public  V execute(ProcessingContext context, Operation operation, 
Stage stage, Class executingClass) {

Review Comment:
   I know this isn't your fault but if you have time, could we add a `throws` 
clause to the Javadocs stating that an exception will be thrown if a 
non-tolerable error is encountered? I always get tripped up reading 
interactions with this class and a big part of it is trying to understand the 
conditions where exceptions are thrown, the context is marked as failed, or 
`null` is returned.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15320) Document event queueing patterns

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15320:
--
Priority: Minor  (was: Major)

> Document event queueing patterns
> 
>
> Key: KAFKA-15320
> URL: https://issues.apache.org/jira/browse/KAFKA-15320
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer, documentation
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> We need to first document the event enqueuing patterns in the 
> PrototypeAsyncConsumer. As part of this task, determine if it’s 
> necessary/beneficial to _conditionally_ add events and/or coalesce any 
> duplicate events in the queue.
> _Don’t forget to include diagrams for clarity!_
> This should be documented on the AK wiki.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15283) Client support for OffsetFetch and OffsetCommit with topic ID

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15283:
--
Priority: Minor  (was: Major)

> Client support for OffsetFetch and OffsetCommit with topic ID
> -
>
> Key: KAFKA-15283
> URL: https://issues.apache.org/jira/browse/KAFKA-15283
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Lianet Magrans
>Priority: Minor
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> Currently, {{KafkaConsumer}} keeps track of topic IDs in the in-memory 
> {{ConsumerMetadata}} object, and they are provided to the {{FETCH}} and 
> {{METADATA}} RPC calls.
> With KIP-848 the OffsetFetch and OffsetCommit will start using topic IDs in 
> the same way, so the new client implementation will provide it when issuing 
> those requests. Topic names should continue to be supported as needed by the 
> {{{}AdminClient{}}}.
> We should also review/clean-up the support for topic names in requests such 
> as the {{METADATA}} request (currently supporting topic names as well as 
> topic IDs on the client side).
> Tasks include:
>  * Introduce Topic ID in existing OffsetFetch and OffsetCommit API that will 
> be upgraded on the server to support topic ID
>  * Check topic ID propagation internally in the client based on RPCs 
> including it.
>  * Review existing support for topic name for potential clean if not needed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15173) Consumer event queues should be bounded

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15173:
--
Priority: Minor  (was: Major)

> Consumer event queues should be bounded
> ---
>
> Key: KAFKA-15173
> URL: https://issues.apache.org/jira/browse/KAFKA-15173
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> The async consumer uses ApplicationEventQueue and BackgroundEventQueue to 
> facilitate message passing between the application thread and the background 
> thread.  The current implementation is boundless, which can potentially cause 
> OOM and other performance-related issues.
> I think the queues need a finite bound, and we need to decide how to handle 
> the situation when the bound is reached.  In particular, I would like to 
> answer these questions:
>  
>  # What should the upper limit be for both queues: Can this be a 
> configurable, memory-based bound? Or just an arbitrary number of events as 
> the bound.
>  # What should happen when the application event queue is filled up?  It 
> seems like we should introduce a new exception type and notify the user that 
> the consumer is full.
>  # What should happen when the background event queue is filled up? This 
> seems less likely to happen, but I imagine it could happen when the user 
> stops polling the consumer, causing the queue to be filled.
>  # Is it necessary to introduce a public configuration for the queue? I think 
> initially we would select an arbitrary constant number and see the community 
> feedback to make a forward plan accordingly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16104) Enable additional PlaintextConsumerTest tests for new consumer

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16104:
-

Assignee: Kirk True

> Enable additional PlaintextConsumerTest tests for new consumer
> --
>
> Key: KAFKA-16104
> URL: https://issues.apache.org/jira/browse/KAFKA-16104
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Andrew Schofield
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> It should be possible to enable:
> * testAutoCommitOnClose
> * testAutoCommitOnCloseAfterWakeup
> * testExpandingTopicSubscriptions
> * testShrinkingTopicSubscriptions
> * testMultiConsumerSessionTimeoutOnClose (KAFKA-16011 has been fixed)
> * testAutoCommitOnRebalance
> * testPerPartitionLeadMetricsCleanUpWithSubscribe
> * testPerPartitionLagMetricsCleanUpWithSubscribe
> * testStaticConsumerDetectsNewPartitionCreatedAfterRestart



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16107) Ensure consumer does not start fetching from added partitions until onPartitionsAssigned completes

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16107:
-

Assignee: Lianet Magrans

> Ensure consumer does not start fetching from added partitions until 
> onPartitionsAssigned completes
> --
>
> Key: KAFKA-16107
> URL: https://issues.apache.org/jira/browse/KAFKA-16107
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> In the new consumer implementation, when new partitions are assigned, the 
> subscription state is updated and then the #onPartitionsAssigned triggered. 
> This sequence seems sensible but we need to ensure that no data is fetched 
> until the onPartitionsAssigned completes (where the user could be setting the 
> committed offsets it want to start fetching from).
> We should pause the partitions newly added partitions until 
> onPartitionsAssigned completes, similar to how it's done on revocation to 
> avoid positions getting ahead of the committed offsets.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16010) Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16010:
-

Assignee: Kirk True

> Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling
> --
>
> Key: KAFKA-16010
> URL: https://issues.apache.org/jira/browse/KAFKA-16010
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848
> Fix For: 3.8.0
>
>
> The integration test 
> {{PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling}} is 
> failing when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Did not get valid assignment for 
> partitions [topic1-2, topic1-4, topic-1, topic-0, topic1-5, topic1-1, 
> topic1-0, topic1-3] after one consumer left
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
>   at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
>   at 
> kafka.api.AbstractConsumerTest.validateGroupAssignment(AbstractConsumerTest.scala:286)
>   at 
> kafka.api.PlaintextConsumerTest.runMultiConsumerSessionTimeoutTest(PlaintextConsumerTest.scala:1883)
>   at 
> kafka.api.PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling(PlaintextConsumerTest.scala:1281)
> {code}
> The logs include these lines:
>  
> {code}
> [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16009) Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16009:
-

Assignee: Kirk True

> Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation
> 
>
> Key: KAFKA-16009
> URL: https://issues.apache.org/jira/browse/KAFKA-16009
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848
> Fix For: 3.8.0
>
>
> The integration test 
> {{PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation}} is failing 
> when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Timed out before expected rebalance 
> completed
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
>   at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
>   at 
> kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317)
>   at 
> kafka.api.PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation(PlaintextConsumerTest.scala:235)
> {code}
> The logs include this line:
>  
> {code}
> [2023-12-13 15:20:27,824] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16008) Fix PlaintextConsumerTest.testMaxPollIntervalMs

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16008:
-

Assignee: Kirk True

> Fix PlaintextConsumerTest.testMaxPollIntervalMs
> ---
>
> Key: KAFKA-16008
> URL: https://issues.apache.org/jira/browse/KAFKA-16008
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848
> Fix For: 3.8.0
>
>
> The integration test {{PlaintextConsumerTest.testMaxPollIntervalMs}} is 
> failing when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Timed out before expected rebalance 
> completed
>     at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
> at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
> at 
> kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317)
> at 
> kafka.api.PlaintextConsumerTest.testMaxPollIntervalMs(PlaintextConsumerTest.scala:194)
> {code}
> The logs include this line:
>  
> {code}
> [2023-12-13 15:11:16,134] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16023) PlaintextConsumerTest needs to wait for reconciliation to complete before proceeding

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16023:
-

Assignee: Kirk True

> PlaintextConsumerTest needs to wait for reconciliation to complete before 
> proceeding
> 
>
> Key: KAFKA-16023
> URL: https://issues.apache.org/jira/browse/KAFKA-16023
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> Several tests in PlaintextConsumerTest.scala (such as 
> testPerPartitionLagMetricsCleanUpWithSubscribe) uses:
> assertEquals(1, listener.callsToAssigned, "should be assigned once")
> However, as the timing for reconciliation completion is not deterministic due 
> to asynchronous processing. We actually need to wait until the condition to 
> happen.
> However, another issue is the timeout - some of these tasks might not 
> complete within the 600ms timeout, so the tests are deemed to be flaky.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16100) Consistent handling of timeouts and responses for new consumer ApplicationEvents

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16100:
-

Assignee: Kirk True

> Consistent handling of timeouts and responses for new consumer 
> ApplicationEvents
> 
>
> Key: KAFKA-16100
> URL: https://issues.apache.org/jira/browse/KAFKA-16100
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients
>Reporter: Andrew Schofield
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> The handling of timeouts and responses for the various kinds of 
> ApplicationEvents in the new consumer is not consistent. A small amount of 
> refactoring would make the code more maintainable and give consistent 
> behaviour for the different requests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16019) Some of the tests in PlaintextConsumer can't seem to deterministically invoke and verify the consumer callback

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16019:
-

Assignee: Kirk True

> Some of the tests in PlaintextConsumer can't seem to deterministically invoke 
> and verify the consumer callback
> --
>
> Key: KAFKA-16019
> URL: https://issues.apache.org/jira/browse/KAFKA-16019
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> I was running the PlaintextConsumer to test the async consumer; however, a 
> few tests were failing with not being able to verify the listener is invoked 
> correctly
> For example `testPerPartitionLeadMetricsCleanUpWithSubscribe`
> Around 50% of the time, the listener's callsToAssigned was never incremented 
> correctly.  Event changing it to awaitUntilTrue it was still the same case
> {code:java}
> consumer.subscribe(List(topic, topic2).asJava, listener)
> val records = awaitNonEmptyRecords(consumer, tp)
> assertEquals(1, listener.callsToAssigned, "should be assigned once") {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16004) Review new consumer inflight offset commit logic

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16004:
-

Assignee: Lianet Magrans

> Review new consumer inflight offset commit logic
> 
>
> Key: KAFKA-16004
> URL: https://issues.apache.org/jira/browse/KAFKA-16004
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support
> Fix For: 3.8.0
>
>
> New consumer logic for committing offsets handles inflight requests, to 
> validate that no commit requests are sent if a previous one hasn't received a 
> response. Review how that logic is currently applied to both, sync and async 
> commits and validate against the legacy coordinator, who seems to apply it 
> only for async commits. Review considering behaviour for auto-commits too. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16001) Migrate ConsumerNetworkThreadTest away from ConsumerTestBuilder

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16001:
-

Assignee: Kirk True

> Migrate ConsumerNetworkThreadTest away from ConsumerTestBuilder
> ---
>
> Key: KAFKA-16001
> URL: https://issues.apache.org/jira/browse/KAFKA-16001
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lucas Brutschy
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15993) Enable max poll integration tests that depend on callback invocation

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15993:
-

Assignee: Kirk True

> Enable max poll integration tests that depend on callback invocation
> 
>
> Key: KAFKA-15993
> URL: https://issues.apache.org/jira/browse/KAFKA-15993
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-preview
> Fix For: 3.8.0
>
>
> We will enable integration tests using the async consumer in KAFKA-15971.  
> However, we should also enable tests that rely on rebalance listeners after 
> KAFKA-15628 is closed.  One example would be testMaxPollIntervalMs, that I 
> relies on the listener to verify the correctness.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15999) Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15999:
-

Assignee: Kirk True

> Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder
> -
>
> Key: KAFKA-15999
> URL: https://issues.apache.org/jira/browse/KAFKA-15999
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lucas Brutschy
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]

2024-01-11 Thread via GitHub


C0urante commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1449180005


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##
@@ -80,59 +83,57 @@ public class RetryWithToleranceOperator implements 
AutoCloseable {
 private final ErrorHandlingMetrics errorHandlingMetrics;
 private final CountDownLatch stopRequestedLatch;
 private volatile boolean stopping;   // indicates whether the operator has 
been asked to stop retrying
-
-protected final ProcessingContext context;
+private List reporters;
 
 public RetryWithToleranceOperator(long errorRetryTimeout, long 
errorMaxDelayInMillis,
   ToleranceType toleranceType, Time time, 
ErrorHandlingMetrics errorHandlingMetrics) {
-this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, 
errorHandlingMetrics, new ProcessingContext(), new CountDownLatch(1));
+this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, 
errorHandlingMetrics, new CountDownLatch(1));
 }
 
 RetryWithToleranceOperator(long errorRetryTimeout, long 
errorMaxDelayInMillis,
ToleranceType toleranceType, Time time, 
ErrorHandlingMetrics errorHandlingMetrics,
-   ProcessingContext context, CountDownLatch 
stopRequestedLatch) {
+   CountDownLatch stopRequestedLatch) {
 this.errorRetryTimeout = errorRetryTimeout;
 this.errorMaxDelayInMillis = errorMaxDelayInMillis;
 this.errorToleranceType = toleranceType;
 this.time = time;
 this.errorHandlingMetrics = errorHandlingMetrics;
-this.context = context;
 this.stopRequestedLatch = stopRequestedLatch;
 this.stopping = false;
+this.reporters = Collections.emptyList();
 }
 
-public synchronized Future executeFailed(Stage stage, Class 
executingClass,
-  ConsumerRecord 
consumerRecord,
-  Throwable error) {
-
+public Future executeFailed(ProcessingContext context, Stage 
stage, Class executingClass, Throwable error) {
 markAsFailed();
-context.consumerRecord(consumerRecord);
 context.currentContext(stage, executingClass);
 context.error(error);
 errorHandlingMetrics.recordFailure();
-Future errantRecordFuture = context.report();
+Future errantRecordFuture = report(context);
 if (!withinToleranceLimits()) {
 errorHandlingMetrics.recordError();
 throw new ConnectException("Tolerance exceeded in error handler", 
error);
 }
 return errantRecordFuture;
 }
 
-public synchronized Future executeFailed(Stage stage, Class 
executingClass,
-   SourceRecord sourceRecord,
-   Throwable error) {
-
-markAsFailed();
-context.sourceRecord(sourceRecord);
-context.currentContext(stage, executingClass);
-context.error(error);
-errorHandlingMetrics.recordFailure();
-Future errantRecordFuture = context.report();
-if (!withinToleranceLimits()) {
-errorHandlingMetrics.recordError();
-throw new ConnectException("Tolerance exceeded in Source Worker 
error handler", error);
+/**
+ * Report errors. Should be called only if an error was encountered while 
executing the operation.
+ *
+ * @return a errant record future that potentially aggregates the producer 
futures

Review Comment:
   Nit (I know this is just moved as-is from the `ProcessingContext` class but 
we might as well fix it up while we're in the neighborhood):
   ```suggestion
* @return an errant record future that potentially aggregates the 
producer futures
   ```



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java:
##
@@ -17,82 +17,36 @@
 package org.apache.kafka.connect.runtime.errors;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.connect.errors.ConnectException;
-import 
org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter.ErrantRecordFuture;
 import org.apache.kafka.connect.source.SourceRecord;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
-
 /**
- * Contains all the metadata related to the currently evaluating operation. 
Only one instance of this class is meant
- * to exist per task in a JVM.
+ * Contains all the metadata related to the currently e

[jira] [Assigned] (KAFKA-15843) Review consumer onPartitionsAssigned called with empty partitions

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15843:
-

Assignee: Lianet Magrans

> Review consumer onPartitionsAssigned called with empty partitions
> -
>
> Key: KAFKA-15843
> URL: https://issues.apache.org/jira/browse/KAFKA-15843
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.8.0
>
>
> Legacy coordinator triggers onPartitionsAssigned with empty assignment (which 
> is not the case when triggering onPartitionsRevoked or Lost). This is the 
> behaviour of the legacy coordinator, and the new consumer implementation 
> maintains the same principle. We should review this to fully understand if it 
> is really needed to call onPartitionsAssigned with empty assignment (or if it 
> should behave consistently with the onRevoke/Lost)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15847) Consider partial metadata requests for client reconciliation

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15847:
-

Assignee: Lianet Magrans

> Consider partial metadata requests for client reconciliation
> 
>
> Key: KAFKA-15847
> URL: https://issues.apache.org/jira/browse/KAFKA-15847
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> New consumer implementing KIP-848 protocol needs to resolve metadata for the 
> topics received in the assignment. It does so by relying on the centralized 
> metadata object. Currently metadata updates requested through the metadata 
> object, request metadata for all topics. Consider allowing the partial 
> updates that are already expressed as an intention in the Metadata class but 
> not fully supported (investigate background in case there were some specifics 
> that led to this intention not being fully implemented) 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15652) Add tests to verify OffsetFetcherUtils.getOffsetResetTimestamp()

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15652:
-

Assignee: Philip Nee

> Add tests to verify OffsetFetcherUtils.getOffsetResetTimestamp()
> 
>
> Key: KAFKA-15652
> URL: https://issues.apache.org/jira/browse/KAFKA-15652
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> In the {{updateFetchPositions()}} method implementation, both 
> {{KafkaConsumer}} and {{PrototypeAsyncConsumer}} reset positions 
> asynchronously. [~junrao] stated the following in a [recent PR 
> review|https://github.com/apache/kafka/pull/14406#discussion_r1349173413]:
> {quote}There is a subtle difference between transitioning to reset from 
> initializing and transitioning to reset from {{OffsetOutOfRangeException}} 
> during fetch. In the latter, the application thread will call 
> {{{}FetchCollector.handleInitializeErrors(){}}}. If there is no default 
> offset reset policy, an {{OffsetOutOfRangeException}} will be thrown to the 
> application thread during {{{}poll{}}}, which is what we want.
> However, for the former, if there is no default offset reset policy, we 
> simply ignore that partition through 
> {{{}OffsetFetcherUtils.getOffsetResetTimestamp{}}}. It seems in that case, 
> the partition will be forever in the reset state and the application thread 
> won't get the {{{}OffsetOutOfRangeException{}}}.
> {quote}
> I intentionally changed the code so that no exceptions were thrown in 
> {{OffsetFetcherUtils.getOffsetResetTimestamp()}} and would simply return an 
> empty map. When I ran the unit tests and integration tests, there were no 
> failures, strongly suggesting that there is no coverage of this particular 
> edge case.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15691) Add new system tests to use new consumer

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15691:
-

Assignee: Kirk True

> Add new system tests to use new consumer
> 
>
> Key: KAFKA-15691
> URL: https://issues.apache.org/jira/browse/KAFKA-15691
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15638) Investigate ConsumerNetworkThreadTest's testPollResultTimer

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15638:
-

Assignee: Kirk True  (was: Philip Nee)

> Investigate ConsumerNetworkThreadTest's testPollResultTimer
> ---
>
> Key: KAFKA-15638
> URL: https://issues.apache.org/jira/browse/KAFKA-15638
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> Regarding this comment in {{{}testPollResultTimer{}}}...
> {code:java}
> // purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE 
> upon success|
> {code}
> [~junrao] asked:
> {quote}Which call is returning Long.MAX_VALUE?
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15551) Evaluate conditions for short circuiting consumer API calls

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15551:
-

Assignee: Lianet Magrans  (was: Philip Nee)

> Evaluate conditions for short circuiting consumer API calls
> ---
>
> Key: KAFKA-15551
> URL: https://issues.apache.org/jira/browse/KAFKA-15551
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> For conditions like:
>  * Committing empty offset
>  * Fetching offsets for empty partitions
>  * Getting empty topic partition position
> Should be short circuit possibly at the API level.
> As a bonus, we should double-check whether the existing {{KafkaConsumer}} 
> implementation suffers from this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15639) Investigate ConsumerNetworkThreadTest's testResetPositionsProcessFailureIsIgnored

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15639:
-

Assignee: Kirk True  (was: Philip Nee)

> Investigate ConsumerNetworkThreadTest's 
> testResetPositionsProcessFailureIsIgnored
> -
>
> Key: KAFKA-15639
> URL: https://issues.apache.org/jira/browse/KAFKA-15639
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> The {{testResetPositionsProcessFailureIsIgnored}} test looks like this:
>  
> {code:java}
> @Test
> public void testResetPositionsProcessFailureIsIgnored() {
> doThrow(new 
> NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded();
> ResetPositionsApplicationEvent event = new 
> ResetPositionsApplicationEvent();
> applicationEventsQueue.add(event);
> assertDoesNotThrow(() -> consumerNetworkThread.runOnce());
> 
> verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
> }
>  {code}
>  
> [~junrao] asks:
>  
> {quote}Not sure if this is a useful test since 
> {{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly 
> throw an exception?
> {quote}
>  
> I commented out the {{doThrow}} line and it did not impact the test. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-11 Thread via GitHub


dajac commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1449270020


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -1963,6 +2080,75 @@ public void 
testFetchAllOffsetsAtDifferentCommittedOffset() {
 ), context.fetchAllOffsets("group", Long.MAX_VALUE));
 }
 
+@Test
+public void testFetchAllOffsetsWithPendingTransactionalOffsets() {
+OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+
+context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", 
true);
+
+context.commitOffset("group", "foo", 0, 100L, 1);
+context.commitOffset("group", "foo", 1, 110L, 1);
+context.commitOffset("group", "bar", 0, 200L, 1);
+
+context.commit();
+
+assertEquals(3, context.lastWrittenOffset);
+assertEquals(3, context.lastCommittedOffset);
+
+context.commitOffset(10L, "group", "foo", 1, 111L, 1, 
context.time.milliseconds());
+context.commitOffset(10L, "group", "bar", 0, 201L, 1, 
context.time.milliseconds());

Review Comment:
   That’s fair. Let me add it tomorrow.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-11 Thread via GitHub


jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1449268767


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -1963,6 +2080,75 @@ public void 
testFetchAllOffsetsAtDifferentCommittedOffset() {
 ), context.fetchAllOffsets("group", Long.MAX_VALUE));
 }
 
+@Test
+public void testFetchAllOffsetsWithPendingTransactionalOffsets() {
+OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+
+context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", 
true);
+
+context.commitOffset("group", "foo", 0, 100L, 1);
+context.commitOffset("group", "foo", 1, 110L, 1);
+context.commitOffset("group", "bar", 0, 200L, 1);
+
+context.commit();
+
+assertEquals(3, context.lastWrittenOffset);
+assertEquals(3, context.lastCommittedOffset);
+
+context.commitOffset(10L, "group", "foo", 1, 111L, 1, 
context.time.milliseconds());
+context.commitOffset(10L, "group", "bar", 0, 201L, 1, 
context.time.milliseconds());

Review Comment:
   I would think it is just codifying the state of the protocol and will flag 
us if it changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15475) Timeout request might retry forever even if the user API times out in PrototypeAsyncConsumer

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15475:
-

Assignee: Kirk True  (was: Philip Nee)

> Timeout request might retry forever even if the user API times out in 
> PrototypeAsyncConsumer
> 
>
> Key: KAFKA-15475
> URL: https://issues.apache.org/jira/browse/KAFKA-15475
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-preview
> Fix For: 3.8.0
>
>
> If the request timeout in the background thread, it will be completed with 
> TimeoutException, which is Retriable.  In the TopicMetadataRequestManager and 
> possibly other managers, the request might continue to be retried forever.
>  
> There are two ways to fix this
>  # Pass a timer to the manager to remove the inflight requests when it is 
> expired.
>  # Pass the future to the application layer and continue to retry.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-11 Thread via GitHub


dajac commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1449266480


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -1963,6 +2080,75 @@ public void 
testFetchAllOffsetsAtDifferentCommittedOffset() {
 ), context.fetchAllOffsets("group", Long.MAX_VALUE));
 }
 
+@Test
+public void testFetchAllOffsetsWithPendingTransactionalOffsets() {
+OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+
+context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", 
true);
+
+context.commitOffset("group", "foo", 0, 100L, 1);
+context.commitOffset("group", "foo", 1, 110L, 1);
+context.commitOffset("group", "bar", 0, 200L, 1);
+
+context.commit();
+
+assertEquals(3, context.lastWrittenOffset);
+assertEquals(3, context.lastCommittedOffset);
+
+context.commitOffset(10L, "group", "foo", 1, 111L, 1, 
context.time.milliseconds());
+context.commitOffset(10L, "group", "bar", 0, 201L, 1, 
context.time.milliseconds());

Review Comment:
   I could add it but it does not bring much value in my opinion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16110) Implement consumer performance tests

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16110:
-

Assignee: Philip Nee  (was: Kirk True)

> Implement consumer performance tests
> 
>
> Key: KAFKA-16110
> URL: https://issues.apache.org/jira/browse/KAFKA-16110
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15283) Client support for OffsetFetch and OffsetCommit with topic ID

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15283:
-

Assignee: Lianet Magrans

> Client support for OffsetFetch and OffsetCommit with topic ID
> -
>
> Key: KAFKA-15283
> URL: https://issues.apache.org/jira/browse/KAFKA-15283
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> Currently, {{KafkaConsumer}} keeps track of topic IDs in the in-memory 
> {{ConsumerMetadata}} object, and they are provided to the {{FETCH}} and 
> {{METADATA}} RPC calls.
> With KIP-848 the OffsetFetch and OffsetCommit will start using topic IDs in 
> the same way, so the new client implementation will provide it when issuing 
> those requests. Topic names should continue to be supported as needed by the 
> {{{}AdminClient{}}}.
> We should also review/clean-up the support for topic names in requests such 
> as the {{METADATA}} request (currently supporting topic names as well as 
> topic IDs on the client side).
> Tasks include:
>  * Introduce Topic ID in existing OffsetFetch and OffsetCommit API that will 
> be upgraded on the server to support topic ID
>  * Check topic ID propagation internally in the client based on RPCs 
> including it.
>  * Review existing support for topic name for potential clean if not needed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15305:
-

Assignee: Kirk True  (was: Philip Nee)

> The background thread should try to process the remaining task until the 
> shutdown timer is expired
> --
>
> Key: KAFKA-15305
> URL: https://issues.apache.org/jira/browse/KAFKA-15305
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> While working on https://issues.apache.org/jira/browse/KAFKA-15304
> close() API supplies a timeout parameter so that the consumer can have a 
> grace period to process things before shutting down.  The background thread 
> currently doesn't do that, when close() is initiated, it will immediately 
> close all of its dependencies.
>  
> This might not be desirable because there could be remaining tasks to be 
> processed before closing.  Maybe the correct things to do is to first stop 
> accepting API request, second, let the runOnce() continue to run before the 
> shutdown timer expires, then we can force closing all of its dependencies.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]

2024-01-11 Thread via GitHub


AndrewJSchofield commented on PR #15000:
URL: https://github.com/apache/kafka/pull/15000#issuecomment-1887735105

   I agree with @lianetm that the second option seems best. The invoker 
mechanism already exists for `commitAsync()` and submitting to the invoker is 
already using a thread-safe object.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16085: Add metric value consolidated for topics on a broker for tiered storage. [kafka]

2024-01-11 Thread via GitHub


kamalcph commented on code in PR #15133:
URL: https://github.com/apache/kafka/pull/15133#discussion_r1449241531


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -473,6 +498,11 @@ class BrokerTopicMetrics(name: Option[String], configOpt: 
java.util.Optional[Kaf
 brokerTopicAggregatedMetric.setPartitionMetricValue(partition, segmentsLag)
   }
 
+  def recordRemoteDeleteLagBytes(topic: String, segmentsLag: Long): Unit = {

Review Comment:
   Can we rename `segmentsLag` to `bytesLag`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16113) AsyncKafkaConsumer: Add missing offset commit metrics

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16113:
--
Fix Version/s: 3.8.0

> AsyncKafkaConsumer: Add missing offset commit metrics
> -
>
> Key: KAFKA-16113
> URL: https://issues.apache.org/jira/browse/KAFKA-16113
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, metrics
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> The following metrics are missing from the AsyncKafkaConsumer:
> commit-latency-avg
> commit-latency-max
> commit-rate
> commit-total
> committed-time-ns-total



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16113) AsyncKafkaConsumer: Add missing offset commit metrics

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16113:
--
Labels: consumer-threading-refactor  (was: )

> AsyncKafkaConsumer: Add missing offset commit metrics
> -
>
> Key: KAFKA-16113
> URL: https://issues.apache.org/jira/browse/KAFKA-16113
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
>
> The following metrics are missing from the AsyncKafkaConsumer:
> commit-latency-avg
> commit-latency-max
> commit-rate
> commit-total
> committed-time-ns-total



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16113) AsyncKafkaConsumer: Add missing offset commit metrics

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16113:
--
Component/s: clients
 metrics

> AsyncKafkaConsumer: Add missing offset commit metrics
> -
>
> Key: KAFKA-16113
> URL: https://issues.apache.org/jira/browse/KAFKA-16113
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, metrics
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
>
> The following metrics are missing from the AsyncKafkaConsumer:
> commit-latency-avg
> commit-latency-max
> commit-rate
> commit-total
> committed-time-ns-total



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-11 Thread via GitHub


jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1449238660


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -1963,6 +2080,75 @@ public void 
testFetchAllOffsetsAtDifferentCommittedOffset() {
 ), context.fetchAllOffsets("group", Long.MAX_VALUE));
 }
 
+@Test
+public void testFetchAllOffsetsWithPendingTransactionalOffsets() {
+OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+
+context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", 
true);
+
+context.commitOffset("group", "foo", 0, 100L, 1);
+context.commitOffset("group", "foo", 1, 110L, 1);
+context.commitOffset("group", "bar", 0, 200L, 1);
+
+context.commit();
+
+assertEquals(3, context.lastWrittenOffset);
+assertEquals(3, context.lastCommittedOffset);
+
+context.commitOffset(10L, "group", "foo", 1, 111L, 1, 
context.time.milliseconds());
+context.commitOffset(10L, "group", "bar", 0, 201L, 1, 
context.time.milliseconds());

Review Comment:
   Oh interesting. Is it worth testing that we don't return it for now and 
update it if we plan to change the behavior?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16111) Implement tests for tricky rebalance callback scenarios

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16111:
--
Summary: Implement tests for tricky rebalance callback scenarios  (was: 
Implement tests for tricky rebalance callbacks scenarios)

> Implement tests for tricky rebalance callback scenarios
> ---
>
> Key: KAFKA-16111
> URL: https://issues.apache.org/jira/browse/KAFKA-16111
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15876: Introduce RemoteStorageNotReadyException retryable error [kafka]

2024-01-11 Thread via GitHub


kamalcph commented on code in PR #14822:
URL: https://github.com/apache/kafka/pull/14822#discussion_r1449228856


##
clients/src/main/java/org/apache/kafka/common/errors/RemoteStorageNotReadyException.java:
##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * An exception that indicates remote storage is not ready to receive the 
requests yet.

Review Comment:
   If there are no alternatives, we may have to consider dropping the KIP or 
patch. This is something I should have brought up during the KIP discussion. I 
overlooked the implications on the client code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16019) Some of the tests in PlaintextConsumer can't seem to deterministically invoke and verify the consumer callback

2024-01-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16019:
--
Summary: Some of the tests in PlaintextConsumer can't seem to 
deterministically invoke and verify the consumer callback  (was: Some of the 
tests in PlaintextConsumer can't seem to deterministically invokes and verify 
the consumer callback)

> Some of the tests in PlaintextConsumer can't seem to deterministically invoke 
> and verify the consumer callback
> --
>
> Key: KAFKA-16019
> URL: https://issues.apache.org/jira/browse/KAFKA-16019
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> I was running the PlaintextConsumer to test the async consumer; however, a 
> few tests were failing with not being able to verify the listener is invoked 
> correctly
> For example `testPerPartitionLeadMetricsCleanUpWithSubscribe`
> Around 50% of the time, the listener's callsToAssigned was never incremented 
> correctly.  Event changing it to awaitUntilTrue it was still the same case
> {code:java}
> consumer.subscribe(List(topic, topic2).asJava, listener)
> val records = awaitNonEmptyRecords(consumer, tp)
> assertEquals(1, listener.callsToAssigned, "should be assigned once") {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15876: Introduce RemoteStorageNotReadyException retryable error [kafka]

2024-01-11 Thread via GitHub


kamalcph commented on code in PR #14822:
URL: https://github.com/apache/kafka/pull/14822#discussion_r1449218105


##
clients/src/main/java/org/apache/kafka/common/errors/RemoteStorageNotReadyException.java:
##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * An exception that indicates remote storage is not ready to receive the 
requests yet.

Review Comment:
   Updated the Javadoc. 
   
   
   We cannot introduce a new error-code as the clients will throw 
`IllegalStateException` back to the caller when it encounters any new 
error-code. See 
[FetchCollector#handleInitializeErrors](https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java?L309).
 What options do we have?
   
   With the existing `REPLICA_NOT_AVAILABLE` error, the client will request a 
metadata update which we want to avoid. It seems there is no other option left 
for us except `UNKNOWN_SERVER_ERROR` which is generic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]

2024-01-11 Thread via GitHub


lianetm commented on PR #15000:
URL: https://github.com/apache/kafka/pull/15000#issuecomment-1887680132

   My take would be option 2, using the invoker in the background thread and 
submitting a task for the interceptor. Seems like a clean way, re-using the 
mechanism of the invoker already in place, and without doing any major 
refactoring.
   
   I would definitely leave the auto-commit logic in the background thread 
where it is, as it's truly an internal operation/request we want to perform 
without any direct relation with the app layer, needed from multiple places in 
the background even: auto-commit on the interval, but also auto-commit as part 
of the reconciliation process.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14683 Migrate #testStartPaused to Mockito [kafka]

2024-01-11 Thread via GitHub


hgeraldino commented on PR #14663:
URL: https://github.com/apache/kafka/pull/14663#issuecomment-1887674876

   > Hi @hgeraldino Thanks for taking on the migration!
   > 
   > I understand the idea behind your refactor-then-deduplicate strategy, but 
I think the excessive duplication is making it difficult (at least for me) to 
review the change.
   > 
   > What do you think about starting a new test class, and moving the migrated 
tests into that new class? This would allow you to use synonymous variable 
names, annotation mocks, and method names? At the end we can delete the 
original class and move the new class back to the original class name.
   > 
   > This will separate the added and removed parts in the diff, where they are 
currently inline. But the mocking libraries are so substantially different that 
the inline parts of the diff are not very helpful anyway.
   
   Sure thing @gharris1727, if you think this will make the review process 
easier I'm happy to do that. I recon these reviews are a bit painful - 
especially for this test class that contains dozens of tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15147) Measure pending and outstanding Remote Segment operations

2024-01-11 Thread Christo Lolov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805734#comment-17805734
 ] 

Christo Lolov commented on KAFKA-15147:
---

Heya [~fvisconte], a couple of days ago [~showuon] also noticed discrepancies 
and hopefully they should be addressed as part of 
https://github.com/apache/kafka/pull/15133!

> Measure pending and outstanding Remote Segment operations
> -
>
> Key: KAFKA-15147
> URL: https://issues.apache.org/jira/browse/KAFKA-15147
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Christo Lolov
>Priority: Major
>  Labels: tiered-storage
> Fix For: 3.7.0
>
>
>  
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-963%3A+Upload+and+delete+lag+metrics+in+Tiered+Storage
>  
> KAFKA-15833: RemoteCopyLagBytes 
> KAFKA-16002: RemoteCopyLagSegments, RemoteDeleteLagBytes, 
> RemoteDeleteLagSegments
> KAFKA-16013: ExpiresPerSec
> KAFKA-16014: RemoteLogSizeComputationTime, RemoteLogSizeBytes, 
> RemoteLogMetadataCount
> KAFKA-15158: RemoteDeleteRequestsPerSec, RemoteDeleteErrorsPerSec, 
> BuildRemoteLogAuxStateRequestsPerSec, BuildRemoteLogAuxStateErrorsPerSec
> 
> Remote Log Segment operations (copy/delete) are executed by the Remote 
> Storage Manager, and recorded by Remote Log Metadata Manager (e.g. default 
> TopicBasedRLMM writes to the internal Kafka topic state changes on remote log 
> segments).
> As executions run, fail, and retry; it will be important to know how many 
> operations are pending and outstanding over time to alert operators.
> Pending operations are not enough to alert, as values can oscillate closer to 
> zero. An additional condition needs to apply (running time > threshold) to 
> consider an operation outstanding.
> Proposal:
> RemoteLogManager could be extended with 2 concurrent maps 
> (pendingSegmentCopies, pendingSegmentDeletes) `Map[Uuid, Long]` to measure 
> segmentId time when operation started, and based on this expose 2 metrics per 
> operation:
>  * pendingSegmentCopies: gauge of pendingSegmentCopies map
>  * outstandingSegmentCopies: loop over pending ops, and if now - startedTime 
> > timeout, then outstanding++ (maybe on debug level?)
> Is this a valuable metric to add to Tiered Storage? or better to solve on a 
> custom RLMM implementation?
> Also, does it require a KIP?
> Thanks!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]

2024-01-11 Thread via GitHub


dajac commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1449170060


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -453,19 +454,31 @@ public Group group(String groupId, long committedOffset) 
throws GroupIdNotFoundE
 /**
  * Get the Group List.
  *
- * @param statesFilter The states of the groups we want to list.
- * If empty all groups are returned with their state.
- * @param committedOffset A specified committed offset corresponding to 
this shard
+ * @param statesFilter  The states of the groups we want to list.
+ *  If empty, all groups are returned with their 
state.
+ * @param typesFilter   The types of the groups we want to list.
+ *  If empty, all groups are returned with their 
type.
+ * @param committedOffset   A specified committed offset corresponding to 
this shard.
  *
  * @return A list containing the ListGroupsResponseData.ListedGroup
  */
+public List listGroups(
+List statesFilter,
+List typesFilter,
+long committedOffset
+) {
+Predicate combinedFilter = group -> {
+boolean stateCheck = statesFilter.isEmpty() || 
statesFilter.contains(group.stateAsString(committedOffset));
+boolean typeCheck = typesFilter.isEmpty() || 
typesFilter.contains(group.type().toString());

Review Comment:
   as discussed offline, we cannot rely on the client side to do the right 
thing. we should rather lower case them when on he server side.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16089: Fix memory leak in RocksDBStore [kafka]

2024-01-11 Thread via GitHub


lucasbru merged PR #15174:
URL: https://github.com/apache/kafka/pull/15174


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15351: Ensure log-start-offset not updated to local-log-start-offset when remote storage enabled [kafka]

2024-01-11 Thread via GitHub


kamalcph commented on code in PR #14301:
URL: https://github.com/apache/kafka/pull/14301#discussion_r1449147426


##
core/src/main/scala/kafka/log/LogLoader.scala:
##
@@ -78,7 +78,8 @@ class LogLoader(
   recoveryPointCheckpoint: Long,
   leaderEpochCache: Option[LeaderEpochFileCache],
   producerStateManager: ProducerStateManager,
-  numRemainingSegments: ConcurrentMap[String, Int] = new 
ConcurrentHashMap[String, Int]
+  numRemainingSegments: ConcurrentMap[String, Int] = new 
ConcurrentHashMap[String, Int],
+  isRemoteLogEnabled: Boolean = false,

Review Comment:
   Opened a PR to address it: https://github.com/apache/kafka/pull/15179. PTAL



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Add isRemoteLogEnabled parameter to the Log Loader Javadoc. [kafka]

2024-01-11 Thread via GitHub


kamalcph opened a new pull request, #15179:
URL: https://github.com/apache/kafka/pull/15179

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



  1   2   >