Re: [PR] KAFKA-16093: Fix spurious REST-related warnings on Connect startup [kafka]

2024-01-09 Thread via GitHub


vamossagar12 commented on code in PR #15149:
URL: https://github.com/apache/kafka/pull/15149#discussion_r1446980325


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java:
##
@@ -41,18 +41,11 @@ public class HerderRequestHandler {
 
 private final RestClient restClient;
 
-private volatile long requestTimeoutMs;
+private final RestRequestTimeout requestTimeout;

Review Comment:
   The field `requestTimeoutMs` was made volatile 
[here](https://github.com/apache/kafka/pull/14562/files#diff-f2311f0c356f882d7768b97cb5f0054dc7040d29d455eab74ab585725454488aR44).
 I don't think we need it now, but I wanted to understand why it was added 
there (you can skip the explanation if it's too complex :) )



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java:
##
@@ -41,18 +41,11 @@ public class HerderRequestHandler {
 
 private final RestClient restClient;
 
-private volatile long requestTimeoutMs;
+private final RestRequestTimeout requestTimeout;
 
-public HerderRequestHandler(RestClient restClient, long requestTimeoutMs) {
+public HerderRequestHandler(RestClient restClient, RestRequestTimeout 
requestTimeout) {
 this.restClient = restClient;
-this.requestTimeoutMs = requestTimeoutMs;
-}
-
-public void requestTimeoutMs(long requestTimeoutMs) {
-if (requestTimeoutMs < 1) {
-throw new IllegalArgumentException("REST request timeout must be 
positive");
-}

Review Comment:
   I'm not sure whether this validation was needed in the past, but I no 
longer see it anywhere. Do you think it's still needed?



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java:
##
@@ -45,25 +45,40 @@ public void initializeResources(Herder herder) {
 }
 
 @Override
-protected Collection regularResources() {
+protected Collection> regularResources() {
 return Arrays.asList(
-new RootResource(herder),
-new ConnectorsResource(herder, config, restClient),
-new InternalConnectResource(herder, restClient),
-new ConnectorPluginsResource(herder)
+RootResource.class,
+ConnectorsResource.class,
+InternalConnectResource.class,
+ConnectorPluginsResource.class
 );
 }
 
 @Override
-protected Collection adminResources() {
+protected Collection> adminResources() {
 return Arrays.asList(
-new LoggingResource(herder)
+LoggingResource.class
 );
 }
 
 @Override
 protected void configureRegularResources(ResourceConfig resourceConfig) {
 registerRestExtensions(herder, resourceConfig);
+resourceConfig.register(new Binder());
+}
+
+private class Binder extends AbstractBinder {
+@Override
+protected void configure() {
+bind(herder).to(Herder.class);
+bind(restClient).to(RestClient.class);
+bind(config).to(RestServerConfig.class);
+}
+}
+

Review Comment:
   nit: We could probably move the class definition to the bottom so that all 
`configureXXXResources` methods are together



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestRequestTimeout.java:
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest;
+
+public interface RestRequestTimeout {

Review Comment:
   nit: Mark this interface as `@FunctionalInterface`?
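
To illustrate the nit above, here is a minimal sketch, assuming the interface declares a single abstract method (its body is not shown in the diff). The names `RequestTimeout`, `timeoutMs` and `Handler` are hypothetical stand-ins, not the PR's code. It also shows why a handler that holds a `final` reference to such an interface needs neither a `volatile` field nor a setter: it asks for the current value on every call, and thread-safety of the stored value becomes the responsibility of whoever implements the interface.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical names throughout; this is not the code from the PR.
final class RequestTimeoutSketch {

    // One abstract method, so @FunctionalInterface is legal and lambdas or
    // method references can implement the interface.
    @FunctionalInterface
    interface RequestTimeout {
        long timeoutMs();
    }

    // The handler keeps a final reference and reads the timeout on each call,
    // so it observes updates without owning any mutable state itself.
    static final class Handler {
        private final RequestTimeout requestTimeout;

        Handler(RequestTimeout requestTimeout) {
            this.requestTimeout = requestTimeout;
        }

        long currentTimeoutMs() {
            return requestTimeout.timeoutMs();
        }
    }

    public static void main(String[] args) {
        // The owner of the value decides how to store it safely (assumption: AtomicLong).
        AtomicLong backing = new AtomicLong(90_000L);
        Handler handler = new Handler(backing::get);
        System.out.println(handler.currentTimeoutMs());
        backing.set(5_000L);
        System.out.println(handler.currentTimeoutMs());
    }
}
```

With this shape, any validation of the timeout value would naturally live wherever the value is set, rather than in the handler.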



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16090) Refactor call to storage tool from kafka docker wrapper

2024-01-09 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805005#comment-17805005
 ] 

Mickael Maison commented on KAFKA-16090:


Context: https://github.com/apache/kafka/pull/15048#discussion_r1442692990

> Refactor call to storage tool from kafka docker wrapper
> ---
>
> Key: KAFKA-16090
> URL: https://issues.apache.org/jira/browse/KAFKA-16090
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Vedarth Sharma
> Assignee: Vedarth Sharma
> Priority: Major
>
> Once the rewrite of the Kafka storage tool is done, refactor how we call the 
> storage tool from the Kafka Docker wrapper.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16102) about DynamicListenerConfig, the dynamic modification of the listener's port or IP does not take effect.

2024-01-09 Thread Jialun Peng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jialun Peng updated KAFKA-16102:

Reviewer:   (was: Colin Patrick McCabe)

> about DynamicListenerConfig, the dynamic modification of the listener's port 
> or IP does not take effect.
> 
>
> Key: KAFKA-16102
> URL: https://issues.apache.org/jira/browse/KAFKA-16102
> Project: Kafka
> Issue Type: Bug
> Components: config
> Affects Versions: 3.6.0
> Environment: Must be present in any environment.
> Reporter: Jialun Peng
> Assignee: Jialun Peng
> Priority: Minor
> Fix For: 3.8.0
>
> Original Estimate: 96h
> Remaining Estimate: 96h
>
> When I dynamically modify the parameters related to Kafka listeners, such as 
> changing the IP or port value of a listener, the dynamic parameters under the 
> corresponding path in ZooKeeper are updated. In practice, however, the new IP 
> or port for the corresponding listener does not take effect. This happens 
> consistently. As a slight improvement, the error "Security protocol cannot be 
> updated for existing listener" will also be eliminated.





[jira] [Resolved] (KAFKA-15735) KRaft support in SaslMultiMechanismConsumerTest

2024-01-09 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-15735.
---
Fix Version/s: 3.8.0
   Resolution: Fixed

> KRaft support in SaslMultiMechanismConsumerTest
> ---
>
> Key: KAFKA-15735
> URL: https://issues.apache.org/jira/browse/KAFKA-15735
> Project: Kafka
> Issue Type: Task
> Components: core
> Reporter: Sameer Tejani
> Assignee: Sanskar Jhajharia
> Priority: Minor
> Labels: kraft, kraft-test, newbie
> Fix For: 3.8.0
>
>
> The following tests in SaslMultiMechanismConsumerTest in 
> core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
>  need to be updated to support KRaft
> 45 : def testMultipleBrokerMechanisms(): Unit = {
> Scanned 94 lines. Found 0 KRaft tests out of 1 tests





Re: [PR] [KAFKA-15735] Adding KRaft test [kafka]

2024-01-09 Thread via GitHub


omkreddy merged PR #15156:
URL: https://github.com/apache/kafka/pull/15156





Re: [PR] KAFKA-15594: Add version 3.6 to Kafka Streams system tests [kafka]

2024-01-09 Thread via GitHub


mjsax commented on PR #15151:
URL: https://github.com/apache/kafka/pull/15151#issuecomment-1884303284

   Merged #15157 and rebased this PR afterwards.
   
   Re-triggered system test run: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/6025/





Re: [PR] MINOR: bump dev version for system tests to 3.8 [kafka]

2024-01-09 Thread via GitHub


mjsax merged PR #15157:
URL: https://github.com/apache/kafka/pull/15157





Re: [PR] KAFKA-14517:Implement regex subscriptions [kafka]

2024-01-09 Thread via GitHub


JimmyWang6 commented on PR #14327:
URL: https://github.com/apache/kafka/pull/14327#issuecomment-1884289264

   @dajac 
   I've updated the PR. Please take a look when you have a moment. Thanks. 





[jira] [Commented] (KAFKA-16071) NPE in testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress

2024-01-09 Thread Owen C.H. Leung (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804981#comment-17804981
 ] 

Owen C.H. Leung commented on KAFKA-16071:
-

I think this is related to https://issues.apache.org/jira/browse/KAFKA-15140. 
I've created a PR to make `TopicCommandIntegrationTest` less flaky

 

https://github.com/apache/kafka/pull/14891

> NPE in testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress
> 
>
> Key: KAFKA-16071
> URL: https://issues.apache.org/jira/browse/KAFKA-16071
> Project: Kafka
> Issue Type: Test
> Reporter: Luke Chen
> Priority: Major
> Labels: newbie, newbie++
>
> Found in the CI build result.
>  
> h3. Error Message
> java.lang.NullPointerException
> h3. Stacktrace
> java.lang.NullPointerException at 
> org.apache.kafka.tools.TopicCommandIntegrationTest.testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(TopicCommandIntegrationTest.java:800)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498) at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
>  at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>  
>  
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15095/1/testReport/junit/org.apache.kafka.tools/TopicCommandIntegrationTest/Build___JDK_8_and_Scala_2_12___testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress_String__zk/





Re: [PR] KAFKA-14546 - Support Partitioner fallback to default [kafka]

2024-01-09 Thread via GitHub


philipnee commented on code in PR #14531:
URL: https://github.com/apache/kafka/pull/14531#discussion_r1446899891


##
clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java:
##
@@ -26,6 +26,15 @@
  */
 public interface Partitioner extends Configurable, Closeable {
 
+/**
+ * Indicate if the given topic is handled.  Returning {@code false} will 
cause the Producer to fallback to default partitioning.
+ *
+ * @param topic The topic name
+ */
+default boolean partitionsTopic(String topic) {
+return true;

Review Comment:
   why is it true by default?






Re: [PR] KAFKA-14546 - Support Partitioner fallback to default [kafka]

2024-01-09 Thread via GitHub


philipnee commented on PR #14531:
URL: https://github.com/apache/kafka/pull/14531#issuecomment-1884215706

   @jimbogithub - thanks for the PR. I've got a few questions to clarify:
- `KafkaProducer.partition(...) not throw IllegalArgumentException if the 
Partitioner returns RecordMetadata.UNKNOWN_PARTITION`: I'm not sure any 
partitioner actually returns `UNKNOWN_PARTITION`. Shouldn't it be returned by 
`producer.partition` instead?
- Related to the question above, I think different partitioners handle missing 
topics differently. But since they all return some partition, are the default 
behaviors insufficient? Bottom line: I'm not 100% convinced that the fallback 
to default is actually needed.
- There's no test for the change you made. I think it would be good to add 
some to demonstrate the use cases.
- I would also update the PR description. You can just use whatever is 
described in the PR.
   
   @jolshan - if you get time, maybe you can review this? It seems like you 
implemented a few partitioners, so you are probably the better person to speak 
about this... thanks!





Re: [PR] KAFKA-14546 - Support Partitioner fallback to default [kafka]

2024-01-09 Thread via GitHub


philipnee commented on code in PR #14531:
URL: https://github.com/apache/kafka/pull/14531#discussion_r1446881807


##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -1368,7 +1368,7 @@ private int partition(ProducerRecord record, byte[] 
serializedKey, byte[]
 if (record.partition() != null)
 return record.partition();
 
-if (partitioner != null) {
+if (partitioner != null && 
partitioner.partitionsTopic(record.topic())) {

Review Comment:
   This slightly changes the behavior of the current API. Could you update the 
documentation?
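
The behavior change under discussion can be sketched as follows. This is a simplified stand-in under stated assumptions, not the producer's code: `TopicPartitioner`, `choosePartition`, `onlyFor` and the `USE_DEFAULT` marker are hypothetical, and only the `partitionsTopic` name and the null-and-handled guard come from the diff. It shows one plausible reason for the `true` default: existing partitioners that never override the method keep being consulted for every topic, so only implementations that opt out see the fallback.

```java
// Hypothetical, simplified stand-ins; only the partitionsTopic name and the
// guard condition are taken from the diff in this thread.
final class PartitionerFallbackSketch {

    interface TopicPartitioner {
        int partition(String topic, int numPartitions);

        // Defaulting to true keeps pre-existing implementations on the old path.
        default boolean partitionsTopic(String topic) {
            return true;
        }
    }

    // Marker meaning "let the caller apply its default partitioning" (assumption).
    static final int USE_DEFAULT = -1;

    static int choosePartition(TopicPartitioner partitioner, String topic, int numPartitions) {
        if (partitioner != null && partitioner.partitionsTopic(topic)) {
            return partitioner.partition(topic, numPartitions);
        }
        return USE_DEFAULT;
    }

    // A partitioner that only claims a single topic and declines all others.
    static TopicPartitioner onlyFor(String handledTopic) {
        return new TopicPartitioner() {
            @Override
            public int partition(String topic, int numPartitions) {
                return 0;
            }

            @Override
            public boolean partitionsTopic(String topic) {
                return handledTopic.equals(topic);
            }
        };
    }

    public static void main(String[] args) {
        TopicPartitioner selective = onlyFor("orders");
        System.out.println(choosePartition(selective, "orders", 4));
        System.out.println(choosePartition(selective, "audit", 4));
    }
}
```

The documentation point follows from the second call: before the change, a configured partitioner was consulted for every record without an explicit partition; afterwards it can decline a topic.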






Re: [PR] KAFKA-14546 - Support Partitioner fallback to default [kafka]

2024-01-09 Thread via GitHub


jimbogithub commented on PR #14531:
URL: https://github.com/apache/kafka/pull/14531#issuecomment-1884148153

   This PR is still valid and desired, has no merge conflicts and does build 
despite Jenkins protestations.  I do not have the ability to add Reviewers.





Re: [PR] KAFKA-14546 - Support Partitioner fallback to default [kafka]

2024-01-09 Thread via GitHub


github-actions[bot] commented on PR #14531:
URL: https://github.com/apache/kafka/pull/14531#issuecomment-1884136280

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-01-09 Thread via GitHub


DL1231 commented on PR #15067:
URL: https://github.com/apache/kafka/pull/15067#issuecomment-1884122272

   @AndrewJSchofield, I've updated the PR. Please take a look again. Thanks.





Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-01-09 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1446824176


##
core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala:
##
@@ -113,6 +121,22 @@ class ControllerConfigurationValidator(kafkaConfig: 
KafkaConfig) extends Configu
 val properties = new Properties()
 config.entrySet().forEach(e => properties.setProperty(e.getKey, 
e.getValue))
 ClientMetricsConfigs.validate(resource.name(), properties)
+  case GROUP =>
+validateGroupName(resource.name())
+val properties = new Properties()
+val nullConsumerGroupConfigs = new mutable.ArrayBuffer[String]()
+config.entrySet().forEach(e => {
+  if (e.getValue == null) {
+nullConsumerGroupConfigs += e.getKey
+  } else {
+properties.setProperty(e.getKey, e.getValue)
+  }
+})
+if (nullConsumerGroupConfigs.nonEmpty) {
+  throw new InvalidConfigurationException("Null value not supported 
for consumer group configs: " +

Review Comment:
   Done



##
core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala:
##
@@ -114,6 +114,18 @@ class DynamicConfigPublisher(
 s"${resource.name()} with new configuration: 
${toLoggableProps(resource, props).mkString(",")} " +
 s"in $deltaName", t)
 })
+case GROUP =>
+  // Apply changes to a group's dynamic configuration.
+  
dynamicConfigHandlers.get(ConfigType.GROUP).foreach(consumerGroupConfigHandler 
=>

Review Comment:
   Done






Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-01-09 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1446824091


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -353,7 +358,8 @@ class BrokerServer(
   dynamicConfigHandlers = Map[String, ConfigHandler](
 ConfigType.TOPIC -> new TopicConfigHandler(replicaManager, config, 
quotaManagers, None),
 ConfigType.BROKER -> new BrokerConfigHandler(config, quotaManagers),
-ConfigType.CLIENT_METRICS -> new 
ClientMetricsConfigHandler(clientMetricsManager))
+ConfigType.CLIENT_METRICS -> new 
ClientMetricsConfigHandler(clientMetricsManager),
+ConfigType.GROUP -> new 
ConsumerGroupConfigHandler(consumerGroupConfigManager))

Review Comment:
   Done



##
core/src/main/scala/kafka/server/ConfigHandler.scala:
##
@@ -264,3 +265,12 @@ class ClientMetricsConfigHandler(private val 
clientMetricsManager: ClientMetrics
 clientMetricsManager.updateSubscription(subscriptionGroupId, properties)
   }
 }
+
+/**
+ * The GroupConfigHandler will process individual group config changes in ZK.
+ */
+class ConsumerGroupConfigHandler(private val consumerGroupConfigManager: 
ConsumerGroupConfigManager) extends ConfigHandler with Logging {

Review Comment:
   Done






Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-01-09 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1446823998


##
clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java:
##
@@ -223,6 +223,7 @@ public enum ConfigSource {
 DYNAMIC_BROKER_CONFIG,  // dynamic broker config that is 
configured for a specific broker
 DYNAMIC_DEFAULT_BROKER_CONFIG,  // dynamic broker config that is 
configured as default for all brokers in the cluster
 DYNAMIC_CLIENT_METRICS_CONFIG,  // dynamic client metrics subscription 
config that is configured for all clients
+DYNAMIC_CONSUMER_GROUP_CONFIG,  // dynamic consumer group config that 
is configured for a specific consumer group

Review Comment:
   Done



##
clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java:
##
@@ -120,7 +121,8 @@ public enum ConfigSource {
 STATIC_BROKER_CONFIG((byte) 4, 
org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG),
 DEFAULT_CONFIG((byte) 5, 
org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DEFAULT_CONFIG),
 DYNAMIC_BROKER_LOGGER_CONFIG((byte) 6, 
org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG),
-CLIENT_METRICS_CONFIG((byte) 7, 
org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG);
+CLIENT_METRICS_CONFIG((byte) 7, 
org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG),
+CONSUMER_GROUP_CONFIG((byte) 8, 
org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_CONSUMER_GROUP_CONFIG);

Review Comment:
   Done






Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]

2024-01-09 Thread via GitHub


rreddy-22 commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1446822116


##
clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java:
##
@@ -50,6 +50,10 @@ public ListGroupsRequest build(short version) {
 throw new UnsupportedVersionException("The broker only 
supports ListGroups " +
 "v" + version + ", but we need v4 or newer to request 
groups by states.");
 }
+if (!data.typesFilter().isEmpty() && version < 5) {
+throw new UnsupportedVersionException("The broker only 
supports ListGroups " +
+"v" + version + ", but we need v5 or newer to request 
groups by type.");
+}

Review Comment:
   Oh yes! Thanks for noticing this, I missed this test!






Re: [PR] MINOR: Enable kraft test in kafka.api and kafka.network [kafka]

2024-01-09 Thread via GitHub


dengziming commented on code in PR #14595:
URL: https://github.com/apache/kafka/pull/14595#discussion_r1446812841


##
core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala:
##
@@ -35,31 +37,37 @@ class RackAwareAutoTopicCreationTest extends 
KafkaServerTestHarness with RackAwa
   overridingProps.put(KafkaConfig.NumPartitionsProp, numPartitions.toString)
   overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, 
replicationFactor.toString)
 
+
   def generateConfigs =
 (0 until numServers) map { node =>
-  TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown = 
false, rack = Some((node / 2).toString))
+  TestUtils.createBrokerConfig(node, zkConnectOrNull, 
enableControlledShutdown = false, rack = Some((node / 2).toString))
 } map (KafkaConfig.fromProps(_, overridingProps))
 
   private val topic = "topic"
 
-  @Test
-  def testAutoCreateTopic(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk")) // TODO Partition leader is not evenly 
distributed in kraft mode, see KAFKA-15354

Review Comment:
   I tried to fix it, but KRaft mode assigns leaders in a different way. The 
ideas behind the two algorithms are similar, but the implementations differ. 
   The new one, `StripedReplicaPlacer`, is clearer, but it only ensures that 
leaders are distributed evenly across racks; it does not ensure that leaders 
are evenly distributed across nodes. I wonder whether it is worth evolving it 
to be consistent with the older one. cc @cmccabe 






Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]

2024-01-09 Thread via GitHub


rreddy-22 commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1446780228


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -453,19 +454,31 @@ public Group group(String groupId, long committedOffset) 
throws GroupIdNotFoundE
 /**
  * Get the Group List.
  *
- * @param statesFilter The states of the groups we want to list.
- * If empty all groups are returned with their state.
- * @param committedOffset A specified committed offset corresponding to 
this shard
+ * @param statesFilter  The states of the groups we want to list.
+ *  If empty, all groups are returned with their 
state.
+ * @param typesFilter   The types of the groups we want to list.
+ *  If empty, all groups are returned with their 
type.
+ * @param committedOffset   A specified committed offset corresponding to 
this shard.
  *
  * @return A list containing the ListGroupsResponseData.ListedGroup
  */
+public List listGroups(
+List statesFilter,
+List typesFilter,
+long committedOffset
+) {
+Predicate combinedFilter = group -> {
+boolean stateCheck = statesFilter.isEmpty() || 
statesFilter.contains(group.stateAsString(committedOffset));
+boolean typeCheck = typesFilter.isEmpty() || 
typesFilter.contains(group.type().toString());
+return stateCheck && typeCheck;
+};
 
-public List listGroups(List 
statesFilter, long committedOffset) {
-Stream groupStream = groups.values(committedOffset).stream();
-if (!statesFilter.isEmpty()) {
-groupStream = groupStream.filter(group -> 
statesFilter.contains(group.stateAsString(committedOffset)));
-}
-return groupStream.map(group -> 
group.asListedGroup(committedOffset)).collect(Collectors.toList());
+Stream groupStream = 
groups.values(committedOffset).parallelStream();

Review Comment:
   thanks for the comment! okay I can make the change :) My understanding was 
that we want to scale to a large number of groups so I was trying to optimize 
wherever I could!
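
For readers following the diff, the combined filter can be sketched in isolation. This is a simplified illustration under assumptions: the `Group` class, `listGroupIds`, and the example state and type strings are hypothetical, and a sequential stream is used here, which may or may not be the exact change being agreed to above. The rule it demonstrates is the one in the Javadoc: an empty filter list matches every group, and when both lists are non-empty a group must pass both.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical, simplified model of the filtering logic; not the coordinator's code.
final class GroupFilterSketch {

    static final class Group {
        final String id;
        final String state;
        final String type;

        Group(String id, String state, String type) {
            this.id = id;
            this.state = state;
            this.type = type;
        }
    }

    static List<String> listGroupIds(List<Group> groups, List<String> statesFilter, List<String> typesFilter) {
        // An empty filter means "no restriction" for that dimension.
        Predicate<Group> combinedFilter = group -> {
            boolean stateCheck = statesFilter.isEmpty() || statesFilter.contains(group.state);
            boolean typeCheck = typesFilter.isEmpty() || typesFilter.contains(group.type);
            return stateCheck && typeCheck;
        };
        // Sequential stream: preserves encounter order and avoids fork/join overhead.
        return groups.stream()
            .filter(combinedFilter)
            .map(group -> group.id)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // State and type strings below are illustrative values only.
        List<Group> groups = Arrays.asList(
            new Group("g1", "Stable", "consumer"),
            new Group("g2", "Empty", "classic"),
            new Group("g3", "Stable", "classic"));
        System.out.println(listGroupIds(groups, Arrays.asList("Stable"), Collections.emptyList()));
        System.out.println(listGroupIds(groups, Arrays.asList("Stable"), Arrays.asList("classic")));
    }
}
```

Building one predicate up front keeps the pipeline to a single `filter` stage regardless of how many filter dimensions are added later.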






Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-09 Thread via GitHub


jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1446778536


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -195,6 +196,11 @@ public OffsetMetadataManager build() {
  */
 private final TimelineHashMap pendingTransactionalOffsets;
 
+/**
+ * The open transactions (producer ids) keyed by group.
+ */
+private final TimelineHashMap> 
openTransactionsByGroup;

Review Comment:
   Also, these are timeline data structures, but are there cases where we roll 
them back?






Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-09 Thread via GitHub


jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1446774595


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -668,6 +697,8 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup 
fetchOffsets(
 OffsetFetchRequestData.OffsetFetchRequestGroup request,
 long lastCommittedOffset
 ) throws ApiException {
+final boolean requireStable = lastCommittedOffset == Long.MAX_VALUE;

Review Comment:
   Is it the case that when we query stable offsets, `lastCommittedOffset` is 
always `Long.MAX_VALUE`? In other words, can we not query a specific offset 
when we require stable?






Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-09 Thread via GitHub


jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1446773736


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -195,6 +196,11 @@ public OffsetMetadataManager build() {
  */
 private final TimelineHashMap pendingTransactionalOffsets;
 
+/**
+ * The open transactions (producer ids) keyed by group.
+ */
+private final TimelineHashMap> 
openTransactionsByGroup;

Review Comment:
   just for my understanding though -- for every producer ID for a group we 
return, we will have an offset in pendingTransactionalOffsets, and every 
producer ID in pendingTransactionalOffsets will have its group in 
openTransactionsByGroup.
   
   I can't imagine we would have a case where something is in one map but not 
the other. (I can imagine that the group is not there at all or the topic 
partition is not there though)






Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]

2024-01-09 Thread via GitHub


rreddy-22 commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1446773716


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -633,8 +633,8 @@ class KafkaApisTest extends Logging {
 val requestData = 
DescribeQuorumRequest.singletonRequest(KafkaRaftServer.MetadataPartition)

Review Comment:
   Yep, sorry about that. I might have messed something up when I rebased and 
transferred the files.






Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]

2024-01-09 Thread via GitHub


rreddy-22 commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1446771432


##
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##
@@ -327,23 +327,26 @@ class GroupCoordinatorAdapterTest {
 
   @Test
   def testListGroups(): Unit = {
-testListGroups(null, Set.empty)
-testListGroups(List(), Set.empty)
-testListGroups(List("Stable"), Set("Stable"))
+testListGroups(null, null, Set.empty, Set.empty)
+testListGroups(List(), List(), Set.empty, Set.empty)
+testListGroups(List("Stable, Empty"), List(), Set("Stable, Empty"), 
Set.empty)

Review Comment:
   Yep, thanks for mentioning this. At the time I didn't think it was 
necessary, but I can add it now.






Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]

2024-01-09 Thread via GitHub


rreddy-22 commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1446770934


##
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##
@@ -327,23 +327,26 @@ class GroupCoordinatorAdapterTest {
 
   @Test
   def testListGroups(): Unit = {
-testListGroups(null, Set.empty)
-testListGroups(List(), Set.empty)
-testListGroups(List("Stable"), Set("Stable"))
+testListGroups(null, null, Set.empty, Set.empty)
+testListGroups(List(), List(), Set.empty, Set.empty)
+testListGroups(List("Stable, Empty"), List(), Set("Stable, Empty"), 
Set.empty)

Review Comment:
   Yep, thanks for bringing it up. We could add it; I considered it, but it 
didn't seem to add much value to the test, so I decided against it at the time.






Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-09 Thread via GitHub


jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1446770811


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -656,6 +663,28 @@ public int deleteAllOffsets(
 return numDeletedOffsets.get();
 }
 
+/**
+ * @return true iif there is at least one pending transactional offsets 
for the given

Review Comment:
   nit: iff, offset
   (unless iif is a different acronym and is not if and only if)






Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-09 Thread via GitHub


jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1446770811


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -656,6 +663,28 @@ public int deleteAllOffsets(
 return numDeletedOffsets.get();
 }
 
+/**
+ * @return true iif there is at least one pending transactional offsets 
for the given

Review Comment:
   nit: offset






Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]

2024-01-09 Thread via GitHub


rreddy-22 commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1446767928


##
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala:
##
@@ -1105,16 +1105,17 @@ private[group] class GroupCoordinator(
 }
   }
 
-  def handleListGroups(states: Set[String]): (Errors, List[GroupOverview]) = {
+  def handleListGroups(states: Set[String], groupTypes: Set[String]): (Errors, 
List[GroupOverview]) = {
 if (!isActive.get) {
   (Errors.COORDINATOR_NOT_AVAILABLE, List[GroupOverview]())
 } else {
   val errorCode = if (groupManager.isLoading) 
Errors.COORDINATOR_LOAD_IN_PROGRESS else Errors.NONE
-  // if states is empty, return all groups
-  val groups = if (states.isEmpty)
-groupManager.currentGroups
-  else
-groupManager.currentGroups.filter(g => 
states.contains(g.summary.state))
+  // Filter groups based on states and groupTypes. If either is empty, it 
won't filter on that criterion.
+  // If groupType is mentioned then no group is returned since the notion 
of groupTypes doesn't exist in the
+  // old group coordinator.
+  val groups = groupManager.currentGroups.filter { g =>
+(states.isEmpty || states.contains(g.summary.state)) && 
groupTypes.isEmpty
+  }

Review Comment:
   Thanks, that makes sense. I'll make the changes; I was waiting on your 
comments on my design doc to make the final call.
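
   For reference, the filter in the diff above can be restated as a small 
Java sketch. It is only an illustration of the predicate as written in the 
hunk, not the coordinator code; `ListGroupsFilterSketch`, `Group`, and `filter` 
are hypothetical names, and a group is reduced to an id and a state string.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class ListGroupsFilterSketch {
    // Hypothetical stand-in for a group overview: an id and a state name.
    public static final class Group {
        public final String id;
        public final String state;

        public Group(String id, String state) {
            this.id = id;
            this.state = state;
        }
    }

    // Same predicate as the hunk: an empty `states` set matches every state,
    // and any non-empty `groupTypes` set matches nothing, because the old
    // coordinator has no notion of group types.
    public static List<String> filter(List<Group> groups, Set<String> states, Set<String> groupTypes) {
        List<String> result = new ArrayList<>();
        for (Group g : groups) {
            boolean stateMatches = states.isEmpty() || states.contains(g.state);
            if (stateMatches && groupTypes.isEmpty()) {
                result.add(g.id);
            }
        }
        return result;
    }
}
```

   Note that with this predicate a request naming any group type returns an 
empty list even when the state filter matches.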






Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]

2024-01-09 Thread via GitHub


rreddy-22 commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1446766566


##
clients/src/main/java/org/apache/kafka/common/ConsumerGroupType.java:
##
@@ -0,0 +1,50 @@
+/*

Review Comment:
   Yes, it is. We can consider it part of the admin client PR, but there was a 
dependency on it in this one too.






[jira] [Commented] (KAFKA-15475) Timeout request might retry forever even if the user API times out in PrototypeAsyncConsumer

2024-01-09 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804922#comment-17804922
 ] 

Kirk True commented on KAFKA-15475:
---

[~lianetm] would you kindly point me at the code in the two {{RequestManager}} 
implementations that have solved this?

> Timeout request might retry forever even if the user API times out in 
> PrototypeAsyncConsumer
> 
>
> Key: KAFKA-15475
> URL: https://issues.apache.org/jira/browse/KAFKA-15475
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-preview
> Fix For: 3.8.0
>
>
> If the request times out in the background thread, it will be completed with 
> TimeoutException, which is Retriable.  In the TopicMetadataRequestManager and 
> possibly other managers, the request might continue to be retried forever.
>  
> There are two ways to fix this
>  # Pass a timer to the manager to remove the inflight requests when it is 
> expired.
>  # Pass the future to the application layer and continue to retry.
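
The first option in the list above (bounding retries with a timer) might look 
roughly like the following Java sketch. It does not use any Kafka classes; 
`BoundedRetrySketch`, `Attempt`, and `retryUntilDeadline` are hypothetical 
names, and the clock is injected as a `LongSupplier` so the behaviour can be 
checked without sleeping.

```java
import java.util.function.LongSupplier;

public class BoundedRetrySketch {
    // Hypothetical request attempt: returns false on a retriable failure.
    public interface Attempt {
        boolean trySend();
    }

    // Retries until the attempt succeeds or the deadline passes.
    // Returns the number of attempts on success, or -1 once the timer
    // has expired, so a retriable failure can no longer loop forever.
    public static int retryUntilDeadline(Attempt attempt, LongSupplier clockMs, long deadlineMs) {
        int attempts = 0;
        while (clockMs.getAsLong() < deadlineMs) {
            attempts++;
            if (attempt.trySend()) {
                return attempts;
            }
        }
        return -1;
    }
}
```

The design point is that the retriable exception only marks the failure as 
transient; whether to keep retrying is decided by the caller's deadline.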



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15475) Timeout request might retry forever even if the user API times out in PrototypeAsyncConsumer

2024-01-09 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804921#comment-17804921
 ] 

Kirk True commented on KAFKA-15475:
---

[~lianetm] / [~pnee]: I need to refresh my memory about what it means for an 
exception to be retriable. Does it mean that the operation is automatically 
retried at some layer of the client, or does it simply mean that it's a 
transient failure that the caller _could_ retry, if desired?






Re: [PR] KAFKA-15468 [1/2]: Prevent transaction coordinator reloads on already loaded leaders [kafka]

2024-01-09 Thread via GitHub


jolshan commented on code in PR #15139:
URL: https://github.com/apache/kafka/pull/15139#discussion_r1446756748


##
metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java:
##
@@ -27,21 +27,24 @@
 
 public final class LocalReplicaChanges {
 private final Set deletes;
-private final Map leaders;
+private final Map electedLeaders;
+private final Map updatedLeaders;

Review Comment:
   Sorry for the confusion. I've pushed the latest commit with all the renames.






Re: [PR] KAFKA-15181: Improvements for TopicBaseRemoteLogMetadataManager [kafka]

2024-01-09 Thread via GitHub


junrao commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1446751658


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +63,403 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
-
 private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-private final KafkaConsumer consumer;
-private final String metadataTopicName;
+private final Consumer consumer;
 private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
 private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+// The timeout for the consumer to poll records from the remote log 
metadata topic.
+private final long pollTimeoutMs;
 private final Time time;
 
-// It indicates whether the closing process has been started or not. If it 
is set as true,
-// consumer will stop consuming messages, and it will not allow partition 
assignments to be updated.
-private volatile boolean closing = false;
-
-// It indicates whether the consumer needs to assign the partitions or 
not. This is set when it is
-// determined that the consumer needs to be assigned with the updated 
partitions.
-private volatile boolean assignPartitions = false;
+// It indicates whether the ConsumerTask is closed or not.
+private volatile boolean isClosed = false;
+// It indicates whether the user topic partition assignment to the 
consumer has changed or not. If the assignment
+// has changed, the consumer will eventually start tracking the newly 
assigned partitions and stop tracking the
+// ones it is no longer assigned to.
+// The initial value is set to true to wait for partition assignment on 
the first execution; otherwise thread will
+// be busy without actually doing anything
+private volatile boolean hasAssignmentChanged = true;
 
 // It represents a lock for any operations related to the 
assignedTopicPartitions.
 private final Object assignPartitionsLock = new Object();
 
 // Remote log metadata topic partitions that consumer is assigned to.
-private volatile Set assignedMetaPartitions = 
Collections.emptySet();
+private volatile Set assignedMetadataPartitions = 
Collections.emptySet();
 
 // User topic partitions that this broker is a leader/follower for.
-private Set assignedTopicPartitions = 
Collections.emptySet();
+private volatile Map 
assignedUserTopicIdPartitions = Collections.emptyMap();
+private volatile Set 
processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-// Map of remote log metadata topic partition to consumed offsets. 
Received consumer records
-// may or may not have been processed based on the assigned topic 
partitions.
-private final Map partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
+private long uninitializedAt;
+private boolean isAllUserTopicPartitionsInitialized;
 
-// Map of remote log metadata topic partition to processed offsets that 
were synced in committedOffsetsFile.
-private Map lastSyncedPartitionToConsumedOffsets = 
Collections.emptyMap();
+// Map of remote log metadata topic partition to consumed offsets.
+private final Map readOffsetsByMetadataPartition = new 
ConcurrentHashMap<>();
+private final Map readOffsetsByUserTopicPartition 
= new HashMap<>();
 
-private final long committedOffsetSyncIntervalMs;
-private CommittedOffsetsFile committedOffsetsFile;

Review Comment:
   @abhijeetk88 @satishd : Sorry to chime in late on this. It seems that we 
removed the usage of `CommittedOffsetsFile`.
   1. Does the consumer always start from the beginning offset after restart?
   2. Should we remove the CommittedOffsetsFile class?
   3. This actually changes the on-disk layout. Was this discussed in a KIP?






[jira] [Resolved] (KAFKA-16094) BrokerRegistrationRequest.logDirs field must be ignorable

2024-01-09 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-16094.
--
Fix Version/s: 3.7.0
   Resolution: Fixed

> BrokerRegistrationRequest.logDirs field must be ignorable
> -
>
> Key: KAFKA-16094
> URL: https://issues.apache.org/jira/browse/KAFKA-16094
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Blocker
> Fix For: 3.7.0
>
>
> 3.7 brokers must be able to register with 3.6 and earlier controllers. So 
> this means that the logDirs field must be ignorable (aka, not sent) if the 
> highest BrokerRegistrationRequest version we can negotiate is older than v2.





Re: [PR] KAFKA-15468 [1/2]: Prevent transaction coordinator reloads on already loaded leaders [kafka]

2024-01-09 Thread via GitHub


jolshan commented on code in PR #15139:
URL: https://github.com/apache/kafka/pull/15139#discussion_r1446743080


##
metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java:
##
@@ -27,21 +27,24 @@
 
 public final class LocalReplicaChanges {
 private final Set deletes;
-private final Map leaders;
+private final Map electedLeaders;
+private final Map updatedLeaders;

Review Comment:
   Looks like I missed some files though so I will fix that.






Re: [PR] KAFKA-15468 [1/2]: Prevent transaction coordinator reloads on already loaded leaders [kafka]

2024-01-09 Thread via GitHub


jolshan commented on code in PR #15139:
URL: https://github.com/apache/kafka/pull/15139#discussion_r1446742642


##
metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java:
##
@@ -27,21 +27,24 @@
 
 public final class LocalReplicaChanges {
 private final Set deletes;
-private final Map leaders;
+private final Map electedLeaders;
+private final Map updatedLeaders;

Review Comment:
   I've updated the PR to use the names you suggested earlier today :) 






Re: [PR] KAFKA-15468 [1/2]: Prevent transaction coordinator reloads on already loaded leaders [kafka]

2024-01-09 Thread via GitHub


artemlivshits commented on code in PR #15139:
URL: https://github.com/apache/kafka/pull/15139#discussion_r1446724899


##
metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java:
##
@@ -27,21 +27,24 @@
 
 public final class LocalReplicaChanges {
 private final Set deletes;
-private final Map leaders;
+private final Map electedLeaders;
+private final Map updatedLeaders;

Review Comment:
   We can add comments to the fields as well.  E.g. the fact that 
`electedLeaders` is a strict subset of `updatedLeaders` (or whatever name we 
end up choosing for it) is something that I think is instrumental in 
understanding why we have 2 sets of leaders.
   
   > I really struggled with the name "leaders"
   
   That's why I said comments ... 
   
   Btw, I'm not sure that `updatedLeaders` gives a better hint about what's 
going on here.  Only `electedLeaders` really holds the leaders that were updated 
(newly elected); `updatedLeaders` contains all partitions this broker is a 
leader for that had changes somewhere (followers etc.).  The same goes for the 
followers: all changes to partitions this broker is a follower for, not 
necessarily related to this broker per se.
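
   As an illustration of the kind of field comments suggested above, here is a 
minimal Java sketch of the relationship between the two maps. It is not the 
`LocalReplicaChanges` class; `LeaderChangesSketch`, `record`, and 
`electedIsSubsetOfUpdated` are hypothetical names, and partitions are reduced 
to a string key with a leader epoch.

```java
import java.util.HashMap;
import java.util.Map;

public class LeaderChangesSketch {
    // Partitions this broker leads whose metadata changed in any way
    // (for example a follower or ISR change), keyed by partition name.
    public final Map<String, Integer> updatedLeaders = new HashMap<>();

    // Partitions for which this broker was newly elected leader.
    // Every key here is also a key of updatedLeaders.
    public final Map<String, Integer> electedLeaders = new HashMap<>();

    // Records a change for a partition this broker leads.
    public void record(String partition, int leaderEpoch, boolean newlyElected) {
        updatedLeaders.put(partition, leaderEpoch);
        if (newlyElected) {
            electedLeaders.put(partition, leaderEpoch);
        }
    }

    // The invariant the comments would document.
    public boolean electedIsSubsetOfUpdated() {
        return updatedLeaders.keySet().containsAll(electedLeaders.keySet());
    }
}
```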






Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-09 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1446722719


##
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##
@@ -140,6 +141,71 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
 }
   }
 
+  private def getPartitionMetadataForDescribeTopicResponse(
+image: MetadataImage,
+topicName: String,
+listenerName: ListenerName
+  ): Option[List[DescribeTopicPartitionsResponsePartition]] = {
+Option(image.topics().getTopic(topicName)) match {
+  case None => None
+  case Some(topic) => {
+val partitions = Some(topic.partitions().entrySet().asScala.map { 
entry =>
+  val partitionId = entry.getKey
+  val partition = entry.getValue
+  val filteredReplicas = maybeFilterAliveReplicas(image, 
partition.replicas,
+listenerName, false)
+  val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, 
listenerName,
+false)
+  val offlineReplicas = getOfflineReplicas(image, partition, 
listenerName)
+  val maybeLeader = getAliveEndpoint(image, partition.leader, 
listenerName)
+  maybeLeader match {
+case None =>
+  val error = if 
(!image.cluster().brokers.containsKey(partition.leader)) {

Review Comment:
   Good question. I'm actually not sure about these errors. They may have some 
history, since they are also in the `getPartitionMetadata` method. @mumrah, do 
you have any idea?






Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-09 Thread via GitHub


artemlivshits commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1446639923


##
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##
@@ -140,6 +141,71 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
 }
   }
 
+  private def getPartitionMetadataForDescribeTopicResponse(
+image: MetadataImage,
+topicName: String,
+listenerName: ListenerName
+  ): Option[List[DescribeTopicPartitionsResponsePartition]] = {
+Option(image.topics().getTopic(topicName)) match {
+  case None => None
+  case Some(topic) => {
+val partitions = Some(topic.partitions().entrySet().asScala.map { 
entry =>
+  val partitionId = entry.getKey
+  val partition = entry.getValue
+  val filteredReplicas = maybeFilterAliveReplicas(image, 
partition.replicas,
+listenerName, false)
+  val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, 
listenerName,
+false)
+  val offlineReplicas = getOfflineReplicas(image, partition, 
listenerName)
+  val maybeLeader = getAliveEndpoint(image, partition.leader, 
listenerName)
+  maybeLeader match {
+case None =>
+  val error = if 
(!image.cluster().brokers.containsKey(partition.leader)) {
+debug(s"Error while fetching metadata for 
$topicName-$partitionId: leader not available")
+Errors.LEADER_NOT_AVAILABLE
+  } else {
+debug(s"Error while fetching metadata for 
$topicName-$partitionId: listener $listenerName " +
+  s"not found on leader ${partition.leader}")
+Errors.LISTENER_NOT_FOUND
+  }
+  new DescribeTopicPartitionsResponsePartition()
+.setErrorCode(error.code)
+.setPartitionIndex(partitionId)
+.setLeaderId(MetadataResponse.NO_LEADER_ID)
+.setLeaderEpoch(partition.leaderEpoch)
+.setReplicaNodes(filteredReplicas)
+.setIsrNodes(filteredIsr)
+.setOfflineReplicas(offlineReplicas)
+case Some(leader) =>
+  val error = if (filteredReplicas.size < 
partition.replicas.length) {
+debug(s"Error while fetching metadata for 
$topicName-$partitionId: replica information not available for " +
+  s"following brokers 
${partition.replicas.filterNot(filteredReplicas.contains).mkString(",")}")
+Errors.REPLICA_NOT_AVAILABLE

Review Comment:
   Not quite sure why this is an error.  We should just describe the state -- 
assigned replicas, active replicas, replicas in ISR, replicas in ELR, etc.



##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1409,6 +1426,77 @@ class KafkaApis(val requestChannel: RequestChannel,
   ))
   }
 
+  def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): 
Unit = {
+metadataCache match {
+  case _: ZkMetadataCache =>
+throw new InvalidRequestException("ZK cluster does not handle 
DescribeTopicPartitions request")
+  case _ =>
+}
+val KRaftMetadataCache = metadataCache.asInstanceOf[KRaftMetadataCache]
+
+val describeTopicPartitionsRequest = 
request.body[DescribeTopicPartitionsRequest].data()
+var topics = scala.collection.mutable.Set[String]()
+describeTopicPartitionsRequest.topics().forEach(topic => 
topics.add(topic.name()))
+
+val cursor = describeTopicPartitionsRequest.cursor()
+val fetchAllTopics = topics.isEmpty
+if (fetchAllTopics) {
+  metadataCache.getAllTopics().foreach(topic => topics.add(topic))

Review Comment:
   Looks like we copy topics multiple times.  I think we can (a) filter out 
topics that are below the cursor before copying, and (b) use a sorted set so 
that we build the desired data structure once.
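
   A rough Java sketch of suggestions (a) and (b), under the assumption that 
the cursor is simply a topic name and that topics are ordered 
lexicographically. The names `TopicCursorSketch` and `topicsFromCursor` are 
hypothetical, and this is not the `KafkaApis` code.

```java
import java.util.Collection;
import java.util.NavigableSet;
import java.util.TreeSet;

public class TopicCursorSketch {
    // Builds the sorted set in a single pass, skipping topics that sort
    // before the cursor. A null cursor means "start from the first topic".
    public static NavigableSet<String> topicsFromCursor(Collection<String> topics, String cursorTopic) {
        NavigableSet<String> sorted = new TreeSet<>();
        for (String topic : topics) {
            if (cursorTopic == null || topic.compareTo(cursorTopic) >= 0) {
                sorted.add(topic);
            }
        }
        return sorted;
    }
}
```

   Filtering before insertion keeps the set small when the cursor is far into 
the topic list, and the `TreeSet` removes the need for a separate sort step.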



##
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##
@@ -140,6 +141,71 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
 }
   }
 
+  private def getPartitionMetadataForDescribeTopicResponse(
+image: MetadataImage,
+topicName: String,
+listenerName: ListenerName
+  ): Option[List[DescribeTopicPartitionsResponsePartition]] = {
+Option(image.topics().getTopic(topicName)) match {
+  case None => None
+  case Some(topic) => {
+val partitions = Some(topic.partitions().entrySet().asScala.map { 
entry =>
+  val partitionId = entry.getKey
+  val partition = entry.getValue
+  val filteredReplicas = maybeFilterAliveReplicas(image, 
partition.replicas,
+listenerName, false)
+  val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, 
listenerName,
+false)
+  val offlineReplicas = getOfflineReplicas(image, partition, 
listenerName)
+  val maybeLeader = g

Re: [PR] KAFKA-16094: BrokerRegistrationRequest.logDirs field must be ignorable [kafka]

2024-01-09 Thread via GitHub


cmccabe merged PR #15153:
URL: https://github.com/apache/kafka/pull/15153





Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-09 Thread via GitHub


jolshan commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1446688518


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+tp: TopicPartition,
+transactionalId: String,
+producerId: Long,
+producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+val future = new CompletableFuture[VerificationGuard]()
+replicaManager.maybeStartTransactionVerificationForPartition(
+  topicPartition = tp,
+  transactionalId = transactionalId,
+  producerId = producerId,
+  producerEpoch = producerEpoch,
+  baseSequence = RecordBatch.NO_SEQUENCE,
+  requestLocal = RequestLocal.NoCaching,
+  callback = (error, _, verificationGuard) => {
+if (error != Errors.NONE) {
+  future.completeExceptionally(error.exception)
+} else {
+  future.complete(verificationGuard)
+}
+  }
+)
+future
+  }
+
   private def internalAppend(
 tp: TopicPartition,
-memoryRecords: MemoryRecords
+memoryRecords: MemoryRecords,
+verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
   ): Long = {
 var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-replicaManager.appendRecords(
+replicaManager.appendForGroup(
   timeout = 0L,
   requiredAcks = 1,
-  internalTopicsAllowed = true,
-  origin = AppendOrigin.COORDINATOR,
   entriesPerPartition = Map(tp -> memoryRecords),
   responseCallback = results => appendResults = results,
+  requestLocal = RequestLocal.NoCaching,

Review Comment:
   Yes -- that part has not changed. We "wrap" any callback from the 
transaction coordinator to the request handler thread.
   
   Right now though, wrap only schedules to the request thread if we are 
already on a request thread. Otherwise we execute directly. If we start 
verification from a non-request handler thread, maybe this already works as you 
intend.
   
   Alternatively, I could pass in a parameter to optionally wrap the callback 
(send it to the request thread) or not.
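
   The wrapping behaviour described above could be sketched like this in Java. 
It is a simplified illustration only: `CallbackWrapSketch` and `wrap` are 
hypothetical names, the "am I on a request thread" check is passed in as a 
boolean, and a plain `Executor` stands in for the request handler pool.

```java
import java.util.concurrent.Executor;

public class CallbackWrapSketch {
    // If the caller is on a request thread, the returned callback is
    // rescheduled onto the request executor when invoked. Otherwise the
    // original callback is returned and runs directly on the invoking thread.
    public static Runnable wrap(Runnable callback, boolean onRequestThread, Executor requestExecutor) {
        if (!onRequestThread) {
            return callback;
        }
        return () -> requestExecutor.execute(callback);
    }
}
```

   The optional parameter mentioned as an alternative would amount to letting 
the caller choose the value of `onRequestThread` explicitly.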






Re: [PR] KAFKA-15853: Move KafkaConfig.Defaults to server module [kafka]

2024-01-09 Thread via GitHub


OmniaGM commented on code in PR #15158:
URL: https://github.com/apache/kafka/pull/15158#discussion_r1446673095


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java:
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.transaction;
+
+public class TransactionLogConfig {

Review Comment:
   I feel the same, but I'm also not sure where they should move. They don't fit in 
the server module either. I don't see any Jiras to move the transaction coordinator out 
of server, but maybe I can start a new module for the transaction coordinator, 
similar to the group one. Would this make more sense? 






Re: [PR] KAFKA-15853: Move KafkaConfig.Defaults to server module [kafka]

2024-01-09 Thread via GitHub


dajac commented on code in PR #15158:
URL: https://github.com/apache/kafka/pull/15158#discussion_r1446669238


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java:
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.transaction;
+
+public class TransactionLogConfig {

Review Comment:
   Hey @OmniaGM. It is a bit weird to have those transaction classes in the 
group-coordinator module. It does not seem to be the correct place.






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-09 Thread via GitHub


dajac commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1446663978


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+tp: TopicPartition,
+transactionalId: String,
+producerId: Long,
+producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+val future = new CompletableFuture[VerificationGuard]()
+replicaManager.maybeStartTransactionVerificationForPartition(
+  topicPartition = tp,
+  transactionalId = transactionalId,
+  producerId = producerId,
+  producerEpoch = producerEpoch,
+  baseSequence = RecordBatch.NO_SEQUENCE,
+  requestLocal = RequestLocal.NoCaching,
+  callback = (error, _, verificationGuard) => {
+if (error != Errors.NONE) {
+  future.completeExceptionally(error.exception)
+} else {
+  future.complete(verificationGuard)
+}
+  }
+)
+future
+  }
+
   private def internalAppend(
 tp: TopicPartition,
-memoryRecords: MemoryRecords
+memoryRecords: MemoryRecords,
+verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
   ): Long = {
 var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-replicaManager.appendRecords(
+replicaManager.appendForGroup(
   timeout = 0L,
   requiredAcks = 1,
-  internalTopicsAllowed = true,
-  origin = AppendOrigin.COORDINATOR,
   entriesPerPartition = Map(tp -> memoryRecords),
   responseCallback = results => appendResults = results,
+  requestLocal = RequestLocal.NoCaching,

Review Comment:
   Btw, when the validation completes, we schedule an event on the 
coordinator’s thread pool, so we don’t really need to execute it on the request 
handler thread. With your refactor, do you plan to keep the re-scheduling 
within the replica manager?
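
For illustration, the pattern in question (adapt a callback-based verification into a future, then continue on the coordinator's own pool) can be sketched as follows. The `Verifier` interface, the string error, and the class name are hypothetical stand-ins, not the ReplicaManager API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

final class VerificationFutureSketch {
    /** Hypothetical stand-in for a callback-based verification API. */
    interface Verifier {
        // The callback receives (error or null, verification guard).
        void verify(String transactionalId, BiConsumer<String, Object> callback);
    }

    // Bridges the callback into a future, mirroring the shape of the diff above.
    static CompletableFuture<Object> startVerification(Verifier verifier, String transactionalId) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        verifier.verify(transactionalId, (error, guard) -> {
            if (error != null) {
                future.completeExceptionally(new IllegalStateException(error));
            } else {
                future.complete(guard);
            }
        });
        return future;
    }

    // Runs the follow-up on the given pool, regardless of which thread completed the future.
    static CompletableFuture<Void> thenOnCoordinatorPool(
            CompletableFuture<Object> verification, Executor coordinatorPool, Consumer<Object> next) {
        return verification.thenAcceptAsync(next, coordinatorPool);
    }
}
```

Because the continuation is always handed to `coordinatorPool`, the thread that invokes the verification callback matters less in this sketch.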






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-09 Thread via GitHub


jolshan commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1446657520


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+tp: TopicPartition,
+transactionalId: String,
+producerId: Long,
+producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+val future = new CompletableFuture[VerificationGuard]()
+replicaManager.maybeStartTransactionVerificationForPartition(
+  topicPartition = tp,
+  transactionalId = transactionalId,
+  producerId = producerId,
+  producerEpoch = producerEpoch,
+  baseSequence = RecordBatch.NO_SEQUENCE,
+  requestLocal = RequestLocal.NoCaching,
+  callback = (error, _, verificationGuard) => {
+if (error != Errors.NONE) {
+  future.completeExceptionally(error.exception)
+} else {
+  future.complete(verificationGuard)
+}
+  }
+)
+future
+  }
+
   private def internalAppend(
 tp: TopicPartition,
-memoryRecords: MemoryRecords
+memoryRecords: MemoryRecords,
+verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
   ): Long = {
 var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-replicaManager.appendRecords(
+replicaManager.appendForGroup(
   timeout = 0L,
   requiredAcks = 1,
-  internalTopicsAllowed = true,
-  origin = AppendOrigin.COORDINATOR,
   entriesPerPartition = Map(tp -> memoryRecords),
   responseCallback = results => appendResults = results,
+  requestLocal = RequestLocal.NoCaching,

Review Comment:
   Thanks! I will try to run through it today too.






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-09 Thread via GitHub


dajac commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1446654869


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+tp: TopicPartition,
+transactionalId: String,
+producerId: Long,
+producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+val future = new CompletableFuture[VerificationGuard]()
+replicaManager.maybeStartTransactionVerificationForPartition(
+  topicPartition = tp,
+  transactionalId = transactionalId,
+  producerId = producerId,
+  producerEpoch = producerEpoch,
+  baseSequence = RecordBatch.NO_SEQUENCE,
+  requestLocal = RequestLocal.NoCaching,
+  callback = (error, _, verificationGuard) => {
+if (error != Errors.NONE) {
+  future.completeExceptionally(error.exception)
+} else {
+  future.complete(verificationGuard)
+}
+  }
+)
+future
+  }
+
   private def internalAppend(
 tp: TopicPartition,
-memoryRecords: MemoryRecords
+memoryRecords: MemoryRecords,
+verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
   ): Long = {
 var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-replicaManager.appendRecords(
+replicaManager.appendForGroup(
   timeout = 0L,
   requiredAcks = 1,
-  internalTopicsAllowed = true,
-  origin = AppendOrigin.COORDINATOR,
   entriesPerPartition = Map(tp -> memoryRecords),
   responseCallback = results => appendResults = results,
+  requestLocal = RequestLocal.NoCaching,

Review Comment:
   ah, right. i forgot about that one. let me check this tomorrow.






[jira] [Resolved] (KAFKA-15946) AsyncKafkaConsumer should retry commits on the application thread instead of auto-retry

2024-01-09 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans resolved KAFKA-15946.

Fix Version/s: 3.7.0
   (was: 3.8.0)
 Assignee: Lianet Magrans  (was: Kirk True)
   Resolution: Fixed

3.7 includes a fix to make sure that only sync commits are retried, with a 
timeout, and async commits are not (the failure is just passed to the callback). 
There is also a follow-up ticket: https://issues.apache.org/jira/browse/KAFKA-16033

> AsyncKafkaConsumer should retry commits on the application thread instead of 
> auto-retry
> ---
>
> Key: KAFKA-15946
> URL: https://issues.apache.org/jira/browse/KAFKA-15946
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.7.0
>
>
> The original design was that the network thread always completes the future, 
> whether it succeeds or fails.  However, in the current patch, I mistakenly added 
> auto-retry functionality because commitSync wasn't retrying.  What we should 
> be doing is this: the commitSync API should catch RetriableExceptions and 
> resend the commit until it times out.
>  
> {code:java}
> if (error.exception() instanceof RetriableException) {
>     log.warn("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, error.message());
>     handleRetriableError(error, response);
>     retry(responseTime);  // <--- We probably shouldn't do this.
>     return;
> }
> {code}
>  
> {code:java}
> @Override
> public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
>     acquireAndEnsureOpen();
>     long commitStart = time.nanoseconds();
>     try {
>         CompletableFuture<Void> commitFuture = commit(offsets, true); // <-- we probably should retry here
>         ConsumerUtils.getResult(commitFuture, time.timer(timeout));
>     } finally {
>         wakeupTrigger.clearTask();
>         kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart);
>         release();
>     }
> }
> {code}
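
The behaviour the ticket asks for (the sync path retries retriable failures until the timeout expires, instead of the network layer retrying on its own) could look roughly like the sketch below. It is a standalone illustration under stated assumptions: `RetriableFailure` and `CommitTimeout` are hypothetical stand-ins for Kafka's exception types, and `attemptCommit` stands in for sending one commit and waiting for its result.

```java
import java.time.Duration;

final class SyncCommitRetrySketch {
    /** Hypothetical stand-in for a retriable commit failure. */
    static class RetriableFailure extends RuntimeException {
        RetriableFailure(String message) { super(message); }
    }

    /** Hypothetical stand-in for the timeout raised once the deadline has passed. */
    static class CommitTimeout extends RuntimeException {
        CommitTimeout(String message, Throwable cause) { super(message, cause); }
    }

    // Retries attemptCommit on retriable failures until it succeeds or the timeout elapses.
    static void commitSyncWithRetry(Runnable attemptCommit, Duration timeout, Duration backoff) {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                attemptCommit.run();
                return; // success: stop retrying
            } catch (RetriableFailure e) {
                if (deadlineNanos - System.nanoTime() <= 0) {
                    throw new CommitTimeout("commit did not succeed before the timeout", e);
                }
                try {
                    Thread.sleep(backoff.toMillis());
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while retrying commit", ie);
                }
            }
        }
    }
}
```

Non-retriable exceptions propagate immediately in this sketch; only the retriable kind is retried, and only from the caller's (application) thread.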



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15967) Fix revocation in reconcilation logic

2024-01-09 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans resolved KAFKA-15967.

Fix Version/s: 3.7.0
   (was: 3.8.0)
 Assignee: Lianet Magrans
   Resolution: Fixed

> Fix revocation in reconcilation logic
> -
>
> Key: KAFKA-15967
> URL: https://issues.apache.org/jira/browse/KAFKA-15967
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lucas Brutschy
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.7.0
>
>
> Looks like there is a problem in the reconciliation logic.
> We are getting 6 partitions from an HB, we add them to 
> {{{}assignmentReadyToReconcile{}}}. Next HB we get only 4 partitions (2 are 
> revoked), we also add them to {{{}assignmentReadyToReconcile{}}}, but the 2 
> partitions that were supposed to be removed from the assignment are never 
> removed because they are still in {{{}assignmentReadyToReconcile{}}}.
> This was discovered during integration testing of 
> [https://github.com/apache/kafka/pull/14878] - part of the test 
> testRemoteAssignorRange was disabled and should be re-enabled once this is 
> fixed.
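
The problem described above can be reproduced with plain sets. The sketch below is only an illustration with hypothetical names, not the fix that went into Kafka: it contrasts accumulating heartbeat assignments with replacing them, and shows that only the latter lets the two revoked partitions surface.

```java
import java.util.Set;
import java.util.TreeSet;

final class ReconcileSketch {
    // Buggy variant: every heartbeat's partitions are added, nothing is ever dropped.
    static void accumulate(Set<Integer> readyToReconcile, Set<Integer> targetFromHeartbeat) {
        readyToReconcile.addAll(targetFromHeartbeat);
    }

    // Alternative: the latest heartbeat replaces the previous target assignment.
    static void replace(Set<Integer> readyToReconcile, Set<Integer> targetFromHeartbeat) {
        readyToReconcile.clear();
        readyToReconcile.addAll(targetFromHeartbeat);
    }

    // Partitions currently owned that are no longer part of the target.
    static Set<Integer> toRevoke(Set<Integer> owned, Set<Integer> readyToReconcile) {
        Set<Integer> revoked = new TreeSet<>(owned);
        revoked.removeAll(readyToReconcile);
        return revoked;
    }
}
```

With heartbeats carrying partitions 0..5 and then 0..3, the accumulating variant yields nothing to revoke, while the replacing variant yields partitions 4 and 5.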





[jira] [Updated] (KAFKA-15553) Review consumer positions update

2024-01-09 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-15553:
---
Summary: Review consumer positions update  (was: Review consumer positions 
update using committed offset)

> Review consumer positions update
> 
>
> Key: KAFKA-15553
> URL: https://issues.apache.org/jira/browse/KAFKA-15553
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> From the existing comment: If there are any partitions which do not have a 
> valid position and are not awaiting reset, then we need to fetch committed 
> offsets.
> In the async consumer: I wonder if it would make sense to refresh the 
> position on the event loop continuously.
> The logic to refresh offsets in the poll loop is quite fragile and works 
> largely by side-effects of the code that it calls. For example, the behaviour 
> of the "cached" value is really not that straightforward and simply reading 
> the cached value is not sufficient to start consuming data in all cases.
> This area needs a bit of a refactor.





[jira] [Updated] (KAFKA-15553) Review consumer positions update using committed offset

2024-01-09 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-15553:
---
Summary: Review consumer positions update using committed offset  (was: 
Review committed offset refresh logic)

> Review consumer positions update using committed offset
> ---
>
> Key: KAFKA-15553
> URL: https://issues.apache.org/jira/browse/KAFKA-15553
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> From the existing comment: If there are any partitions which do not have a 
> valid position and are not awaiting reset, then we need to fetch committed 
> offsets.
> In the async consumer: I wonder if it would make sense to refresh the 
> position on the event loop continuously.
> The logic to refresh offsets in the poll loop is quite fragile and works 
> largely by side-effects of the code that it calls. For example, the behaviour 
> of the "cached" value is really not that straightforward and simply reading 
> the cached value is not sufficient to start consuming data in all cases.
> This area needs a bit of a refactor.





[jira] [Updated] (KAFKA-16004) Review new consumer inflight offset commit logic

2024-01-09 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16004:
---
Description: New consumer logic for committing offsets handles inflight 
requests, to validate that no commit requests are sent if a previous one hasn't 
received a response. Review how that logic is currently applied to both sync 
and async commits, and validate against the legacy coordinator, which seems to 
apply it only for async commits. Also review the behaviour for auto-commits.
   (was: New consumer logic for committing offsets handles inflight 
requests, to validate that no commit requests are sent if a previous one hasn't 
received a response. Review how that logic is currently applied to both sync 
and async commits, and validate against the legacy coordinator, which seems to 
apply it only for async commits.)

> Review new consumer inflight offset commit logic
> 
>
> Key: KAFKA-16004
> URL: https://issues.apache.org/jira/browse/KAFKA-16004
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support
> Fix For: 3.8.0
>
>
> New consumer logic for committing offsets handles inflight requests, to 
> validate that no commit requests are sent if a previous one hasn't received a 
> response. Review how that logic is currently applied to both sync and async 
> commits, and validate against the legacy coordinator, which seems to apply it 
> only for async commits. Also review the behaviour for auto-commits.
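
As a point of reference for the review, the inflight check described above amounts to a guard like the following. This is a minimal sketch with hypothetical names; it does not reflect how the new consumer or the legacy coordinator actually tracks inflight commits.

```java
final class InflightCommitGuardSketch {
    private boolean inflight = false;

    // Sends the request only when no earlier commit is still awaiting a response.
    synchronized boolean trySend(Runnable sendRequest) {
        if (inflight) {
            return false; // a previous commit has not been answered yet
        }
        inflight = true;
        sendRequest.run();
        return true;
    }

    // Called when the response (success or failure) for the inflight commit arrives.
    synchronized void onResponse() {
        inflight = false;
    }
}
```

Whether sync, async and auto-commits should all share one such guard is exactly the open question of this ticket.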





Re: [PR] Metadata schema checker [kafka]

2024-01-09 Thread via GitHub


mannoopj commented on code in PR #14389:
URL: https://github.com/apache/kafka/pull/14389#discussion_r1446640522


##
tools/src/main/java/org/apache/kafka/tools/SchemaChecker/MetadataSchemaChecker.java:
##
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.SchemaChecker;
+
+
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.eclipse.jgit.api.CheckoutCommand;
+import org.eclipse.jgit.api.Git;
+import org.eclipse.jgit.api.InitCommand;
+import org.eclipse.jgit.api.errors.GitAPIException;
+import org.eclipse.jgit.internal.storage.file.FileRepository;
+import org.eclipse.jgit.lib.*;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.revwalk.RevTree;
+import org.eclipse.jgit.revwalk.RevWalk;
+import org.eclipse.jgit.storage.file.FileRepositoryBuilder;
+import org.eclipse.jgit.transport.UsernamePasswordCredentialsProvider;
+import org.eclipse.jgit.treewalk.TreeWalk;
+import org.eclipse.jgit.treewalk.filter.PathFilter;
+
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.*;
+
+public class MetadataSchemaChecker {
+
+static int latestTag = -1;
+static int  latestTagVersion = -1;
+static int oldLatestVersion = -1;
+static int oldFirstVersion = -1;
+static int newLatestVersion = -1;
+static int newFirstVersion = -1;
+
+static String[] filesCheckMetadata = {"AccessControlEntryRecord.json", 
"BrokerRegistrationChangeRecord.json", "ClientQuotaRecord.json",
+"ConfigRecord.json", "DelegationTokenRecord.json", 
"FeatureLevelRecord.json", "FenceBrokerRecord.json", "NoOpRecord.json",
+"PartitionChangeRecord.json", "PartitionRecord.json", 
"ProducerIdsRecord.json", "RegisterBrokerRecord.json",
+"RemoveAccessControlEntryRecord.json", "RemoveTopicRecord.json", 
"RemoveUserScramCredentialRecord.json", "TopicRecord.json",
+"UnfenceBrokerRecord.json", "UnregisterBrokerRecord.json", 
"UserScramCredentialRecord.json", "ZkMigrationRecord.json"};
+public static void main(String[] args) throws Exception {
+
+try {
+List localContent = new ArrayList<>();
+for(String jsonSchema: filesCheckMetadata) {
+final String dir = System.getProperty("user.dir");
+String path = dir + 
"/metadata/src/main/resources/common/metadata/" + jsonSchema;
+BufferedReader reader = new BufferedReader(new 
FileReader(path));
+for (int i = 0; i < 15; i++) {

Review Comment:
   Addressed in a similar comment below






[jira] [Resolved] (KAFKA-15872) Investigate autocommit retry logic

2024-01-09 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans resolved KAFKA-15872.

Fix Version/s: (was: 3.8.0)
   Resolution: Duplicate

> Investigate autocommit retry logic
> --
>
> Key: KAFKA-15872
> URL: https://issues.apache.org/jira/browse/KAFKA-15872
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor
>
> This is purely an investigation ticket.
> Currently, we send an autocommit only if there isn't an inflight one; 
> however, this logic might not be correct because I think we should:
>  # expire the request if it is not completed in time
>  # always send an autocommit on the clock





Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-09 Thread via GitHub


philipnee commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1446637026


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -175,16 +182,40 @@ public static boolean 
validateRequiredResourceLabels(Map metadat
 }
 
 public static CompressionType 
preferredCompressionType(List acceptedCompressionTypes) {
-// TODO: Support compression in client telemetry.
+if (acceptedCompressionTypes != null && 
!acceptedCompressionTypes.isEmpty()) {
+// Broker is providing the compression types in order of 
preference. Grab the
+// first one.
+return acceptedCompressionTypes.get(0);

Review Comment:
   thanks for the clarification.






[jira] [Resolved] (KAFKA-15455) Add support for OffsetCommit version 9 in consumer

2024-01-09 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans resolved KAFKA-15455.

Fix Version/s: 3.7.0
   (was: 3.8.0)
   Resolution: Fixed

> Add support for OffsetCommit version 9 in consumer
> --
>
> Key: KAFKA-15455
> URL: https://issues.apache.org/jira/browse/KAFKA-15455
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: David Jacot
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.7.0
>
>
> We need to handle the new error codes as specified here:
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/OffsetCommitResponse.json#L46|https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/OffsetCommitRequest.json#L35]





Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-09 Thread via GitHub


apoorvmittal10 commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1446625255


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -175,16 +182,40 @@ public static boolean 
validateRequiredResourceLabels(Map metadat
 }
 
 public static CompressionType 
preferredCompressionType(List acceptedCompressionTypes) {
-// TODO: Support compression in client telemetry.
+if (acceptedCompressionTypes != null && 
!acceptedCompressionTypes.isEmpty()) {
+// Broker is providing the compression types in order of 
preference. Grab the
+// first one.
+return acceptedCompressionTypes.get(0);

Review Comment:
   Thanks for looking at the PR, Philip. Never for the Java client, as we support all 
compression types in the Java client. Below is what the KIP says:
   
   `
   The broker will return a prioritized list of supported compression types in 
the GetTelemetrySubscriptionsResponse.AcceptedCompressionTypes array, the 
client is free to pick any supported compression type but should pick the first 
mutually supported type in the returned list. If the AcceptedCompressionTypes 
array is empty the client must send metrics uncompressed. The default 
compression types list as returned from the broker should be: ZStd, LZ4, GZip, 
Snappy.`
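
For a client that does not support every type, the rule quoted from the KIP ("pick the first mutually supported type", "empty list means uncompressed") could be sketched like this. The enum and method names are hypothetical and deliberately separate from Kafka's `CompressionType`.

```java
import java.util.List;
import java.util.Set;

final class CompressionChoiceSketch {
    /** Hypothetical stand-in for the compression types named in the KIP. */
    enum Compression { NONE, GZIP, SNAPPY, LZ4, ZSTD }

    // Walks the broker's list in priority order and returns the first type the client supports.
    static Compression choose(List<Compression> acceptedByBroker, Set<Compression> supportedByClient) {
        if (acceptedByBroker == null) {
            return Compression.NONE;
        }
        for (Compression candidate : acceptedByBroker) {
            if (supportedByClient.contains(candidate)) {
                return candidate;
            }
        }
        // Empty list or no overlap: send metrics uncompressed.
        return Compression.NONE;
    }
}
```

When the client supports everything, as described for the Java client, this reduces to taking the first element of the list, which is what the PR does.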






Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-09 Thread via GitHub


philipnee commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1446620286


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -175,16 +182,40 @@ public static boolean 
validateRequiredResourceLabels(Map metadat
 }
 
 public static CompressionType 
preferredCompressionType(List acceptedCompressionTypes) {
-// TODO: Support compression in client telemetry.
+if (acceptedCompressionTypes != null && 
!acceptedCompressionTypes.isEmpty()) {
+// Broker is providing the compression types in order of 
preference. Grab the
+// first one.
+return acceptedCompressionTypes.get(0);

Review Comment:
   Out of curiosity - when would we not use the first one?






Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-09 Thread via GitHub


apoorvmittal10 commented on PR #15148:
URL: https://github.com/apache/kafka/pull/15148#issuecomment-1883799949

   Build passed on all environments, with unrelated test failures.





[jira] [Commented] (KAFKA-16082) JBOD: Possible dataloss when moving leader partition

2024-01-09 Thread Proven Provenzano (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804880#comment-17804880
 ] 

Proven Provenzano commented on KAFKA-16082:
---

[~gnarula] added an improvement for the handling of case 3 above: 
https://github.com/apache/kafka/pull/15136

> JBOD: Possible dataloss when moving leader partition
> 
>
> Key: KAFKA-16082
> URL: https://issues.apache.org/jira/browse/KAFKA-16082
> Project: Kafka
>  Issue Type: Bug
>  Components: jbod
>Affects Versions: 3.7.0
>Reporter: Proven Provenzano
>Assignee: Gaurav Narula
>Priority: Blocker
> Fix For: 3.7.0
>
>
> There is a possible dataloss scenario
> when using JBOD,
> when moving the partition leader log from one directory to another on the 
> same broker,
> when after the destination log has caught up to the source log and after the 
> broker has sent an update to the partition assignment
> if the broker accepts and commits a new record for the partition and then the 
> broker restarts and the original partition leader log is lost
> then the destination log would not contain the new record.





[jira] [Comment Edited] (KAFKA-16082) JBOD: Possible dataloss when moving leader partition

2024-01-09 Thread Proven Provenzano (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804879#comment-17804879
 ] 

Proven Provenzano edited comment on KAFKA-16082 at 1/9/24 8:49 PM:
---

For the case of 3:
 
If I understand this correctly, the scenario is that the broker restarts and 
sees that `dir2` is supposed to own `tp0` from the metadata log replay, however 
it doesn't see the log in `dir2` because the failed future replica hasn't been 
renamed and so it will create a new replica for `tp0` in `dir2` and populate it 
with data from other replicas. Can we create a unit test to validate this? It 
may also be possible to reuse the current future replica so long as the broker 
at restart went through a stage where the leader of the partition was moved to 
a different broker. Now it can treat the partition as an out of sync replica 
and do the rename and catch up immediately. Note it cannot do the rename until 
after the partition leadership has been moved away from the broker in case the 
broker again restarts.

 

 



> JBOD: Possible dataloss when moving leader partition
> 
>
> Key: KAFKA-16082
> URL: https://issues.apache.org/jira/browse/KAFKA-16082
> Project: Kafka
>  Issue Type: Bug
>  Components: jbod
>Affects Versions: 3.7.0
>Reporter: Proven Provenzano
>Assignee: Gaurav Narula
>Priority: Blocker
> Fix For: 3.7.0
>
>
> There is a possible data loss scenario
> when using JBOD,
> when moving the partition leader log from one directory to another on the 
> same broker,
> after the destination log has caught up to the source log and after the 
> broker has sent an update to the partition assignment:
> if the broker accepts and commits a new record for the partition and then the 
> broker restarts and the original partition leader log is lost,
> then the destination log would not contain the new record.





Re: [PR] MINOR: Add reviewers GitHub action [kafka]

2024-01-09 Thread via GitHub


mumrah commented on PR #15115:
URL: https://github.com/apache/kafka/pull/15115#issuecomment-1883771689

   > Although that way we won't be able to merge it via browser
   
   That's a non-starter IMO. 
   
   I wonder if we could write our own bot for these kinds of automations. 
https://probot.github.io/ looks interesting.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16082) JBOD: Possible dataloss when moving leader partition

2024-01-09 Thread Proven Provenzano (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804879#comment-17804879
 ] 

Proven Provenzano commented on KAFKA-16082:
---

For the case of 3:
 
If I understand this correctly, the scenario is that the broker restarts and 
sees from the metadata log replay that `dir2` is supposed to own `tp0`. However, 
it doesn't see the log in `dir2` because the failed future replica hasn't been 
renamed, so it will create a new replica for `tp0` in `dir2` and populate it 
with data from other replicas. Can we create a unit test to validate this? It 
may also be possible to reuse the current future replica, so long as the broker 
at restart went through a stage where the leader of the partition was moved to 
a different broker. It can then treat the partition as an out-of-sync replica, 
do the rename, and catch up immediately. Note that it cannot do the rename until 
after the partition leadership has been moved away from the broker, in case the 
broker restarts again.

> JBOD: Possible dataloss when moving leader partition
> 
>
> Key: KAFKA-16082
> URL: https://issues.apache.org/jira/browse/KAFKA-16082
> Project: Kafka
>  Issue Type: Bug
>  Components: jbod
>Affects Versions: 3.7.0
>Reporter: Proven Provenzano
>Assignee: Gaurav Narula
>Priority: Blocker
> Fix For: 3.7.0
>
>
> There is a possible data loss scenario
> when using JBOD,
> when moving the partition leader log from one directory to another on the 
> same broker,
> after the destination log has caught up to the source log and after the 
> broker has sent an update to the partition assignment:
> if the broker accepts and commits a new record for the partition and then the 
> broker restarts and the original partition leader log is lost,
> then the destination log would not contain the new record.





[PR] KAFKA-15853: Move AuthorizerUtils and its dependencies to server module [kafka]

2024-01-09 Thread via GitHub


OmniaGM opened a new pull request, #15167:
URL: https://github.com/apache/kafka/pull/15167

   Blocker for #15103 - Moving AuthorizerUtils and Session into server module
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




Re: [PR] MINOR: Enable kraft test in kafka.api and kafka.network [kafka]

2024-01-09 Thread via GitHub


mimaison commented on code in PR #14595:
URL: https://github.com/apache/kafka/pull/14595#discussion_r1446558104


##
core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala:
##
@@ -35,31 +37,37 @@ class RackAwareAutoTopicCreationTest extends 
KafkaServerTestHarness with RackAwa
   overridingProps.put(KafkaConfig.NumPartitionsProp, numPartitions.toString)
   overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, 
replicationFactor.toString)
 
+
   def generateConfigs =
 (0 until numServers) map { node =>
-  TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown = 
false, rack = Some((node / 2).toString))
+  TestUtils.createBrokerConfig(node, zkConnectOrNull, 
enableControlledShutdown = false, rack = Some((node / 2).toString))
 } map (KafkaConfig.fromProps(_, overridingProps))
 
   private val topic = "topic"
 
-  @Test
-  def testAutoCreateTopic(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk")) // TODO Partition leader is not evenly 
distributed in kraft mode, see KAFKA-15354

Review Comment:
   Do we need to fix KAFKA-15354 first? Or can this ticket be resolved?





Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-09 Thread via GitHub


jolshan commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1446538966


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java:
##
@@ -116,4 +120,21 @@ long appendEndTransactionMarker(
 int coordinatorEpoch,
 TransactionResult result
 ) throws KafkaException;
+
+/**
+ * Verify the transaction.
+ *
+ * @param tp                The partition to write records to.
+ * @param transactionalId   The transactional id.
+ * @param producerId        The producer id.
+ * @param producerEpoch     The producer epoch.
+ * @return A future containing the {@link VerificationGuard} or an exception.

Review Comment:
   There is one case where I think we still throw an error even if the 
partition is already verified. When we look up the partition to see if it needs 
verification, we could throw an error if the partition isn't on the broker. Not 
a huge deal, though, and we probably don't need to include it.
   
   But an alternate description could be something like: it returns any error 
encountered, the verification guard if the partition needed verification, or 
the sentinel if it did not.
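
   The three outcomes described above can be sketched as follows. This is only 
an illustration under stated assumptions, not the PR's code: `Guard`, 
`VerificationOutcomes`, and the two boolean parameters are hypothetical 
stand-ins for the real verification guard type and the broker-side checks.

```java
import java.util.concurrent.CompletableFuture;

public class VerificationOutcomes {
    /** Hypothetical stand-in for a verification guard; SENTINEL means "no verification was needed". */
    public static final class Guard {
        public static final Guard SENTINEL = new Guard();
    }

    /**
     * Returns a future that completes with:
     *  - an exception if the partition lookup fails (assumed here: partition not on this broker),
     *  - a fresh guard if the partition still needs verification,
     *  - the sentinel if it does not.
     */
    public static CompletableFuture<Guard> verify(boolean partitionOnBroker, boolean needsVerification) {
        CompletableFuture<Guard> future = new CompletableFuture<>();
        if (!partitionOnBroker) {
            future.completeExceptionally(new IllegalStateException("partition is not on this broker"));
        } else if (needsVerification) {
            future.complete(new Guard());
        } else {
            future.complete(Guard.SENTINEL);
        }
        return future;
    }
}
```

   A caller can compare the result against `Guard.SENTINEL` to tell "already 
verified" apart from "verification started".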





Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-09 Thread via GitHub


jolshan commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1446536290


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+tp: TopicPartition,
+transactionalId: String,
+producerId: Long,
+producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+val future = new CompletableFuture[VerificationGuard]()
+replicaManager.maybeStartTransactionVerificationForPartition(
+  topicPartition = tp,
+  transactionalId = transactionalId,
+  producerId = producerId,
+  producerEpoch = producerEpoch,
+  baseSequence = RecordBatch.NO_SEQUENCE,
+  requestLocal = RequestLocal.NoCaching,
+  callback = (error, _, verificationGuard) => {
+if (error != Errors.NONE) {
+  future.completeExceptionally(error.exception)
+} else {
+  future.complete(verificationGuard)
+}
+  }
+)
+future
+  }
+
   private def internalAppend(
 tp: TopicPartition,
-memoryRecords: MemoryRecords
+memoryRecords: MemoryRecords,
+verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
   ): Long = {
 var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-replicaManager.appendRecords(
+replicaManager.appendForGroup(
   timeout = 0L,
   requiredAcks = 1,
-  internalTopicsAllowed = true,
-  origin = AppendOrigin.COORDINATOR,
   entriesPerPartition = Map(tp -> memoryRecords),
   responseCallback = results => appendResults = results,
+  requestLocal = RequestLocal.NoCaching,

Review Comment:
   Right right, I remember this now. 
   I'm wondering, though, whether that's OK with the async verification 
callback. Is it the case that we don't evaluate which one to use until we 
execute the callback?
   
   As an aside, did we decide that the callback is OK to execute on the request 
handler thread? I didn't go through and trace the threads being used yet.





[jira] [Created] (KAFKA-16106) group size counters do not reflect the actual sizes when operations fail

2024-01-09 Thread Jeff Kim (Jira)
Jeff Kim created KAFKA-16106:


 Summary: group size counters do not reflect the actual sizes when 
operations fail
 Key: KAFKA-16106
 URL: https://issues.apache.org/jira/browse/KAFKA-16106
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jeff Kim
Assignee: Jeff Kim


An expire-group-metadata operation generates tombstone records, updates the 
`groups` state and decrements group size counters, then performs a write to the 
log. If there is a __consumer_offsets partition reassignment, this operation 
fails. The `groups` state is reverted to an earlier snapshot but classic group 
size counters are not. This introduces an inconsistency between the metrics and 
the actual group sizes. This applies to all unsuccessful write operations that 
alter the `groups` state.

The issue is exacerbated because the expire-group-metadata operation may be 
retried indefinitely.

The solution is to also make the counters a timeline data structure 
(TimelineLong) so that, in the event of a failed write operation, we revert the 
counters as well.
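
A minimal sketch of the idea, assuming a simplified model rather than Kafka's 
actual TimelineLong API: the hypothetical `RevertibleCounter` below records its 
value at each snapshot epoch, so a failed write can restore the counter to the 
same point the `groups` state is reverted to.

```java
import java.util.TreeMap;

/** Hypothetical counter that can be rolled back to a snapshot (illustration only). */
public class RevertibleCounter {
    private long value;
    // Maps a snapshot epoch to the counter value at the time the snapshot was taken.
    private final TreeMap<Long, Long> snapshots = new TreeMap<>();

    public void increment() { value++; }

    public void decrement() { value--; }

    public long get() { return value; }

    /** Remember the current value under the given epoch. */
    public void snapshot(long epoch) {
        snapshots.put(epoch, value);
    }

    /** Restore the value recorded at the epoch and drop any later snapshots. */
    public void revertTo(long epoch) {
        Long saved = snapshots.get(epoch);
        if (saved == null) {
            throw new IllegalArgumentException("No snapshot for epoch " + epoch);
        }
        value = saved;
        snapshots.tailMap(epoch, false).clear();
    }
}
```

With this shape, an operation that decrements the counter and then fails its 
log write would call `revertTo` with the last committed epoch, which keeps the 
metric in step with the reverted state.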





[jira] [Updated] (KAFKA-16100) Consistent handling of timeouts and responses for new consumer ApplicationEvents

2024-01-09 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16100:
--
Fix Version/s: 3.8.0

> Consistent handling of timeouts and responses for new consumer 
> ApplicationEvents
> 
>
> Key: KAFKA-16100
> URL: https://issues.apache.org/jira/browse/KAFKA-16100
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients
>Reporter: Andrew Schofield
>Priority: Major
> Fix For: 3.8.0
>
>
> The handling of timeouts and responses for the various kinds of 
> ApplicationEvents in the new consumer is not consistent. A small amount of 
> refactoring would make the code more maintainable and give consistent 
> behaviour for the different requests.





[jira] [Updated] (KAFKA-16100) Consistent handling of timeouts and responses for new consumer ApplicationEvents

2024-01-09 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16100:
--
Labels: consumer-threading-refactor  (was: )

> Consistent handling of timeouts and responses for new consumer 
> ApplicationEvents
> 
>
> Key: KAFKA-16100
> URL: https://issues.apache.org/jira/browse/KAFKA-16100
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients
>Reporter: Andrew Schofield
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> The handling of timeouts and responses for the various kinds of 
> ApplicationEvents in the new consumer is not consistent. A small amount of 
> refactoring would make the code more maintainable and give consistent 
> behaviour for the different requests.





[jira] [Updated] (KAFKA-16104) Enable additional PlaintextConsumerTest tests for new consumer

2024-01-09 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16104:
--
Fix Version/s: 3.8.0

> Enable additional PlaintextConsumerTest tests for new consumer
> --
>
> Key: KAFKA-16104
> URL: https://issues.apache.org/jira/browse/KAFKA-16104
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Andrew Schofield
>Priority: Minor
> Fix For: 3.8.0
>
>
> It should be possible to enable:
> * testAutoCommitOnClose
> * testAutoCommitOnCloseAfterWakeup
> * testExpandingTopicSubscriptions
> * testShrinkingTopicSubscriptions
> * testMultiConsumerSessionTimeoutOnClose (KAFKA-16011 has been fixed)
> * testAutoCommitOnRebalance
> * testPerPartitionLeadMetricsCleanUpWithSubscribe
> * testPerPartitionLagMetricsCleanUpWithSubscribe
> * testStaticConsumerDetectsNewPartitionCreatedAfterRestart





[jira] [Updated] (KAFKA-16104) Enable additional PlaintextConsumerTest tests for new consumer

2024-01-09 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16104:
--
Labels: consumer-threading-refactor  (was: )

> Enable additional PlaintextConsumerTest tests for new consumer
> --
>
> Key: KAFKA-16104
> URL: https://issues.apache.org/jira/browse/KAFKA-16104
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Andrew Schofield
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> It should be possible to enable:
> * testAutoCommitOnClose
> * testAutoCommitOnCloseAfterWakeup
> * testExpandingTopicSubscriptions
> * testShrinkingTopicSubscriptions
> * testMultiConsumerSessionTimeoutOnClose (KAFKA-16011 has been fixed)
> * testAutoCommitOnRebalance
> * testPerPartitionLeadMetricsCleanUpWithSubscribe
> * testPerPartitionLagMetricsCleanUpWithSubscribe
> * testStaticConsumerDetectsNewPartitionCreatedAfterRestart





Re: [PR] KAFKA-16083: Exclude throttle time when expiring inflight requests on a connection [kafka]

2024-01-09 Thread via GitHub


jolshan commented on PR #15130:
URL: https://github.com/apache/kafka/pull/15130#issuecomment-1883672884

   ^ Those are issues I see frequently and are likely unrelated.




[jira] [Updated] (KAFKA-15557) Investigate FetcherTest's/FetchRequestManager's duplicate metadata update in assignFromUserNoId

2024-01-09 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15557:
--
Fix Version/s: 4.0.0
   (was: 3.8.0)

> Investigate FetcherTest's/FetchRequestManager's duplicate metadata update in 
> assignFromUserNoId
> ---
>
> Key: KAFKA-15557
> URL: https://issues.apache.org/jira/browse/KAFKA-15557
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 4.0.0
>
>
> The unit tests {{FetcherTest}} and {{FetchRequestManagerTest}} have methods 
> named {{assignFromUser()}} and {{assignFromUserNoId()}} that appear to 
> perform duplicate metadata updates:
> {code:java}
> private void assignFromUser(Set<TopicPartition> partitions) {
>     subscriptions.assignFromUser(partitions);
>     client.updateMetadata(initialUpdateResponse);
>     // A dummy metadata update to ensure valid leader epoch.
>     metadata.updateWithCurrentRequestVersion(
>         RequestTestUtils.metadataUpdateWithIds(
>             "dummy",
>             1,
>             Collections.emptyMap(),
>             singletonMap(topicName, 4),
>             tp -> validLeaderEpoch, topicIds
>         ),
>         false,
>         0L
>     );
> }
> {code}
> {{client.updateMetadata()}} eventually calls 
> {{metadata.updateWithCurrentRequestVersion()}}. Determine why the test is 
> updating the cluster metadata twice with different values.





[jira] [Commented] (KAFKA-15250) DefaultBackgroundThread is running tight loop

2024-01-09 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804861#comment-17804861
 ] 

Kirk True commented on KAFKA-15250:
---

This is still an issue. If you enable detailed logging, it writes thousands of 
lines of logging within seconds.

> DefaultBackgroundThread is running tight loop
> -
>
> Key: KAFKA-15250
> URL: https://issues.apache.org/jira/browse/KAFKA-15250
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> The DefaultBackgroundThread is running tight loops and wasting CPU cycles. I 
> think we need to reexamine the timeout passed to networkclientDelegate.poll.





[jira] [Reopened] (KAFKA-15250) DefaultBackgroundThread is running tight loop

2024-01-09 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reopened KAFKA-15250:
---
  Assignee: Kirk True  (was: Philip Nee)

> DefaultBackgroundThread is running tight loop
> -
>
> Key: KAFKA-15250
> URL: https://issues.apache.org/jira/browse/KAFKA-15250
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
>
> The DefaultBackgroundThread is running tight loops and wasting CPU cycles. I 
> think we need to reexamine the timeout passed to networkclientDelegate.poll.





[jira] [Updated] (KAFKA-15250) DefaultBackgroundThread is running tight loop

2024-01-09 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15250:
--
Fix Version/s: 3.8.0

> DefaultBackgroundThread is running tight loop
> -
>
> Key: KAFKA-15250
> URL: https://issues.apache.org/jira/browse/KAFKA-15250
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> The DefaultBackgroundThread is running tight loops and wasting CPU cycles. I 
> think we need to reexamine the timeout passed to networkclientDelegate.poll.





[jira] [Resolved] (KAFKA-15941) Flaky test: shouldRestoreNullRecord() – org.apache.kafka.streams.integration.RestoreIntegrationTest

2024-01-09 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy resolved KAFKA-15941.

Resolution: Cannot Reproduce

> Flaky test: shouldRestoreNullRecord() – 
> org.apache.kafka.streams.integration.RestoreIntegrationTest
> ---
>
> Key: KAFKA-15941
> URL: https://issues.apache.org/jira/browse/KAFKA-15941
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Apoorv Mittal
>Priority: Major
>  Labels: flaky-test
>
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14699/24/tests/
> {code:java}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 6. 
> Did not receive all [KeyValue(2, \x00\x00\x00)] records from topic output 
> (got []) ==> expected: <true> but was: <false>
> Stacktrace
> org.opentest4j.AssertionFailedError: Condition not met within timeout 6. 
> Did not receive all [KeyValue(2, \x00\x00\x00)] records from topic output 
> (got []) ==> expected: <true> but was: <false>
>     at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>     at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>     at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>     at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>     at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210)
>     at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331)
>     at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302)
>     at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:878)
>     at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:827)
>     at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:790)
>     at org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRestoreNullRecord(RestoreIntegrationTest.java:244)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> {code}





[jira] [Closed] (KAFKA-15941) Flaky test: shouldRestoreNullRecord() – org.apache.kafka.streams.integration.RestoreIntegrationTest

2024-01-09 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy closed KAFKA-15941.
--
Assignee: Lucas Brutschy

> Flaky test: shouldRestoreNullRecord() – 
> org.apache.kafka.streams.integration.RestoreIntegrationTest
> ---
>
> Key: KAFKA-15941
> URL: https://issues.apache.org/jira/browse/KAFKA-15941
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Apoorv Mittal
>Assignee: Lucas Brutschy
>Priority: Major
>  Labels: flaky-test
>
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14699/24/tests/
> {code:java}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 6. 
> Did not receive all [KeyValue(2, \x00\x00\x00)] records from topic output 
> (got []) ==> expected: <true> but was: <false>
> Stacktrace
> org.opentest4j.AssertionFailedError: Condition not met within timeout 6. 
> Did not receive all [KeyValue(2, \x00\x00\x00)] records from topic output 
> (got []) ==> expected: <true> but was: <false>
>     at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>     at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>     at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>     at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>     at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210)
>     at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331)
>     at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302)
>     at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:878)
>     at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:827)
>     at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:790)
>     at org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRestoreNullRecord(RestoreIntegrationTest.java:244)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> {code}





[jira] [Commented] (KAFKA-15941) Flaky test: shouldRestoreNullRecord() – org.apache.kafka.streams.integration.RestoreIntegrationTest

2024-01-09 Thread Lucas Brutschy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804856#comment-17804856
 ] 

Lucas Brutschy commented on KAFKA-15941:


Test hasn't failed in the last month so I'm closing this

 

https://ge.apache.org/scans/tests?search.names=Git%20branch&search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=Europe%2FBerlin&search.values=trunk&tests.container=org.apache.kafka.streams.integration.RestoreIntegrationTest&tests.sortField=FLAKY&tests.test=shouldRestoreNullRecord()

> Flaky test: shouldRestoreNullRecord() – 
> org.apache.kafka.streams.integration.RestoreIntegrationTest
> ---
>
> Key: KAFKA-15941
> URL: https://issues.apache.org/jira/browse/KAFKA-15941
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Apoorv Mittal
>Priority: Major
>  Labels: flaky-test
>
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14699/24/tests/
> {code:java}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 6. 
> Did not receive all [KeyValue(2, \x00\x00\x00)] records from topic output 
> (got []) ==> expected: <true> but was: <false>
> Stacktrace
> org.opentest4j.AssertionFailedError: Condition not met within timeout 6. 
> Did not receive all [KeyValue(2, \x00\x00\x00)] records from topic output 
> (got []) ==> expected: <true> but was: <false>
>     at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>     at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>     at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>     at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>     at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210)
>     at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331)
>     at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302)
>     at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:878)
>     at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:827)
>     at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:790)
>     at org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRestoreNullRecord(RestoreIntegrationTest.java:244)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> {code}





Re: [PR] KAFKA-16093: Fix spurious REST-related warnings on Connect startup [kafka]

2024-01-09 Thread via GitHub


C0urante commented on code in PR #15149:
URL: https://github.com/apache/kafka/pull/15149#discussion_r1446506255


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java:
##
@@ -226,20 +235,20 @@ protected final void initializeResources() {
 if (adminListeners == null) {
 log.info("Adding admin resources to main listener");
 adminResourceConfig = resourceConfig;
-Collection adminResources = adminResources();
-resources.addAll(adminResources);
+Collection> adminResources = adminResources();
 adminResources.forEach(adminResourceConfig::register);
 configureAdminResources(adminResourceConfig);
 } else if (adminListeners.size() > 0) {
 // TODO: we need to check if these listeners are same as 
'listeners'
 // TODO: the following code assumes that they are different
 log.info("Adding admin resources to admin listener");
 adminResourceConfig = new ResourceConfig();
+adminResourceConfig.register(requestTimeout.binder());
 adminResourceConfig.register(new JacksonJsonProvider());
-Collection adminResources = adminResources();
-resources.addAll(adminResources);
+Collection> adminResources = adminResources();
 adminResources.forEach(adminResourceConfig::register);
 adminResourceConfig.register(ConnectExceptionMapper.class);
+
adminResourceConfig.property(ServerProperties.WADL_FEATURE_DISABLE, true);

Review Comment:
   Good call, done. I've also cleaned up the configuration of the 
`adminResourceConfig` to hopefully prevent other kinds of duplication-related 
issues in the future; LMKWYT



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-09 Thread via GitHub


mumrah commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1446495993


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1409,6 +1426,77 @@ class KafkaApis(val requestChannel: RequestChannel,
   ))
   }
 
+  def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): Unit = {

Review Comment:
   Can you move this code into a new class? KafkaApis is already much too large.



##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1409,6 +1426,77 @@ class KafkaApis(val requestChannel: RequestChannel,
   ))
   }
 
+  def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): Unit = {
+    metadataCache match {
+      case _: ZkMetadataCache =>
+        throw new InvalidRequestException("ZK cluster does not handle DescribeTopicPartitions request")
+      case _ =>
+    }
+    val KRaftMetadataCache = metadataCache.asInstanceOf[KRaftMetadataCache]
+
+    val describeTopicPartitionsRequest = request.body[DescribeTopicPartitionsRequest].data()
+    var topics = scala.collection.mutable.Set[String]()
+    describeTopicPartitionsRequest.topics().forEach(topic => topics.add(topic.name()))
+
+    val cursor = describeTopicPartitionsRequest.cursor()
+    val fetchAllTopics = topics.isEmpty
+    if (fetchAllTopics) {
+      metadataCache.getAllTopics().foreach(topic => topics.add(topic))

Review Comment:
   If we're paginating through all topics and have a cursor, we can gather 
only the desired topics during this O(n) loop through all topics. That would 
let us avoid another O(n) operation below to filter out the undesired topics.
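
   A minimal sketch of that single-pass idea, in plain Java rather than the 
Scala of the PR. It assumes the cursor identifies a topic name and that 
pagination proceeds in lexicographic topic order; `CursorTopicFilter` and 
`topicsFromCursor` are hypothetical names for illustration, not part of the PR:

```java
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical helper, not taken from the PR under review.
final class CursorTopicFilter {
    // Walks all topic names once and keeps only those at or after the cursor.
    // A null cursor means "start from the first topic".
    // Assumption: topics are paginated in lexicographic order of their names.
    static SortedSet<String> topicsFromCursor(Iterable<String> allTopics, String cursorTopic) {
        SortedSet<String> selected = new TreeSet<>();
        for (String topic : allTopics) {
            if (cursorTopic == null || topic.compareTo(cursorTopic) >= 0) {
                selected.add(topic);
            }
        }
        return selected;
    }
}
```

   Filtering while collecting means the topics before the cursor never enter 
the working set, so no second pass is needed to drop them.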






Re: [PR] KAFKA-16093: Fix spurious REST-related warnings on Connect startup [kafka]

2024-01-09 Thread via GitHub


C0urante commented on code in PR #15149:
URL: https://github.com/apache/kafka/pull/15149#discussion_r1446490147


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java:
##
@@ -159,7 +163,8 @@ public class ConnectorsResourceTest {
 public void setUp() throws NoSuchMethodException {
 when(serverConfig.topicTrackingEnabled()).thenReturn(true);
 when(serverConfig.topicTrackingResetEnabled()).thenReturn(true);
-connectorsResource = new ConnectorsResource(herder, serverConfig, restClient);
+RestRequestTimeout requestTimeout = () -> RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS;

Review Comment:
   Good catch, thanks 👍






Re: [PR] KAFKA-15468 [1/2]: Prevent transaction coordinator reloads on already loaded leaders [kafka]

2024-01-09 Thread via GitHub


jolshan commented on code in PR #15139:
URL: https://github.com/apache/kafka/pull/15139#discussion_r1446488616


##
metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java:
##
@@ -27,21 +27,24 @@
 
 public final class LocalReplicaChanges {
 private final Set deletes;
-private final Map leaders;
+private final Map electedLeaders;
+private final Map updatedLeaders;

Review Comment:
   I included these comments btw:
   ```
*   1. partitions for which the broker is not a replica anymore
*   2. partitions for which the broker is now a leader (leader epoch bump on the leader)
*   3. partitions for which the isr or replicas change if the broker is a leader (partition epoch bump on the leader)
*   4. partitions for which the broker is now a follower or follower with isr or replica updates (partition epoch bump on follower)
   ``` 
   
   I really struggled with the name "leaders" when I was reading the code, 
which is why I didn't want to leave it blank. But I will think on it again and 
update the comments to be even more clear.






Re: [PR] KAFKA-14683 Migrate #testStartPaused to Mockito [kafka]

2024-01-09 Thread via GitHub


gharris1727 commented on PR #14663:
URL: https://github.com/apache/kafka/pull/14663#issuecomment-1883616431

   Hi @hgeraldino Thanks for taking on the migration!
   
   I understand the idea behind your refactor-then-deduplicate strategy, but I 
think the excessive duplication is making it difficult (at least for me) to 
review the change.
   
   What do you think about starting a new test class, and moving the migrated 
tests into that new class? This would allow you to use synonymous variable 
names, annotation mocks, and method names. At the end we can delete the 
original class and move the new class back to the original class name.
   
   This will separate the added and removed parts in the diff, where they are 
currently inline. But the mocking libraries are so substantially different that 
the inline parts of the diff are not very helpful anyway.





[jira] [Commented] (KAFKA-15475) Timeout request might retry forever even if the user API times out in PrototypeAsyncConsumer

2024-01-09 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804846#comment-17804846
 ] 

Lianet Magrans commented on KAFKA-15475:


Heads up: the TopicMetadataManager and CommitRequestManager have already solved 
this in a similar way. It still needs to be verified/fixed in other requests, if 
applicable.

> Timeout request might retry forever even if the user API times out in 
> PrototypeAsyncConsumer
> 
>
> Key: KAFKA-15475
> URL: https://issues.apache.org/jira/browse/KAFKA-15475
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-preview
> Fix For: 3.8.0
>
>
> If the request times out in the background thread, it will be completed with 
> TimeoutException, which is Retriable.  In the TopicMetadataRequestManager and 
> possibly other managers, the request might continue to be retried forever.
>  
> There are two ways to fix this
>  # Pass a timer to the manager to remove the inflight requests when it is 
> expired.
>  # Pass the future to the application layer and continue to retry.





[jira] [Assigned] (KAFKA-15588) Purge the unsent offset commits/fetches when the member is fenced/failed

2024-01-09 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans reassigned KAFKA-15588:
--

Assignee: Lianet Magrans  (was: Philip Nee)

> Purge the unsent offset commits/fetches when the member is fenced/failed
> 
>
> Key: KAFKA-15588
> URL: https://issues.apache.org/jira/browse/KAFKA-15588
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> When the member is fenced/failed, we should purge the inflight offset commits 
> and fetches.  HeartbeatRequestManager should be able to handle this





[jira] [Resolved] (KAFKA-16099) Handle timeouts for AsyncKafkaConsumer.commitSync

2024-01-09 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans resolved KAFKA-16099.

Fix Version/s: 3.7.0
   Resolution: Fixed

> Handle timeouts for AsyncKafkaConsumer.commitSync
> -
>
> Key: KAFKA-16099
> URL: https://issues.apache.org/jira/browse/KAFKA-16099
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients
>Reporter: Andrew Schofield
>Priority: Major
> Fix For: 3.7.0
>
>
> The handling of synchronous offset commits in the background thread does not 
> observe the caller's timeout. In the situation that a commit request needs to 
> be retried, the retries should not extend beyond the caller's timeout. The 
> CommitApplicationEvent should contain the timeout and not continue beyond 
> that time.
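
The requirement above (retries must stop at the caller's timeout) can be 
sketched roughly as follows. This is only an illustration under stated 
assumptions, not the AsyncKafkaConsumer implementation: `DeadlineBoundedRetry`, 
`retryUntil` and the fixed backoff are hypothetical, and the real code passes 
the timeout through an application event rather than blocking a thread.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical sketch; the names here do not come from the Kafka code base.
final class DeadlineBoundedRetry {
    // Retries `attempt` until it reports success or the caller's timeout elapses.
    // Returns true on success, false if the deadline passed or the thread was interrupted.
    static boolean retryUntil(BooleanSupplier attempt, long timeoutMs, long backoffMs) {
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            if (attempt.getAsBoolean()) {
                return true;
            }
            long remainingNanos = deadlineNanos - System.nanoTime();
            if (remainingNanos <= 0) {
                return false; // the caller's timeout wins over further retries
            }
            // Never back off past the deadline.
            long sleepMs = Math.min(backoffMs, TimeUnit.NANOSECONDS.toMillis(remainingNanos));
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

The deadline is computed once, up front, so the total time spent stays bounded 
by the caller's timeout however many retries happen.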





[PR] KAFKA-15853: Move ProcessRole to server module [kafka]

2024-01-09 Thread via GitHub


OmniaGM opened a new pull request, #15166:
URL: https://github.com/apache/kafka/pull/15166

   prepare to move KafkaConfig
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-16105) Reassignment of tiered topics is failing due to RemoteStorageException

2024-01-09 Thread Anatolii Popov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anatolii Popov updated KAFKA-16105:
---
Description: 
When partition reassignment is happening for a tiered topic, in most cases it 
gets stuck with RemoteStorageExceptions on follower nodes saying that they 
cannot construct the remote log auxiliary state:

 
{code:java}
[2024-01-09 08:34:02,899] ERROR [ReplicaFetcher replicaId=7, leaderId=6, fetcherId=2] Error building remote log auxiliary state for test-24 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.server.log.remote.storage.RemoteStorageException: Couldn't build the state from remote store for partition: test-24, currentLeaderEpoch: 8, leaderLocalLogStartOffset: 209, leaderLogStartOffset: 0, epoch: 0 as the previous remote log segment metadata was not found
    at kafka.server.ReplicaFetcherTierStateMachine.buildRemoteLogAuxState(ReplicaFetcherTierStateMachine.java:259)
    at kafka.server.ReplicaFetcherTierStateMachine.start(ReplicaFetcherTierStateMachine.java:106)
    at kafka.server.AbstractFetcherThread.handleOffsetsMovedToTieredStorage(AbstractFetcherThread.scala:762)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:413)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:332)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:331)
    at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
    at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:407)
    at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:403)
    at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:321)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:331)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
    at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
    at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:130)
 {code}
 

Scenario:

A cluster of 3 nodes with a single topic with 30 partitions. All partitions 
have tiered segments.
Adding 3 more nodes to the cluster and making a reassignment to move all the 
data to new nodes.
Behavior:
For most of the partitions reassignment is happening smoothly.
For some of the partitions when a new node starts to get assignments it reads 
__remote_log_metadata topic and tries to initialize the metadata cache on 
records with COPY_SEGMENT_STARTED. If it's reading such a message for the 
partition before the partition was assigned to this specific node it ignores 
the message, so skips the cache initialization and marks the partition as 
assigned. So reassignment is stuck since COPY_SEGMENT_STARTED is never properly 
processed.

Expected behavior:
The partitions should not be marked as assigned before the cache is initialized 
to be able to re-read COPY_SEGMENT_STARTED message and initialize the cache.

 

Some notes:
This is most probably happening when there are messages in a single metadata 
partition and the order of the messages does not correspond to the order of 
assignment. So the follower reads the COPY_SEGMENT_STARTED message, sees that 
the user partition is not yet assigned to this node, skips the message, and 
marks the user partition as assigned. On the next iteration, it resets to 
beginning ONLY 

[jira] [Created] (KAFKA-16105) Reassignment of tiered topics is failing due to RemoteStorageException

2024-01-09 Thread Anatolii Popov (Jira)
Anatolii Popov created KAFKA-16105:
--

 Summary: Reassignment of tiered topics is failing due to 
RemoteStorageException
 Key: KAFKA-16105
 URL: https://issues.apache.org/jira/browse/KAFKA-16105
 Project: Kafka
  Issue Type: Bug
  Components: Tiered-Storage
Reporter: Anatolii Popov


When partition reassignment is happening for a tiered topic, in most cases it 
gets stuck with RemoteStorageExceptions on follower nodes saying that they 
cannot construct the remote log auxiliary state:

 
{code:java}
[2024-01-09 08:34:02,899] ERROR [ReplicaFetcher replicaId=7, leaderId=6, fetcherId=2] Error building remote log auxiliary state for test-24 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.server.log.remote.storage.RemoteStorageException: Couldn't build the state from remote store for partition: test-24, currentLeaderEpoch: 8, leaderLocalLogStartOffset: 209, leaderLogStartOffset: 0, epoch: 0 as the previous remote log segment metadata was not found
    at kafka.server.ReplicaFetcherTierStateMachine.buildRemoteLogAuxState(ReplicaFetcherTierStateMachine.java:259)
    at kafka.server.ReplicaFetcherTierStateMachine.start(ReplicaFetcherTierStateMachine.java:106)
    at kafka.server.AbstractFetcherThread.handleOffsetsMovedToTieredStorage(AbstractFetcherThread.scala:762)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:413)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:332)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:331)
    at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
    at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:407)
    at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:403)
    at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:321)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:331)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
    at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
    at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:130)
 {code}
 

Scenario:

A cluster of 3 nodes with a single topic with 30 partitions. All partitions 
have tiered segments.
Adding 3 more nodes to the cluster and making a reassignment to move all the 
data to new nodes.
Behavior:
For most of the partitions reassignment is happening smoothly.
For some of the partitions when a new node starts to get assignments it reads 
__remote_log_metadata topic and tries to initialize the metadata cache on 
records with COPY_SEGMENT_STARTED. If it's reading such a message for the 
partition before the partition was assigned to this specific node it ignores 
the message, so skips the cache initialization and marks the partition as 
assigned. So reassignment is stuck since COPY_SEGMENT_STARTED is never properly 
processed.

Expected behavior:
The partitions should not be marked as assigned before the cache is initialized 
to be able to re-read COPY_SEGMENT_STARTED message and initialize the cache.

 

Some notes:
This is most probably happening when there are messages in a single metadata 
partition and the order of the messages does not correspond to the order of 
assignment. So the follower reads the COPY_SEGMENT_STARTED m

[PR] Reassignment fix [kafka]

2024-01-09 Thread via GitHub


AnatolyPopov opened a new pull request, #15165:
URL: https://github.com/apache/kafka/pull/15165

   When partition reassignment is happening for a tiered topic, in most cases 
it gets stuck with RemoteStorageExceptions on follower nodes saying that they 
cannot construct the remote log auxiliary state:
   
   ```
   [2024-01-09 08:34:02,899] ERROR [ReplicaFetcher replicaId=7, leaderId=6, fetcherId=2] Error building remote log auxiliary state for test-24 (kafka.server.ReplicaFetcherThread)
   org.apache.kafka.server.log.remote.storage.RemoteStorageException: Couldn't build the state from remote store for partition: test-24, currentLeaderEpoch: 8, leaderLocalLogStartOffset: 209, leaderLogStartOffset: 0, epoch: 0 as the previous remote log segment metadata was not found
       at kafka.server.ReplicaFetcherTierStateMachine.buildRemoteLogAuxState(ReplicaFetcherTierStateMachine.java:259)
       at kafka.server.ReplicaFetcherTierStateMachine.start(ReplicaFetcherTierStateMachine.java:106)
       at kafka.server.AbstractFetcherThread.handleOffsetsMovedToTieredStorage(AbstractFetcherThread.scala:762)
       at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:413)
       at scala.Option.foreach(Option.scala:437)
       at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:332)
       at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:331)
       at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
       at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:407)
       at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:403)
       at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:321)
       at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:331)
       at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
       at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
       at scala.Option.foreach(Option.scala:437)
       at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
       at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
       at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
       at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:130)
   ```

   
   
   
   
   Scenario:
   
   A cluster of 3 nodes with a single topic with 30 partitions. All partitions 
have tiered segments.
   Adding 3 more nodes to the cluster and making a reassignment to move all the 
data to new nodes.
   Behavior:
   For most of the partitions reassignment is happening smoothly.
   For some of the partitions when a new node starts to get assignments it 
reads __remote_log_metadata topic and tries to initialize the metadata cache on 
records with COPY_SEGMENT_STARTED. If it's reading such a message for the 
partition before the partition was assigned to this specific node it ignores 
the message, so skips the cache initialization and marks the partition as 
assigned. So reassignment is stuck since COPY_SEGMENT_STARTED is never properly 
processed.
   
   Expected behavior:
   The partitions should not be marked as assigned before the cache is 
initialized, to be able to re-read COPY_SEGMENT_STARTED message and initialize 
the cache.
   
   
   
   
   Some notes:
   This is most probably happening when there are messages in a single metadata 
partition and the order of the messages does not correspond to the order of 
assignment. So the follower reads the COPY_SEGMENT_STARTED message, sees that 
the user partition is not yet assigned to this node, skips the message, and 
marks the user partition as assigned. On the next iteration, it resets to 
beginning ONLY t

[PR] KAFKA-15853: Move PasswordEncoder to server module [kafka]

2024-01-09 Thread via GitHub


OmniaGM opened a new pull request, #15164:
URL: https://github.com/apache/kafka/pull/15164

   blocked on #15158 - Tests will fail 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15721: KRaft support in DeleteTopicsRequestWithDeletionDisabledTest [kafka]

2024-01-09 Thread via GitHub


jolshan commented on PR #15124:
URL: https://github.com/apache/kafka/pull/15124#issuecomment-1883534800

   Sorry I don't seem to get notified for tags until the PR gets merged. I need 
to look at my notification settings 😅 





[jira] [Updated] (KAFKA-15284) Implement ConsumerGroupProtocolVersionResolver to determine consumer group protocol

2024-01-09 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15284:
--
Fix Version/s: 4.0.0
   (was: 3.8.0)

> Implement ConsumerGroupProtocolVersionResolver to determine consumer group 
> protocol
> ---
>
> Key: KAFKA-15284
> URL: https://issues.apache.org/jira/browse/KAFKA-15284
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 4.0.0
>
>
> At client initialization, we need to determine which of the 
> {{ConsumerDelegate}} implementations to use:
>  # {{LegacyKafkaConsumerDelegate}}
>  # {{AsyncKafkaConsumerDelegate}}
> There are conditions defined by KIP-848 that determine client eligibility to 
> use the new protocol. This will be modeled by the (deep breath) 
> {{ConsumerGroupProtocolVersionResolver}}.
> Known tasks:
>  * Determine at what point in the {{Consumer}} initialization the network 
> communication should happen
>  * Determine what RPCs to invoke in order to determine eligibility (API 
> versions, IBP version, etc.)
>  * Implement the network client lifecycle (startup, communication, shutdown, 
> etc.)
>  * Determine the fallback path in case the client is not eligible to use the 
> protocol





[PR] KAFKA-16097: Add suspended tasks back to the state updater when reassigned [kafka]

2024-01-09 Thread via GitHub


lucasbru opened a new pull request, #15163:
URL: https://github.com/apache/kafka/pull/15163

   When a partition is revoked, the corresponding task gets a pending action
   "SUSPEND". This pending action may overwrite a previous pending action.
   
   If the task was previously removed from the state updater, e.g. because
   we were fenced, the pending action is overwritten with SUSPEND, and in
   handleAssigned, upon reassignment of that task, the SUSPEND action is
   removed.
   
   Then, once the state updater executes the removal, no pending action
   is registered anymore, and we run into an IllegalStateException.
   
   This commit solves the problem by adding back reassigned tasks to the
   state updater, since they may have been removed from the state updater
   for another reason than being restored completely.
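
   The sequence above can be replayed with a toy model. This is a sketch under 
heavy assumptions, not the Streams task manager: `PendingActionSketch`, its 
method names and the `CLOSE`/`ADD_BACK` actions are hypothetical, and the 
`addBack` flag only stands in for "add the reassigned task back to the state 
updater".

```java
import java.util.HashMap;
import java.util.Map;

// Toy model with hypothetical names; it only mirrors the sequence in the description.
final class PendingActionSketch {
    enum Action { CLOSE, SUSPEND, ADD_BACK }

    // At most one pending action per task, so a later put() overwrites an earlier one.
    private final Map<String, Action> pending = new HashMap<>();

    // The task is being removed from the state updater, e.g. after fencing.
    void removalRequested(String taskId) {
        pending.put(taskId, Action.CLOSE);
    }

    // The partition is revoked: SUSPEND replaces whatever was pending before.
    void revoked(String taskId) {
        pending.put(taskId, Action.SUSPEND);
    }

    // The task is reassigned. Without addBack the SUSPEND entry is simply dropped;
    // with addBack it is replaced by an action that returns the task to the updater.
    void reassigned(String taskId, boolean addBack) {
        if (addBack) {
            pending.put(taskId, Action.ADD_BACK);
        } else {
            pending.remove(taskId, Action.SUSPEND);
        }
    }

    // The state updater finished removing the task and expects a pending action.
    Action removalExecuted(String taskId) {
        Action action = pending.remove(taskId);
        if (action == null) {
            throw new IllegalStateException("No pending action for removed task " + taskId);
        }
        return action;
    }
}
```

   Calling `removalRequested`, `revoked`, `reassigned(id, false)` and then 
`removalExecuted` throws in this model, while `reassigned(id, true)` leaves an 
action for the removal to pick up.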
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-14404) Fix & update docs on client configs controlled by Streams

2024-01-09 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari updated KAFKA-14404:

External issue URL:   (was: https://github.com/apache/kafka/pull/15162)

> Fix & update docs on client configs controlled by Streams
> -
>
> Key: KAFKA-14404
> URL: https://issues.apache.org/jira/browse/KAFKA-14404
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Ayoub Omari
>Priority: Major
>  Labels: docs, newbie
>
> There are a handful of client configs that can't be set by Streams users for 
> various reasons, such as the group id, but we seem to have missed a few of 
> them in the documentation 
> [here|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]:
>  the partitioner assignor (Consumer) and partitioner (Producer).
> This section of the docs also just needs to be cleaned up in general as there 
> is overlap between the [Default 
> Values|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#default-values]
>  and [Parameters controlled by Kafka 
> Streams|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]
>  sections, and the table of contents is messed up presumably due to an issue 
> with the section headers.
> We should separate these with one section covering (only) configs where 
> Streams sets a different default but this can still be overridden by the 
> user, and the other section covering the configs that Streams hardcodes and 
> users can never override.





[jira] [Updated] (KAFKA-14404) Fix & update docs on client configs controlled by Streams

2024-01-09 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari updated KAFKA-14404:

External issue URL: https://github.com/apache/kafka/pull/15162

> Fix & update docs on client configs controlled by Streams
> -
>
> Key: KAFKA-14404
> URL: https://issues.apache.org/jira/browse/KAFKA-14404
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Ayoub Omari
>Priority: Major
>  Labels: docs, newbie
>
> There are a handful of client configs that can't be set by Streams users for 
> various reasons, such as the group id, but we seem to have missed a few of 
> them in the documentation 
> [here|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]:
>  the partitioner assignor (Consumer) and partitioner (Producer).
> This section of the docs also just needs to be cleaned up in general as there 
> is overlap between the [Default 
> Values|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#default-values]
>  and [Parameters controlled by Kafka 
> Streams|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]
>  sections, and the table of contents is messed up presumably due to an issue 
> with the section headers.
> We should separate these with one section covering (only) configs where 
> Streams sets a different default but this can still be overridden by the 
> user, and the other section covering the configs that Streams hardcodes and 
> users can never override.




