[jira] [Commented] (KAFKA-15610) Fix `CoreUtils.swallow()` test gaps

2023-10-18 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1044#comment-1044
 ] 

Ismael Juma commented on KAFKA-15610:
-

Yes.

> Fix `CoreUtils.swallow()` test gaps
> ---
>
> Key: KAFKA-15610
> URL: https://issues.apache.org/jira/browse/KAFKA-15610
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Priority: Minor
>  Labels: newbie
>
> For example, it should verify that the passed in `logging` is used in case of 
> an exception. We found that there is no test for this in 
> https://github.com/apache/kafka/pull/14529#discussion_r1355277747.
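To illustrate the gap: such a test would hand `swallow` a mock logger, throw from
the action, and verify the logger is actually used. A minimal sketch in Java (the
real utility is Scala's `CoreUtils.swallow`; the `swallow` helper, its signature,
and the test names below are assumptions for illustration, not the actual code):

```
import org.slf4j.Logger;
import static org.mockito.Mockito.*;

public class SwallowLoggingTest {

    // Hypothetical Java analogue of CoreUtils.swallow: run the action and
    // log (rather than rethrow) anything it throws.
    static void swallow(Runnable action, Logger logging) {
        try {
            action.run();
        } catch (Throwable t) {
            logging.warn("swallowed exception", t);
        }
    }

    @org.junit.jupiter.api.Test
    void swallowUsesSuppliedLoggerOnException() {
        Logger logging = mock(Logger.class);
        swallow(() -> { throw new RuntimeException("boom"); }, logging);
        // The assertion the ticket asks for: the passed-in logger is used.
        verify(logging).warn(anyString(), any(Throwable.class));
    }
}
```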



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15610) Fix `CoreUtils.swallow()` test gaps

2023-10-18 Thread Atul Sharma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1038#comment-1038
 ] 

Atul Sharma commented on KAFKA-15610:
-

Hi [~ijuma], can I take this?


> Fix `CoreUtils.swallow()` test gaps
> ---
>
> Key: KAFKA-15610
> URL: https://issues.apache.org/jira/browse/KAFKA-15610
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Priority: Minor
>  Labels: newbie
>
> For example, it should verify that the passed in `logging` is used in case of 
> an exception. We found that there is no test for this in 
> https://github.com/apache/kafka/pull/14529#discussion_r1355277747.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15643) Improve unloading logging

2023-10-18 Thread David Jacot (Jira)
David Jacot created KAFKA-15643:
---

 Summary: Improve unloading logging
 Key: KAFKA-15643
 URL: https://issues.apache.org/jira/browse/KAFKA-15643
 Project: Kafka
  Issue Type: Sub-task
Reporter: David Jacot


When a new leader is elected for a __consumer_offsets partition, the followers 
are notified to unload the state. However, only the former leader actually has 
state to unload; the remaining followers print the following error:
ERROR [GroupCoordinator id=1] Execution of 
UnloadCoordinator(tp=__consumer_offsets-1, epoch=0) failed due to This is not 
the correct coordinator.. 
(org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime)
The error is technically correct, but we should improve the logging so that 
nothing is printed in the remaining-follower case.
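One way to achieve that (a hedged sketch of the pattern only; the method and
where it would hook into CoordinatorRuntime are assumptions, not the actual fix):

```
// Quieter failure handling on the unload path: a NotCoordinatorException
// from a broker that never hosted the state is expected, so demote it to
// DEBUG; everything else stays at ERROR.
private void logUnloadFailure(String event, Throwable t) {
    if (t instanceof NotCoordinatorException) {
        log.debug("Execution of {} failed because this broker is not the coordinator; ignoring.", event);
    } else {
        log.error("Execution of {} failed due to {}.", event, t.getMessage());
    }
}
```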



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]

2023-10-18 Thread via GitHub


mjsax commented on code in PR #14575:
URL: https://github.com/apache/kafka/pull/14575#discussion_r1364913876


##
clients/src/main/java/org/apache/kafka/common/telemetry/collector/MetricsCollector.java:
##
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.collector;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.telemetry.emitter.Emitter;
+
+/**
+ * A {@code MetricsCollector} is responsible for scraping a source of metrics 
and forwarding
+ * them to the given {@link Emitter}. For example, a given collector might be 
used to collect
+ * system metrics, Kafka metrics, JVM metrics, or other metrics that are to be 
captured, exposed,
+ * and/or forwarded.
+ *
+ * 

Review Comment:
   Should just be `<p>` (similar below)



##
clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryPayload.java:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.telemetry;

Review Comment:
   What just comes to my mind: the KIP should specify where new public 
interfaces are added. It's missing. Can we add it?



##
clients/src/main/java/org/apache/kafka/common/telemetry/collector/MetricsCollector.java:
##
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.collector;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.telemetry.emitter.Emitter;
+
+/**
+ * A {@code MetricsCollector} is responsible for scraping a source of metrics 
and forwarding
+ * them to the given {@link Emitter}. For example, a given collector might be 
used to collect
+ * system metrics, Kafka metrics, JVM metrics, or other metrics that are to be 
captured, exposed,
+ * and/or forwarded.
+ *
+ * 
+ *
+ * In general, a {@code MetricsCollector} implementation is closely managed by 
another entity
+ * (that entity is colloquially referred to as the "telemetry reporter") that 
will be in
+ * charge of its lifecycle via the {@link #start()} and {@link #stop()} 
methods. The telemetry
+ * reporter should ensure that the {@link #start()} method is invoked once 
and only once
+ * before calls to {@link #collect(Emitter)} are made. Implementations of 
{@code MetricsCollector}
+ * should allow for the corner-case that {@link #stop()} is called before 
{@link #start()},
+ * which might happen in the case of error on startup of the telemetry 
reporter.
+ *
+ * 
+ *
+ * Regarding threading, the {@link #start()} and {@link #stop()} methods may 
be called from
+ * different threads and so proper 
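A collector honoring the lifecycle contract described above (start-once, stop
safe even before start, calls possibly from different threads) could look
roughly like the following sketch; only `start()`, `stop()`, and
`collect(Emitter)` come from the quoted interface, the rest is illustrative:

```
import org.apache.kafka.common.telemetry.collector.MetricsCollector;
import org.apache.kafka.common.telemetry.emitter.Emitter;

import java.util.concurrent.atomic.AtomicBoolean;

public class JvmMetricsCollector implements MetricsCollector {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    @Override
    public void start() {
        // Take effect exactly once, and never after stop(): covers the
        // corner case of stop() arriving first on reporter startup error.
        if (!stopped.get() && started.compareAndSet(false, true)) {
            // allocate whatever the collector needs here
        }
    }

    @Override
    public void stop() {
        stopped.set(true); // safe even if start() was never called
    }

    @Override
    public void collect(Emitter emitter) {
        if (!started.get() || stopped.get())
            return;
        // scrape JVM metrics and forward them via the emitter
    }
}
```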

[jira] [Updated] (KAFKA-15629) proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery

2023-10-18 Thread Hanyu Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hanyu Zheng updated KAFKA-15629:

Summary: proposal to introduce IQv2 Query Types: TimestampedKeyQuery and 
TimestampedRangeQuery  (was: proposal to introduce IQv2 Query Types: 
TimeStampedKeyQuery and TimeStampedRangeQuery)

> proposal to introduce IQv2 Query Types: TimestampedKeyQuery and 
> TimestampedRangeQuery
> -
>
> Key: KAFKA-15629
> URL: https://issues.apache.org/jira/browse/KAFKA-15629
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: kip
>
> KIP-992: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-992%3A+Proposal+to+introduce+IQv2+Query+Types%3A+TimestampKeyQuery+and+TimestampRangeQuery]
> In the current IQv2 code, there are noticeable differences when interfacing 
> with plain-kv-store and ts-kv-store. Notably, the return type V acts as a 
> simple value for plain-kv-store but evolves into ValueAndTimestamp for 
> ts-kv-store, which presents type safety issues in the API.
> Even if IQv2 hasn't gained widespread adoption, an immediate fix might bring 
> compatibility concerns.
> This brings us to the essence of our proposal: the introduction of distinct 
> query types. One that returns a plain value, another for values accompanied 
> by timestamps.
> While querying a ts-kv-store for a plain value and then extracting it is 
> feasible, it doesn't make sense to query a plain-kv-store for a 
> ValueAndTimestamp.
> Our vision is for plain-kv-store to always return V, while ts-kv-store should 
> return ValueAndTimestamp.
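Concretely, the proposal changes call sites roughly like this (a sketch inside a
method body; `TimestampedKeyQuery` is the API proposed by KIP-992, and the store
name is made up):

```
// Today: querying a ts-kv-store with KeyQuery forces the caller to spell
// out that V is really ValueAndTimestamp<Long>.
StateQueryRequest<ValueAndTimestamp<Long>> before =
    StateQueryRequest.inStore("ts-store")
        .withQuery(KeyQuery.<String, ValueAndTimestamp<Long>>withKey("k"));

// Proposed: a distinct query type makes the timestamped result explicit,
// so plain KeyQuery can always return a plain V.
StateQueryRequest<ValueAndTimestamp<Long>> after =
    StateQueryRequest.inStore("ts-store")
        .withQuery(TimestampedKeyQuery.<String, Long>withKey("k"));
```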



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15629) proposal to introduce IQv2 Query Types: TimeStampedKeyQuery and TimeStampedRangeQuery

2023-10-18 Thread Hanyu Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hanyu Zheng updated KAFKA-15629:

Summary: proposal to introduce IQv2 Query Types: TimeStampedKeyQuery and 
TimeStampedRangeQuery  (was: proposal to introduce IQv2 Query Types: 
TimeStampKeyQuery and TimeStampRangeQuery)

> proposal to introduce IQv2 Query Types: TimeStampedKeyQuery and 
> TimeStampedRangeQuery
> -
>
> Key: KAFKA-15629
> URL: https://issues.apache.org/jira/browse/KAFKA-15629
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: kip
>
> KIP-992: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-992%3A+Proposal+to+introduce+IQv2+Query+Types%3A+TimestampKeyQuery+and+TimestampRangeQuery]
> In the current IQv2 code, there are noticeable differences when interfacing 
> with plain-kv-store and ts-kv-store. Notably, the return type V acts as a 
> simple value for plain-kv-store but evolves into ValueAndTimestamp for 
> ts-kv-store, which presents type safety issues in the API.
> Even if IQv2 hasn't gained widespread adoption, an immediate fix might bring 
> compatibility concerns.
> This brings us to the essence of our proposal: the introduction of distinct 
> query types. One that returns a plain value, another for values accompanied 
> by timestamps.
> While querying a ts-kv-store for a plain value and then extracting it is 
> feasible, it doesn't make sense to query a plain-kv-store for a 
> ValueAndTimestamp.
> Our vision is for plain-kv-store to always return V, while ts-kv-store should 
> return ValueAndTimestamp.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-15578: Migrating other system tests to use the group coordinator [kafka]

2023-10-18 Thread via GitHub


rreddy-22 opened a new pull request, #14582:
URL: https://github.com/apache/kafka/pull/14582

   Adding the "use new coordinator" flag to all the decorators in all the 
existing tests that use consumer groups.
   The new group coordinator cannot be used with ZooKeeper, so that combination 
will not run.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]

2023-10-18 Thread via GitHub


mjsax commented on code in PR #14575:
URL: https://github.com/apache/kafka/pull/14575#discussion_r1364873194


##
clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetrySender.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.telemetry;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.AbstractRequest.Builder;
+
+import java.util.Optional;
+
+/**
+ * A {@link MetricsReporter} may implement this interface to indicate support 
for sending client
+ * telemetry to the broker.
+ */
+@InterfaceStability.Evolving
+public interface ClientTelemetrySender extends AutoCloseable {

Review Comment:
   We can still add it to the KIP. (And must, as it's a public interface, 
right?)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]

2023-10-18 Thread via GitHub


mjsax commented on code in PR #14575:
URL: https://github.com/apache/kafka/pull/14575#discussion_r1364874333


##
clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryReceiver.java:
##
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.telemetry;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+
+@InterfaceStability.Evolving
+public interface ClientTelemetryReceiver {

Review Comment:
   If it's for the broker only, it should not be added to the `clients` module, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]

2023-10-18 Thread via GitHub


mjsax commented on code in PR #14575:
URL: https://github.com/apache/kafka/pull/14575#discussion_r1364873194


##
clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetrySender.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.telemetry;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.AbstractRequest.Builder;
+
+import java.util.Optional;
+
+/**
+ * A {@link MetricsReporter} may implement this interface to indicate support 
for sending client
+ * telemetry to the broker.
+ */
+@InterfaceStability.Evolving
+public interface ClientTelemetrySender extends AutoCloseable {

Review Comment:
   We can still add it to the KIP.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15582: Identify clean shutdown broker [kafka]

2023-10-18 Thread via GitHub


CalvinConfluent commented on PR #14465:
URL: https://github.com/apache/kafka/pull/14465#issuecomment-1769903140

   @junrao Thanks for the review, no relevant tests failed. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15582: Identify clean shutdown broker [kafka]

2023-10-18 Thread via GitHub


CalvinConfluent commented on code in PR #14465:
URL: https://github.com/apache/kafka/pull/14465#discussion_r1364872040


##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -327,6 +328,10 @@ public ControllerResult<BrokerRegistrationReply> 
registerBroker(
 }
 int brokerId = request.brokerId();
 BrokerRegistration existing = brokerRegistrations.get(brokerId);
+if (version < 2 || existing == null || request.previousBrokerEpoch() 
!= existing.epoch()) {

Review Comment:
   Sounds good. If the version is < 2, we can skip handling it because ELR 
can't be enabled. We can adjust it later. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15616) Define client telemetry states and their transitions

2023-10-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15616:

Fix Version/s: 3.7.0

> Define client telemetry states and their transitions
> 
>
> Key: KAFKA-15616
> URL: https://issues.apache.org/jira/browse/KAFKA-15616
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
> Fix For: 3.7.0
>
>
> A client emitting metrics to the broker needs to maintain state that 
> specifies what action the client should take next, i.e. request 
> subscriptions, push telemetry, etc.
>  
> The changes should include a comprehensive definition of all states a client 
> can move into and their transitions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15616: Client telemetry states and transition (KIP-714) [kafka]

2023-10-18 Thread via GitHub


mjsax merged PR #14566:
URL: https://github.com/apache/kafka/pull/14566


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15616: Client telemetry states and transition (KIP-714) [kafka]

2023-10-18 Thread via GitHub


mjsax commented on PR #14566:
URL: https://github.com/apache/kafka/pull/14566#issuecomment-1769879629

   Again a lot of flaky test noise:
   ```
   Build / JDK 11 and Scala 2.13 / 
testFollowerCompleteDelayedFetchesOnReplication(String).quorum=kraft – 
integration.kafka.server.FetchFromFollowerIntegrationTest
   7s
   Build / JDK 11 and Scala 2.13 / 
testDeleteCmdNonEmptyGroup(String).quorum=kraft – 
kafka.admin.DeleteConsumerGroupsTest
   10s
   Build / JDK 11 and Scala 2.13 / 
shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2] – 
org.apache.kafka.streams.integration.EosIntegrationTest
   1m 14s
   Build / JDK 11 and Scala 2.13 / shouldUpgradeFromEosAlphaToEosV2[true] – 
org.apache.kafka.streams.integration.EosV2UpgradeIntegrationTest
   4m 21s
   Build / JDK 11 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – 
org.apache.kafka.trogdor.coordinator.CoordinatorTest
   2m 0s
   Build / JDK 21 and Scala 2.13 / testRackAwareRangeAssignor(String).quorum=zk 
– integration.kafka.server.FetchFromFollowerIntegrationTest
   38s
   Build / JDK 8 and Scala 2.12 / testSyncTopicConfigs() – 
org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest
   2m 15s
   Build / JDK 8 and Scala 2.12 / 
testRackAwareRangeAssignor(String).quorum=kraft – 
integration.kafka.server.FetchFromFollowerIntegrationTest
   16s
   Build / JDK 8 and Scala 2.12 / testRackAwareRangeAssignor(String).quorum=zk 
– integration.kafka.server.FetchFromFollowerIntegrationTest
   41s
   Build / JDK 8 and Scala 2.12 / 
shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once] – 
org.apache.kafka.streams.integration.EosIntegrationTest
   1m 18s
   Build / JDK 8 and Scala 2.12 / [2] Type=Raft-Isolated, 
Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.7-IV0, 
Security=PLAINTEXT – org.apache.kafka.tools.MetadataQuorumCommandTest
   1m 7s
   Build / JDK 17 and Scala 2.13 / testRackAwareRangeAssignor(String).quorum=zk 
– integration.kafka.server.FetchFromFollowerIntegrationTest
   46s
   Build / JDK 17 and Scala 2.13 / 
testDescribeClusterRequestIncludingClusterAuthorizedOperations(String).quorum=kraft
 – kafka.server.DescribeClusterRequestTest
   4s
   Build / JDK 17 and Scala 2.13 / testFenceMultipleBrokers() – 
org.apache.kafka.controller.QuorumControllerTest
   1m 48s
   Build / JDK 17 and Scala 2.13 / testBalancePartitionLeaders() – 
org.apache.kafka.controller.QuorumControllerTest
   15s
   Build / JDK 17 and Scala 2.13 / 
shouldWorkWithUncleanShutdownWipeOutStateStore[exactly_once_v2] – 
org.apache.kafka.streams.integration.EOSUncleanShutdownIntegrationTest
   34s
   Build / JDK 17 and Scala 2.13 / shouldUpgradeFromEosAlphaToEosV2[true] – 
org.apache.kafka.streams.integration.EosV2UpgradeIntegrationTest
   4m 39s
   Build / JDK 17 and Scala 2.13 / [2] Type=Raft-Isolated, 
Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.7-IV0, 
Security=PLAINTEXT – org.apache.kafka.tools.MetadataQuorumCommandTest
   1m 8s
   Build / JDK 17 and Scala 2.13 / [5] Type=Raft-Combined, 
Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.7-IV0, 
Security=PLAINTEXT – org.apache.kafka.tools.MetadataQuorumCommandTest
   1m 7s
   Build / JDK 17 and Scala 2.13 / [2] Type=Raft-Isolated, 
Name=testDescribeQuorumStatusSuccessful, MetadataVersion=3.7-IV0, 
Security=PLAINTEXT – org.apache.kafka.tools.MetadataQuorumCommandTest
   1m 8s
   Build / JDK 17 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – 
org.apache.kafka.trogdor.coordinator.CoordinatorTest
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15616: Client telemetry states and transition (KIP-714) [kafka]

2023-10-18 Thread via GitHub


mjsax commented on code in PR #14566:
URL: https://github.com/apache/kafka/pull/14566#discussion_r1364865721


##
clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryState.java:
##
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.telemetry;
+
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * State that helps determine where client exists in the telemetry state i.e. 
subscribe->wait->push loop.
+ */
+public enum ClientTelemetryState {
+
+/**
+ * Client needs subscription from the broker.
+ */
+SUBSCRIPTION_NEEDED,
+
+/**
+ * Network I/O is in progress to retrieve subscription.
+ */
+SUBSCRIPTION_IN_PROGRESS,
+
+/**
+ * Awaiting telemetry interval for pushing metrics to broker.
+ */
+PUSH_NEEDED,

Review Comment:
   I agree with Apoorv. It's just a question of how you read the state: 
`PUSH_NEEDED` is the permanent state in steady state; it does not mean 
"PUSH_NEEDED_NOW".
   
   Not sure if we could find a better name? It's an internal state machine, so 
we can rename it at any point.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-18 Thread via GitHub


philipnee commented on PR #14532:
URL: https://github.com/apache/kafka/pull/14532#issuecomment-1769876947

   Hi @cadonna Thank you for spending time reviewing the PR again. I have 
answered your comments and attached a follow-up ticket and a minor PR: 
https://github.com/apache/kafka/pull/14581
   
   That PR is based on this one, so the commit history looks a bit messy; once 
we merge this PR, I will rebase it.
   
   Let me know if you have more questions about this PR; I'd love to address them.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Simplify UnsentRequest constructor [kafka]

2023-10-18 Thread via GitHub


philipnee opened a new pull request, #14581:
URL: https://github.com/apache/kafka/pull/14581

   Per: https://github.com/apache/kafka/pull/14532
   
   The constructor that takes a BiConsumer seems confusing. Instead, I provide 
a whenComplete() method here to chain the user-provided callback.
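The pattern being described, as a rough sketch (names are illustrative, not the
actual `UnsentRequest` code):

```
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

public class UnsentRequestSketch<T> {
    private final CompletableFuture<T> future = new CompletableFuture<>();

    // Instead of taking a BiConsumer in the constructor, chain the
    // caller's callback onto the request's future after construction.
    public UnsentRequestSketch<T> whenComplete(BiConsumer<? super T, ? super Throwable> callback) {
        future.whenComplete(callback);
        return this; // fluent: request.whenComplete((resp, err) -> ...)
    }

    void complete(T response) { future.complete(response); }
    void fail(Throwable t) { future.completeExceptionally(t); }
}
```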


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15626: Replace verification guard object with an specific type [kafka]

2023-10-18 Thread via GitHub


ijuma commented on code in PR #14568:
URL: https://github.com/apache/kafka/pull/14568#discussion_r1364857293


##
storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationGuard.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class VerificationGuard {
+
+// The sentinel VerificationGuard will be used as a default when no 
verification guard is provided.
+// It can not be used to verify a transaction is ongoing and its value is 
always 0.
+public static final VerificationGuard SENTINEL = new VerificationGuard(0);
+private static final AtomicLong INCREMENTING_ID = new AtomicLong(0L);
+private final long value;
+
+public VerificationGuard() {
+value = INCREMENTING_ID.incrementAndGet();
+}
+
+private VerificationGuard(long value) {
+this.value = value;
+}
+
+@Override
+public String toString() {
+return "VerificationGuard: " + value;
+}
+
+@Override
+public boolean equals(Object obj) {
+if ((null == obj) || (obj.getClass() != this.getClass()))
+return false;
+VerificationGuard guard = (VerificationGuard) obj;
+return value == guard.value();
+}
+
+@Override
+public int hashCode() {
+return Long.hashCode(value);
+}
+
+private long value() {
+return value;
+}
+
+public boolean verifiedBy(VerificationGuard verifyingGuard) {

Review Comment:
   Nit: would `verify` be a better method name? It's in the active voice which 
is generally better for methods.
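For context, the elided method body presumably follows the sentinel comment in
the quoted code (the sentinel can never verify anything); a hedged
reconstruction, not the verbatim PR code:

```
// Guessed semantics: a guard is verified only by an equal, non-sentinel
// guard; SENTINEL (value 0) never matches anything.
public boolean verifiedBy(VerificationGuard verifyingGuard) {
    return verifyingGuard != SENTINEL && verifyingGuard.equals(this);
}
```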



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15620) Duplicate remote log DELETE_SEGMENT metadata is generated when there are multiple leader epochs in the segment

2023-10-18 Thread Henry Cai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1004#comment-1004
 ] 

Henry Cai commented on KAFKA-15620:
---

Yes, it's a non-blocking warning in the log. But it also prints the stack trace 
in the log.

It will give people an uneasy feeling if they don't understand the root cause 
of the error.

This warning/stack-trace will show up every time a broker restarts, and for 
every broker in the cluster. And this kind of warning message will occur for 
more and more segments the longer the cluster runs.

> Duplicate remote log DELETE_SEGMENT metadata is generated when there are 
> multiple leader epochs in the segment
> --
>
> Key: KAFKA-15620
> URL: https://issues.apache.org/jira/browse/KAFKA-15620
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 3.6.0
>Reporter: Henry Cai
>Priority: Major
> Fix For: 3.7.0, 3.6.1
>
>
> Use the newly released 3.6.0, turn on tiered storage feature: 
> {*}remote.log.storage.system.enable{*}=true
> 1. Set up topic tier5 to be remote storage enabled.  Adding some data to the 
> topic and the data is copied to remote storage.  After a few days when the 
> log segment is removed from remote storage due to log retention expiration, 
> noticed the following warnings in the server log:
> [2023-10-16 22:19:10,971] DEBUG Updating remote log segment metadata: 
> [RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}
> , customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, 
> eventTimestampMs=1697005926358, brokerId=1043}] 
> (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache)
> [2023-10-16 22:19:10,971] WARN Error occurred while updating the remote log 
> segment. 
> (org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore)
> org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: 
> No remote log segment metadata found for 
> :RemoteLogSegmentId\{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id=YFNCaWjPQFSKCngQ1QcKpA}
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache.updateRemoteLogSegmentMetadata(RemoteLogMetadataCache.java:161)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.handleRemoteLogSegmentMetadataUpdate(RemotePartitionMetadataStore.java:89)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataEventHandler.handleRemoteLogMetadata(RemotePartitionMetadataEventHandler.java:33)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.processConsumerRecord(ConsumerTask.java:157)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.run(ConsumerTask.java:133)
>         at java.base/java.lang.Thread.run(Thread.java:829)
>  
> 2. After some debugging, realized the problem is *there are 2 sets of 
> DELETE_SEGMENT_STARTED/FINISHED pairs* in the remote metadata topic for this 
> segment.  The DELETE_SEGMENT_FINISHED in the first set remove the segment 
> from the metadata cache and this caused the above exception when the 
> DELETE_SEGMENT_STARTED from the second set needs to be processed.
>  
> 3. And traced the log to where the log retention kicked in and saw *there 
> were two delete log segment generated* at that time:
> ```
> [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 
> partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment 
> RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach 
> based on the largest record timestamp in the segment 
> (kafka.log.remote.RemoteLogManager$RLMTask)
> [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 
> partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment 
> RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach 
> based on the largest record timestamp in the segment 
> (kafka.log.remote.RemoteLogManager$RLMTask)
> ```
> 4. And dumped out the content of the original COPY_SEGMENT_STARTED for this 
> segment (which triggers the generation of the later DELETE_SEGMENT metadata):
> [2023-10-16 22:19:10,894] DEBUG Adding to in-progress state: 
> [RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}
> , startOffset=6387830, endOffset=9578660, brokerId=1043, 
> maxTimestampMs=1696401123036, eventTimestampMs=1696401127062, 
> 

Re: [PR] KAFKA-15626: Replace verification guard object with an specific type [kafka]

2023-10-18 Thread via GitHub


ijuma commented on code in PR #14568:
URL: https://github.com/apache/kafka/pull/14568#discussion_r1364857293


##
storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationGuard.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class VerificationGuard {
+
+// The sentinel VerificationGuard will be used as a default when no 
verification guard is provided.
+// It can not be used to verify a transaction is ongoing and its value is 
always 0.
+public static final VerificationGuard SENTINEL = new VerificationGuard(0);
+private static final AtomicLong INCREMENTING_ID = new AtomicLong(0L);
+private final long value;
+
+public VerificationGuard() {
+value = INCREMENTING_ID.incrementAndGet();
+}
+
+private VerificationGuard(long value) {
+this.value = value;
+}
+
+@Override
+public String toString() {
+return "VerificationGuard: " + value;
+}
+
+@Override
+public boolean equals(Object obj) {
+if ((null == obj) || (obj.getClass() != this.getClass()))
+return false;
+VerificationGuard guard = (VerificationGuard) obj;
+return value == guard.value();
+}
+
+@Override
+public int hashCode() {
+return Long.hashCode(value);
+}
+
+private long value() {
+return value;
+}
+
+public boolean verifiedBy(VerificationGuard verifyingGuard) {

Review Comment:
   Nit: would `verify` be a better method name? It's in the active voice, which 
is generally better for methods.



##
storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationGuard.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class VerificationGuard {

Review Comment:
   Nit: should this be final?



##
storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationGuard.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class VerificationGuard {
+
+// The sentinel VerificationGuard will be used as a default when no 
verification guard is provided.
+// It can not be used to verify a transaction is ongoing and its value is 
always 0.
+public static final VerificationGuard SENTINEL = new VerificationGuard(0);
+private static final AtomicLong INCREMENTING_ID = new 

Re: [PR] KAFKA-15616: Client telemetry states and transition (KIP-714) [kafka]

2023-10-18 Thread via GitHub


mjsax commented on code in PR #14566:
URL: https://github.com/apache/kafka/pull/14566#discussion_r1364857199


##
clients/src/main/java/org/apache/kafka/common/errors/IllegalClientTelemetryStateException.java:
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+import org.apache.kafka.common.KafkaException;
+
+public class IllegalClientTelemetryStateException extends KafkaException {

Review Comment:
   Thanks! (What Philip said: we could add a new exception, but if users are 
supposed to handle it, it must be part of the KIP -- we could of course still 
add it to the KIP) -- but avoiding it and using `IllegalStateException` seems 
better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15616: Client telemetry states and transition (KIP-714) [kafka]

2023-10-18 Thread via GitHub


mjsax commented on code in PR #14566:
URL: https://github.com/apache/kafka/pull/14566#discussion_r1364856191


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryState.java:
##
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.errors.IllegalClientTelemetryStateException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * State that helps determine where client exists in the telemetry state i.e. 
subscribe->wait->push loop.
+ */
+public enum ClientTelemetryState {
+
+/**
+ * Client needs subscription from the broker.
+ */
+SUBSCRIPTION_NEEDED,
+
+/**
+ * Network I/O is in progress to retrieve subscription.
+ */
+SUBSCRIPTION_IN_PROGRESS,
+
+/**
+ * Awaiting telemetry interval for pushing metrics to broker.
+ */
+PUSH_NEEDED,
+
+/**
+ * Network I/O in progress for pushing metrics payload.
+ */
+PUSH_IN_PROGRESS,
+
+/**
+ * Need to push the terminal metrics payload.
+ */
+TERMINATING_PUSH_NEEDED,
+
+/**
+ * Network I/O in progress for pushing terminal metrics payload.
+ */
+TERMINATING_PUSH_IN_PROGRESS,
+
+/**
+ * No more work should be performed, telemetry client terminated.
+ */
+TERMINATED;
+
+private final static Map<ClientTelemetryState, List<ClientTelemetryState>> 
VALID_NEXT_STATES = new EnumMap<>(ClientTelemetryState.class);
+
+static {
+/*
+ If clients needs a subscription, then issue telemetry API to fetch 
subscription from broker.
+
+ However, it's still possible that client doesn't get very far before 
terminating.
+*/
+VALID_NEXT_STATES.put(
+SUBSCRIPTION_NEEDED, Arrays.asList(SUBSCRIPTION_IN_PROGRESS, 
TERMINATED));
+
+/*
+ If client is finished waiting for subscription, then client is ready 
to push the telemetry.
+ But, it's possible that no telemetry metrics are requested, hence 
client should go back to
+ subscription needed state i.e. requesting the next updated 
subscription.
+
+ However, it's still possible that client doesn't get very far before 
terminating.
+*/
+VALID_NEXT_STATES.put(SUBSCRIPTION_IN_PROGRESS, 
Arrays.asList(PUSH_NEEDED,
+SUBSCRIPTION_NEEDED, TERMINATING_PUSH_NEEDED, TERMINATED));

Review Comment:
   Thanks for explaining. My thought was that we would still transition to 
`TERMINATING_PUSH_NEEDED`, but potentially "skip" this state and do an 
immediate second transition to `TERMINATED` -- but if it's easier to transition 
to `TERMINATED` directly, that's also fine with me. (Just want to understand 
what's going on.)
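For reference, the transition table under discussion reduces to a single
validation helper; a condensed, self-contained sketch of the pattern (fewer
states than the real enum, and "skipping" a state as discussed above would
simply be two consecutive transitions):

```
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

public enum TelemetryStateSketch {
    SUBSCRIPTION_NEEDED, SUBSCRIPTION_IN_PROGRESS, PUSH_NEEDED, TERMINATED;

    private static final Map<TelemetryStateSketch, List<TelemetryStateSketch>> VALID_NEXT_STATES =
        new EnumMap<>(TelemetryStateSketch.class);

    static {
        // Condensed version of the PR's table.
        VALID_NEXT_STATES.put(SUBSCRIPTION_NEEDED,
            Arrays.asList(SUBSCRIPTION_IN_PROGRESS, TERMINATED));
        VALID_NEXT_STATES.put(SUBSCRIPTION_IN_PROGRESS,
            Arrays.asList(PUSH_NEEDED, SUBSCRIPTION_NEEDED, TERMINATED));
        VALID_NEXT_STATES.put(PUSH_NEEDED,
            Arrays.asList(SUBSCRIPTION_NEEDED, TERMINATED));
        VALID_NEXT_STATES.put(TERMINATED, Collections.emptyList());
    }

    // Uses IllegalStateException per the discussion above, rather than a
    // new public exception type.
    public TelemetryStateSketch validateTransition(TelemetryStateSketch next) {
        if (!VALID_NEXT_STATES.get(this).contains(next))
            throw new IllegalStateException("Invalid transition " + this + " -> " + next);
        return next;
    }
}
```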



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15481: Fix concurrency bug in RemoteIndexCache [kafka]

2023-10-18 Thread via GitHub


iit2009060 commented on code in PR #14483:
URL: https://github.com/apache/kafka/pull/14483#discussion_r1364855206


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -672,16 +673,84 @@ class RemoteIndexCacheTest {
 verifyFetchIndexInvocation(count = 1)
   }
 
+  @Test
+  def testConcurrentRemoveReadForCache(): Unit = {
+// Create a spy Cache Entry
+val rlsMetadata = new 
RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), 
baseOffset, lastOffset,
+  time.milliseconds(), brokerId, time.milliseconds(), segmentSize, 
Collections.singletonMap(0, 0L))
+
+val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, new 
File(tpDir, DIR_NAME)))
+val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, new 
File(tpDir, DIR_NAME)))
+val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, new 
File(tpDir, DIR_NAME)))
+
+val spyEntry = spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, 
txIndex))
+cache.internalCache.put(rlsMetadata.remoteLogSegmentId().id(), spyEntry)
+
+assertCacheSize(1)
+
+var entry: RemoteIndexCache.Entry = null
+
+val latchForCacheRead = new CountDownLatch(1)
+val latchForCacheRemove = new CountDownLatch(1)
+val latchForTestWait = new CountDownLatch(1)
+
+var markForCleanupCallCount = 0
+
+doAnswer((invocation: InvocationOnMock) => {
+  markForCleanupCallCount += 1
+
+  if (markForCleanupCallCount == 1) {
+// Signal the CacheRead to unblock itself
+latchForCacheRead.countDown()
+// Wait for signal to start renaming the files
+latchForCacheRemove.await()
+// Calling the markForCleanup() actual method to start renaming the 
files
+invocation.callRealMethod()

Review Comment:
   @jeel2420  There are two places where markForCleanup is called:
   1. Through the `remove` function which we are calling in the `removeCache` 
Runnable.
   2. At `invocation.callRealMethod()` (line 708).
   
   Even when I ran your test case locally, it always asserts cacheSize 0, as 
the entry is eventually deleted.
   IMO we should read and remove concurrently in separate threads and validate 
the cacheSize based on the order of execution; we should not need to call it 
explicitly for this scenario.
   @showuon  WDYT?
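The structure being suggested, sketched in Java inside a test method (the cache
calls and names are illustrative; the real test is Scala):

```
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Run read and remove in separate threads; the latch only pins down the
// interleaving under test, and the assertion runs after both finish.
ExecutorService pool = Executors.newFixedThreadPool(2);
CountDownLatch removeStarted = new CountDownLatch(1);

pool.submit(() -> {                    // remove path
    removeStarted.countDown();
    cache.remove(segmentId);           // markForCleanup happens inside
});
pool.submit(() -> {                    // concurrent read path
    removeStarted.await();
    return cache.getIndexEntry(rlsMetadata);
});

pool.shutdown();
// After both tasks complete: assertCacheSize(0) regardless of the
// interleaving, and markForCleanup must have run exactly once.
```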
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15481: Fix concurrency bug in RemoteIndexCache [kafka]

2023-10-18 Thread via GitHub


iit2009060 commented on code in PR #14483:
URL: https://github.com/apache/kafka/pull/14483#discussion_r1364855206


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -672,16 +673,84 @@ class RemoteIndexCacheTest {
 verifyFetchIndexInvocation(count = 1)
   }
 
+  @Test
+  def testConcurrentRemoveReadForCache(): Unit = {
+// Create a spy Cache Entry
+val rlsMetadata = new 
RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), 
baseOffset, lastOffset,
+  time.milliseconds(), brokerId, time.milliseconds(), segmentSize, 
Collections.singletonMap(0, 0L))
+
+val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, new 
File(tpDir, DIR_NAME)))
+val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, new 
File(tpDir, DIR_NAME)))
+val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, new 
File(tpDir, DIR_NAME)))
+
+val spyEntry = spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, 
txIndex))
+cache.internalCache.put(rlsMetadata.remoteLogSegmentId().id(), spyEntry)
+
+assertCacheSize(1)
+
+var entry: RemoteIndexCache.Entry = null
+
+val latchForCacheRead = new CountDownLatch(1)
+val latchForCacheRemove = new CountDownLatch(1)
+val latchForTestWait = new CountDownLatch(1)
+
+var markForCleanupCallCount = 0
+
+doAnswer((invocation: InvocationOnMock) => {
+  markForCleanupCallCount += 1
+
+  if (markForCleanupCallCount == 1) {
+// Signal the CacheRead to unblock itself
+latchForCacheRead.countDown()
+// Wait for signal to start renaming the files
+latchForCacheRemove.await()
+// Calling the markForCleanup() actual method to start renaming the 
files
+invocation.callRealMethod()

Review Comment:
   @jeel2420  There are two places where markForCleanup is called:
   1. The `remove` function which we are calling in the `removeCache` Runnable.
   2. At `invocation.callRealMethod()` (line 708).
   
   Even when I ran your test case locally, it always asserts cacheSize 0, as 
the entry is eventually deleted.
   IMO we should read and remove concurrently in separate threads and validate 
the cacheSize based on the order of execution; we should not need to call it 
explicitly for this scenario.
   @showuon  WDYT?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15616: Client telemetry states and transition (KIP-714) [kafka]

2023-10-18 Thread via GitHub


mjsax commented on code in PR #14566:
URL: https://github.com/apache/kafka/pull/14566#discussion_r1364855062


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryState.java:
##
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.errors.IllegalClientTelemetryStateException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * State that helps determine where client exists in the telemetry state i.e. 
subscribe->wait->push loop.
+ */
+public enum ClientTelemetryState {

Review Comment:
   Sounds good. Thx.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15620) Duplicate remote log DELETE_SEGMENT metadata is generated when there are multiple leader epochs in the segment

2023-10-18 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17776999#comment-17776999
 ] 

Kamal Chandraprakash commented on KAFKA-15620:
--

[~h...@pinterest.com]

What was the impact of this bug? Going by the code, it will log one warning 
statement and proceed further. 

> Duplicate remote log DELETE_SEGMENT metadata is generated when there are 
> multiple leader epochs in the segment
> --
>
> Key: KAFKA-15620
> URL: https://issues.apache.org/jira/browse/KAFKA-15620
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 3.6.0
>Reporter: Henry Cai
>Priority: Major
> Fix For: 3.7.0, 3.6.1
>
>
> Use the newly released 3.6.0, turn on tiered storage feature: 
> {*}remote.log.storage.system.enable{*}=true
> 1. Set up topic tier5 to be remote storage enabled.  Adding some data to the 
> topic and the data is copied to remote storage.  After a few days when the 
> log segment is removed from remote storage due to log retention expiration, 
> noticed the following warnings in the server log:
> [2023-10-16 22:19:10,971] DEBUG Updating remote log segment metadata: 
> [RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}
> , customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, 
> eventTimestampMs=1697005926358, brokerId=1043}] 
> (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache)
> [2023-10-16 22:19:10,971] WARN Error occurred while updating the remote log 
> segment. 
> (org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore)
> org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: 
> No remote log segment metadata found for 
> :RemoteLogSegmentId\{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id=YFNCaWjPQFSKCngQ1QcKpA}
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache.updateRemoteLogSegmentMetadata(RemoteLogMetadataCache.java:161)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.handleRemoteLogSegmentMetadataUpdate(RemotePartitionMetadataStore.java:89)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataEventHandler.handleRemoteLogMetadata(RemotePartitionMetadataEventHandler.java:33)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.processConsumerRecord(ConsumerTask.java:157)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.run(ConsumerTask.java:133)
>         at java.base/java.lang.Thread.run(Thread.java:829)
>  
> 2. After some debugging, realized the problem is *there are 2 sets of 
> DELETE_SEGMENT_STARTED/FINISHED pairs* in the remote metadata topic for this 
> segment.  The DELETE_SEGMENT_FINISHED in the first set removes the segment 
> from the metadata cache and this caused the above exception when the 
> DELETE_SEGMENT_STARTED from the second set needs to be processed.
>  
> 3. I traced the log to where the log retention kicked in and saw *there 
> were two delete-log-segment events generated* at that time:
> ```
> [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 
> partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment 
> RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach 
> based on the largest record timestamp in the segment 
> (kafka.log.remote.RemoteLogManager$RLMTask)
> [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 
> partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment 
> RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach 
> based on the largest record timestamp in the segment 
> (kafka.log.remote.RemoteLogManager$RLMTask)
> ```
> 4. And dumped out the content of the original COPY_SEGMENT_STARTED for this 
> segment (which triggers the generation of the later DELETE_SEGMENT metadata):
> [2023-10-16 22:19:10,894] DEBUG Adding to in-progress state: 
> [RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}
> , startOffset=6387830, endOffset=9578660, brokerId=1043, 
> maxTimestampMs=1696401123036, eventTimestampMs=1696401127062, 
> segmentLeaderEpochs=\{5=6387830, 6=6721329}, segmentSizeInBytes=511987531, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}] 
> (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache)
>  
> You can see there are 2 

[jira] [Commented] (KAFKA-15620) Duplicate remote log DELETE_SEGMENT metadata is generated when there are multiple leader epochs in the segment

2023-10-18 Thread Henry Cai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17776996#comment-17776996
 ] 

Henry Cai commented on KAFKA-15620:
---

Thanks.  Will give it a try.

> Duplicate remote log DELETE_SEGMENT metadata is generated when there are 
> multiple leader epochs in the segment
> --
>
> Key: KAFKA-15620
> URL: https://issues.apache.org/jira/browse/KAFKA-15620
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 3.6.0
>Reporter: Henry Cai
>Priority: Major
> Fix For: 3.7.0, 3.6.1
>
>
> Use the newly released 3.6.0, turn on tiered storage feature: 
> {*}remote.log.storage.system.enable{*}=true
> 1. Set up topic tier5 to be remote storage enabled.  Adding some data to the 
> topic and the data is copied to remote storage.  After a few days when the 
> log segment is removed from remote storage due to log retention expiration, 
> noticed the following warnings in the server log:
> [2023-10-16 22:19:10,971] DEBUG Updating remote log segment metadata: 
> [RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}
> , customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, 
> eventTimestampMs=1697005926358, brokerId=1043}] 
> (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache)
> [2023-10-16 22:19:10,971] WARN Error occurred while updating the remote log 
> segment. 
> (org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore)
> org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: 
> No remote log segment metadata found for 
> :RemoteLogSegmentId\{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id=YFNCaWjPQFSKCngQ1QcKpA}
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache.updateRemoteLogSegmentMetadata(RemoteLogMetadataCache.java:161)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.handleRemoteLogSegmentMetadataUpdate(RemotePartitionMetadataStore.java:89)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataEventHandler.handleRemoteLogMetadata(RemotePartitionMetadataEventHandler.java:33)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.processConsumerRecord(ConsumerTask.java:157)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.run(ConsumerTask.java:133)
>         at java.base/java.lang.Thread.run(Thread.java:829)
>  
> 2. After some debugging, realized the problem is *there are 2 sets of 
> DELETE_SEGMENT_STARTED/FINISHED pairs* in the remote metadata topic for this 
> segment.  The DELETE_SEGMENT_FINISHED in the first set removes the segment 
> from the metadata cache and this caused the above exception when the 
> DELETE_SEGMENT_STARTED from the second set needs to be processed.
>  
> 3. I traced the log to where the log retention kicked in and saw *there 
> were two delete-log-segment events generated* at that time:
> ```
> [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 
> partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment 
> RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach 
> based on the largest record timestamp in the segment 
> (kafka.log.remote.RemoteLogManager$RLMTask)
> [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 
> partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment 
> RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach 
> based on the largest record timestamp in the segment 
> (kafka.log.remote.RemoteLogManager$RLMTask)
> ```
> 4. And dumped out the content of the original COPY_SEGMENT_STARTED for this 
> segment (which triggers the generation of the later DELETE_SEGMENT metadata):
> [2023-10-16 22:19:10,894] DEBUG Adding to in-progress state: 
> [RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}
> , startOffset=6387830, endOffset=9578660, brokerId=1043, 
> maxTimestampMs=1696401123036, eventTimestampMs=1696401127062, 
> segmentLeaderEpochs=\{5=6387830, 6=6721329}, segmentSizeInBytes=511987531, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}] 
> (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache)
>  
> You can see there are 2 leader epochs in this segment: 
> *segmentLeaderEpochs=\{5=6387830, 6=6721329}*
>  
> 5. From the remote log retention code 
> 

Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-18 Thread via GitHub


philipnee commented on code in PR #14532:
URL: https://github.com/apache/kafka/pull/14532#discussion_r1364845881


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -188,9 +188,7 @@ private NetworkClientDelegate.UnsentRequest 
makeHeartbeatRequest() {
 if (response != null) {
 onResponse((ConsumerGroupHeartbeatResponse) 
response.responseBody(), response.receivedTimeMs());

Review Comment:
   Thanks!
   `request.handler().completionTimeMs() instead of response.receivedTimeMs()` 
- This is addressed (see the sketch below).
   
   `UnsentRequest using request.handler() would not be possible` - Let me 
quickly follow up with a MINOR PR to address this comment.
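
   For reference, a minimal sketch of the addressed change (the names are 
taken from the diff under review; the surrounding context is an assumption):
   
   ```java
   // Before: the timestamp is read off the response, which is only available
   // on the success path.
   onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(),
       response.receivedTimeMs());
   
   // After: the completion time is recorded by the request's own handler,
   // which is populated on both the success and failure paths.
   onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(),
       request.handler().completionTimeMs());
   ```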



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15481: Fix concurrency bug in RemoteIndexCache [kafka]

2023-10-18 Thread via GitHub


showuon commented on code in PR #14483:
URL: https://github.com/apache/kafka/pull/14483#discussion_r1364844466


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -672,16 +673,89 @@ class RemoteIndexCacheTest {
 verifyFetchIndexInvocation(count = 1)
   }
 
+  @Test
+  def testConcurrentRemoveReadForCache(): Unit = {
+// Create a spy Cache Entry
+val rlsMetadata = new 
RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), 
baseOffset, lastOffset,
+  time.milliseconds(), brokerId, time.milliseconds(), segmentSize, 
Collections.singletonMap(0, 0L))
+
+val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, new 
File(tpDir, DIR_NAME)))
+val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, new 
File(tpDir, DIR_NAME)))
+val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, new 
File(tpDir, DIR_NAME)))
+
+val spyEntry = spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, 
txIndex))
+cache.internalCache.put(rlsMetadata.remoteLogSegmentId().id(), spyEntry)
+
+assertCacheSize(1)
+
+var entry: RemoteIndexCache.Entry = null
+
+val latchForCacheRead = new CountDownLatch(1)
+val latchForCacheRemove = new CountDownLatch(1)
+val latchForTestWait = new CountDownLatch(1)
+
+var markForCleanupCallCount = 0
+
+doAnswer((invocation: InvocationOnMock) => {
+  markForCleanupCallCount += 1
+
+  if (markForCleanupCallCount == 1) {
+// Signal the CacheRead to unblock itself
+latchForCacheRead.countDown()
+// Wait for signal to start renaming the files
+latchForCacheRemove.await()
+// Calling the markForCleanup() actual method to start renaming the 
files
+invocation.callRealMethod()
+// Signal TestWait to unblock itself so that test can be completed
+latchForTestWait.countDown()
+  }
+}).when(spyEntry).markForCleanup()
+
+val removeCache = (() => {
+  cache.remove(rlsMetadata.remoteLogSegmentId().id())
+}): Runnable
+
+val readCache = (() => {
+  // Wait for signal to start CacheRead
+  latchForCacheRead.await()
+  entry = cache.getIndexEntry(rlsMetadata)
+  // Signal the CacheRemove to start renaming the files
+  latchForCacheRemove.countDown()
+}): Runnable
+
+val executor = Executors.newFixedThreadPool(2)
+try {
+  executor.submit(removeCache: Runnable)
+  executor.submit(readCache: Runnable)

Review Comment:
   @jeel2420 , sorry, I had another look and found we should also verify that 
these 2 threads throw no exceptions. Per the issue description, without this 
fix an IOException is thrown. So we should verify there's no exception using 
the returned `future` from `executor.submit` (see the sketch below). Thanks.
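
   A minimal sketch of that verification (Java shown, since the Scala test 
uses the java.util.concurrent API directly; the timeout value is illustrative):
   
   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;
   
   ExecutorService executor = Executors.newFixedThreadPool(2);
   Future<?> removeFuture = executor.submit(removeCache);
   Future<?> readFuture = executor.submit(readCache);
   // Future.get() rethrows anything the task threw, wrapped in an
   // ExecutionException, so these two calls fail the test if either
   // thread hit the IOException described in the issue.
   removeFuture.get(30, TimeUnit.SECONDS);
   readFuture.get(30, TimeUnit.SECONDS);
   ```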



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15613: Client API definition and configurations (KIP-714) [kafka]

2023-10-18 Thread via GitHub


mjsax merged PR #14560:
URL: https://github.com/apache/kafka/pull/14560


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-18 Thread via GitHub


philipnee commented on code in PR #14532:
URL: https://github.com/apache/kafka/pull/14532#discussion_r1364842391


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java:
##
@@ -151,6 +152,36 @@ void testHardFailures(Exception exception) {
 }
 }
 
+@Test
+public void testNetworkTimeout() {

Review Comment:
   Not entirely sure about the behavior of the OffsetRequestManager - I'll need 
to get back to this. I've added a ticket to track it: KAFKA-15642



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15642) Ensure offset fetcher behaves correctly when the request times out

2023-10-18 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15642:
--

 Summary: Ensure offset fetcher behaves correctly when the request 
times out
 Key: KAFKA-15642
 URL: https://issues.apache.org/jira/browse/KAFKA-15642
 Project: Kafka
  Issue Type: Bug
Reporter: Philip Nee
Assignee: Philip Nee


I need to test the behavior of the OffsetFetcher when the request times out - 
it seems like we should continue to retry until the top-level timer expires.
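
A minimal sketch of the retry-until-deadline behavior under discussion 
(attemptFetch() and OffsetFetchResult are hypothetical stand-ins for one 
network round trip and its result, not actual OffsetFetcher API):

{code:java}
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.Timer;

// Keep retrying single attempts until the caller-supplied timer expires.
OffsetFetchResult fetchWithRetries(final Timer timer) {
    while (timer.notExpired()) {
        try {
            return attemptFetch(timer.remainingMs()); // hypothetical single attempt
        } catch (TimeoutException e) {
            timer.update(); // account for the time the failed attempt consumed
            // fall through and retry until the top-level timer expires
        }
    }
    throw new TimeoutException("Failed to fetch offsets before the timer expired");
}
{code}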



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15613: Client API definition and configurations (KIP-714) [kafka]

2023-10-18 Thread via GitHub


mjsax commented on PR #14560:
URL: https://github.com/apache/kafka/pull/14560#issuecomment-1769844494

   The Jenkins UI is a mess -- you can click on "Tests" in the top menu bar to 
see failing tests.
   
   ```
   New failing - 16
   Build / JDK 8 and Scala 2.12 / testTimeouts() – 
org.apache.kafka.controller.QuorumControllerTest
   <1s
   Build / JDK 17 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – 
org.apache.kafka.trogdor.coordinator.CoordinatorTest
   2m 0s
   Build / JDK 21 and Scala 2.13 / [2] tlsProtocol=TLSv1.2, useInlinePem=true – 
org.apache.kafka.common.network.SslTransportLayerTest
   19s
   Build / JDK 21 and Scala 2.13 / shouldHaveSamePositionBoundActiveAndStandBy 
– org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest
   14s
   Build / JDK 21 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – 
org.apache.kafka.trogdor.coordinator.CoordinatorTest
   2m 0s
   Build / JDK 11 and Scala 2.13 / testSingleIP() – 
org.apache.kafka.clients.ClusterConnectionStatesTest
   <1s
   Build / JDK 11 and Scala 2.13 / testSingleIP() – 
org.apache.kafka.clients.ClusterConnectionStatesTest
   <1s
   Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest
   3m 28s
   Build / JDK 11 and Scala 2.13 / testRestartReplication() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
   1m 4s
   Build / JDK 11 and Scala 2.13 / 
testNoConsumeWithoutDescribeAclViaAssign(String).quorum=kraft – 
kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest
   13s
   Build / JDK 11 and Scala 2.13 / [1] error=UNKNOWN_SERVER_ERROR – 
kafka.coordinator.transaction.ProducerIdManagerTest
   15s
   Build / JDK 11 and Scala 2.13 / executeTieredStorageTest(String).quorum=zk – 
org.apache.kafka.tiered.storage.integration.EnableRemoteLogOnTopicTest
   39s
   Build / JDK 11 and Scala 2.13 / 
shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology()
 – org.apache.kafka.streams.integration.NamedTopologyIntegrationTest
   1m 11s
   Build / JDK 11 and Scala 2.13 / 
shouldInvokeUserDefinedGlobalStateRestoreListener() – 
org.apache.kafka.streams.integration.RestoreIntegrationTest
   1m 17s
   Build / JDK 11 and Scala 2.13 / 
shouldInvokeUserDefinedGlobalStateRestoreListener() – 
org.apache.kafka.streams.integration.RestoreIntegrationTest
   1m 12s
   Build / JDK 11 and Scala 2.13 / 
shouldHonorEOSWhenUsingCachingAndStandbyReplicas – 
org.apache.kafka.streams.integration.StandbyTaskEOSMultiRebalanceIntegrationTest
   2m 24s
   Existing failures - 3
   Build / JDK 17 and Scala 2.13 / testRackAwareRangeAssignor(String).quorum=zk 
– integration.kafka.server.FetchFromFollowerIntegrationTest
   37s
   Build / JDK 21 and Scala 2.13 / testRackAwareRangeAssignor(String).quorum=zk 
– integration.kafka.server.FetchFromFollowerIntegrationTest
   40s
   Build / JDK 21 and Scala 2.13 / testRackAwareRangeAssignor(String).quorum=zk 
– integration.kafka.server.FetchFromFollowerIntegrationTest
   ```
   
   But these are all just flaky tests (your PR shouldn't even be able to break 
anything).
   
   Merging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15604: Telemetry API request and response schemas and classes (KIP-714) [kafka]

2023-10-18 Thread via GitHub


mjsax commented on PR #14554:
URL: https://github.com/apache/kafka/pull/14554#issuecomment-1769842128

   Triggered a Jenkins re-run.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15604: Telemetry API request and response schemas and classes (KIP-714) [kafka]

2023-10-18 Thread via GitHub


mjsax commented on code in PR #14554:
URL: https://github.com/apache/kafka/pull/14554#discussion_r1364838008


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -6222,4 +6222,62 @@ class KafkaApisTest {
 
 assertEquals(expectedResponse, response.data)
   }
+
+  @Test
+  def testGetTelemetrySubscriptionsNotAllowedForZkClusters(): Unit = {
+val data = new GetTelemetrySubscriptionsRequestData()
+data.setClientInstanceId(Uuid.randomUuid())
+
+val request = buildRequest(new 
GetTelemetrySubscriptionsRequest.Builder(data, true).build())
+createKafkaApis(enableForwarding = true).handle(request, 
RequestLocal.NoCaching)
+
+val response = 
verifyNoThrottling[GetTelemetrySubscriptionsResponse](request)
+assertEquals(Errors.UNKNOWN_SERVER_ERROR, 
Errors.forCode(response.data.errorCode))

Review Comment:
   If this case cannot happen in practice, what do we actually test?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15527: Add reverseRange and reverseAll query over kv-store in IQv2 [kafka]

2023-10-18 Thread via GitHub


hanyuzheng7 commented on code in PR #14477:
URL: https://github.com/apache/kafka/pull/14477#discussion_r1364836021


##
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##
@@ -1586,12 +1593,15 @@ public  void shouldHandleKeyQuery(
 public  void shouldHandleRangeQuery(
 final Optional lower,
 final Optional upper,
+final boolean isKeyAscending,
 final Function valueExtactor,
-final Set expectedValue) {
+final List expectedValue) {

Review Comment:
   It seems that the existing code uses expectedValue and actualValue for other 
methods; I will update them all.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Migrate StateDirectory mock in TaskManagerTest to Mockito [kafka]

2023-10-18 Thread via GitHub


github-actions[bot] commented on PR #13897:
URL: https://github.com/apache/kafka/pull/13897#issuecomment-1769836677

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has  merge conflicts, please update it with the latest from 
trunk (or appropriate release branch)  If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15604: Telemetry API request and response schemas and classes (KIP-714) [kafka]

2023-10-18 Thread via GitHub


mjsax commented on code in PR #14554:
URL: https://github.com/apache/kafka/pull/14554#discussion_r1364836200


##
clients/src/main/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsRequest.java:
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+
+public class GetTelemetrySubscriptionsRequest extends AbstractRequest {
+
+public static class Builder extends 
AbstractRequest.Builder {
+
+private final GetTelemetrySubscriptionsRequestData data;
+
+public Builder(GetTelemetrySubscriptionsRequestData data) {
+this(data, false);
+}
+
+public Builder(GetTelemetrySubscriptionsRequestData data, boolean 
enableUnstableLastVersion) {

Review Comment:
   So this would be changed before the 3.7 release to mark it as stable?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kip951 poc [kafka]

2023-10-18 Thread via GitHub


github-actions[bot] commented on PR #14063:
URL: https://github.com/apache/kafka/pull/14063#issuecomment-1769836600

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has  merge conflicts, please update it with the latest from 
trunk (or appropriate release branch)  If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15620) Duplicate remote log DELETE_SEGMENT metadata is generated when there are multiple leader epochs in the segment

2023-10-18 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-15620.
---
Fix Version/s: 3.7.0
   Resolution: Duplicate

> Duplicate remote log DELETE_SEGMENT metadata is generated when there are 
> multiple leader epochs in the segment
> --
>
> Key: KAFKA-15620
> URL: https://issues.apache.org/jira/browse/KAFKA-15620
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 3.6.0
>Reporter: Henry Cai
>Priority: Major
> Fix For: 3.7.0, 3.6.1
>
>
> Use the newly released 3.6.0, turn on tiered storage feature: 
> {*}remote.log.storage.system.enable{*}=true
> 1. Set up topic tier5 to be remote storage enabled.  Adding some data to the 
> topic and the data is copied to remote storage.  After a few days when the 
> log segment is removed from remote storage due to log retention expiration, 
> noticed the following warnings in the server log:
> [2023-10-16 22:19:10,971] DEBUG Updating remote log segment metadata: 
> [RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}
> , customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, 
> eventTimestampMs=1697005926358, brokerId=1043}] 
> (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache)
> [2023-10-16 22:19:10,971] WARN Error occurred while updating the remote log 
> segment. 
> (org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore)
> org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: 
> No remote log segment metadata found for 
> :RemoteLogSegmentId\{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id=YFNCaWjPQFSKCngQ1QcKpA}
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache.updateRemoteLogSegmentMetadata(RemoteLogMetadataCache.java:161)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.handleRemoteLogSegmentMetadataUpdate(RemotePartitionMetadataStore.java:89)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataEventHandler.handleRemoteLogMetadata(RemotePartitionMetadataEventHandler.java:33)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.processConsumerRecord(ConsumerTask.java:157)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.run(ConsumerTask.java:133)
>         at java.base/java.lang.Thread.run(Thread.java:829)
>  
> 2. After some debugging, realized the problem is *there are 2 sets of 
> DELETE_SEGMENT_STARTED/FINISHED pairs* in the remote metadata topic for this 
> segment.  The DELETE_SEGMENT_FINISHED in the first set removes the segment 
> from the metadata cache and this caused the above exception when the 
> DELETE_SEGMENT_STARTED from the second set needs to be processed.
>  
> 3. I traced the log to where the log retention kicked in and saw *there 
> were two delete-log-segment events generated* at that time:
> ```
> [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 
> partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment 
> RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach 
> based on the largest record timestamp in the segment 
> (kafka.log.remote.RemoteLogManager$RLMTask)
> [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 
> partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment 
> RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach 
> based on the largest record timestamp in the segment 
> (kafka.log.remote.RemoteLogManager$RLMTask)
> ```
> 4. And dumped out the content of the original COPY_SEGMENT_STARTED for this 
> segment (which triggers the generation of the later DELETE_SEGMENT metadata):
> [2023-10-16 22:19:10,894] DEBUG Adding to in-progress state: 
> [RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}
> , startOffset=6387830, endOffset=9578660, brokerId=1043, 
> maxTimestampMs=1696401123036, eventTimestampMs=1696401127062, 
> segmentLeaderEpochs=\{5=6387830, 6=6721329}, segmentSizeInBytes=511987531, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}] 
> (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache)
>  
> You can see there are 2 leader epochs in this segment: 
> *segmentLeaderEpochs=\{5=6387830, 6=6721329}*
>  
> 5. From the remote log retention code 
> 

[jira] [Commented] (KAFKA-15620) Duplicate remote log DELETE_SEGMENT metadata is generated when there are multiple leader epochs in the segment

2023-10-18 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17776989#comment-17776989
 ] 

Luke Chen commented on KAFKA-15620:
---

[~ckamal], thanks for the info.
[~h...@pinterest.com], I've backported it to the 3.6 branch. It'll be included 
in the v3.6.1 release. Thanks.

> Duplicate remote log DELETE_SEGMENT metadata is generated when there are 
> multiple leader epochs in the segment
> --
>
> Key: KAFKA-15620
> URL: https://issues.apache.org/jira/browse/KAFKA-15620
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 3.6.0
>Reporter: Henry Cai
>Priority: Major
> Fix For: 3.6.1
>
>
> Use the newly released 3.6.0, turn on tiered storage feature: 
> {*}remote.log.storage.system.enable{*}=true
> 1. Set up topic tier5 to be remote storage enabled.  Adding some data to the 
> topic and the data is copied to remote storage.  After a few days when the 
> log segment is removed from remote storage due to log retention expiration, 
> noticed the following warnings in the server log:
> [2023-10-16 22:19:10,971] DEBUG Updating remote log segment metadata: 
> [RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}
> , customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, 
> eventTimestampMs=1697005926358, brokerId=1043}] 
> (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache)
> [2023-10-16 22:19:10,971] WARN Error occurred while updating the remote log 
> segment. 
> (org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore)
> org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: 
> No remote log segment metadata found for 
> :RemoteLogSegmentId\{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id=YFNCaWjPQFSKCngQ1QcKpA}
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache.updateRemoteLogSegmentMetadata(RemoteLogMetadataCache.java:161)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.handleRemoteLogSegmentMetadataUpdate(RemotePartitionMetadataStore.java:89)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataEventHandler.handleRemoteLogMetadata(RemotePartitionMetadataEventHandler.java:33)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.processConsumerRecord(ConsumerTask.java:157)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.run(ConsumerTask.java:133)
>         at java.base/java.lang.Thread.run(Thread.java:829)
>  
> 2. After some debugging, realized the problem is *there are 2 sets of 
> DELETE_SEGMENT_STARTED/FINISHED pairs* in the remote metadata topic for this 
> segment.  The DELETE_SEGMENT_FINISHED in the first set removes the segment 
> from the metadata cache and this caused the above exception when the 
> DELETE_SEGMENT_STARTED from the second set needs to be processed.
>  
> 3. I traced the log to where the log retention kicked in and saw *there 
> were two delete-log-segment events generated* at that time:
> ```
> [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 
> partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment 
> RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach 
> based on the largest record timestamp in the segment 
> (kafka.log.remote.RemoteLogManager$RLMTask)
> [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 
> partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment 
> RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach 
> based on the largest record timestamp in the segment 
> (kafka.log.remote.RemoteLogManager$RLMTask)
> ```
> 4. And dumped out the content of the original COPY_SEGMENT_STARTED for this 
> segment (which triggers the generation of the later DELETE_SEGMENT metadata):
> [2023-10-16 22:19:10,894] DEBUG Adding to in-progress state: 
> [RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}
> , startOffset=6387830, endOffset=9578660, brokerId=1043, 
> maxTimestampMs=1696401123036, eventTimestampMs=1696401127062, 
> segmentLeaderEpochs=\{5=6387830, 6=6721329}, segmentSizeInBytes=511987531, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}] 
> (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache)
>  
> You can see there are 2 leader epochs in this 

[jira] [Updated] (KAFKA-15479) Remote log segments should be considered once for retention breach

2023-10-18 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-15479:
--
Fix Version/s: 3.6.1

> Remote log segments should be considered once for retention breach
> --
>
> Key: KAFKA-15479
> URL: https://issues.apache.org/jira/browse/KAFKA-15479
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
> Fix For: 3.7.0, 3.6.1
>
>
> When a remote log segment contains multiple epochs, it gets considered 
> multiple times during retention breach checks by size/time/start-offset. This 
> will affect deletion by remote log retention size, as it deletes fewer 
> segments than expected. This is a follow-up of KAFKA-15352



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-18 Thread via GitHub


philipnee commented on code in PR #14532:
URL: https://github.com/apache/kafka/pull/14532#discussion_r1364833452


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java:
##
@@ -151,6 +152,36 @@ void testHardFailures(Exception exception) {
 }
 }
 
+@Test
+public void testNetworkTimeout() {

Review Comment:
   CoordinatorRequestManager doesn't have it - so I added the test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15527: Add reverseRange and reverseAll query over kv-store in IQv2 [kafka]

2023-10-18 Thread via GitHub


hanyuzheng7 commented on code in PR #14477:
URL: https://github.com/apache/kafka/pull/14477#discussion_r1364832374


##
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##
@@ -1586,12 +1593,15 @@ public  void shouldHandleKeyQuery(
 public  void shouldHandleRangeQuery(
 final Optional lower,
 final Optional upper,
+final boolean isKeyAscending,
 final Function valueExtactor,
-final Set expectedValue) {
+final List expectedValue) {

Review Comment:
   ok, I will update it.



##
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##
@@ -1604,9 +1614,10 @@ public  void shouldHandleRangeQuery(
 if (result.getGlobalResult() != null) {
 fail("global tables aren't implemented");
 } else {
-final Set actualValue = new HashSet<>();
+final List actualValue = new ArrayList<>();

Review Comment:
   ok, I will update it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-13973) block-cache-capacity metrics worth twice as much as normal

2023-10-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13973:

Fix Version/s: 3.6.1

> block-cache-capacity metrics worth twice as much as normal
> --
>
> Key: KAFKA-13973
> URL: https://issues.apache.org/jira/browse/KAFKA-13973
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: Sylvain Le Gouellec
>Assignee: Nicholas Telford
>Priority: Minor
> Fix For: 3.7.0, 3.6.1
>
> Attachments: Screenshot 2022-06-09 at 08.55.36.png, Screenshot 
> 2022-06-09 at 09.33.50.png
>
>
> I have created a very simple kafka-streams application with 1 state store. 
> I'm very surprised that the block-cache-capacity metrics show a {{100MB}} 
> block cache capacity instead of the Kafka Streams default, which is 
> {{50MB}}.
>  
> My topology :
> StreamsBuilder sb = new StreamsBuilder();
> sb.stream("input")
> .groupByKey()
> .count()
> .toStream()
> .to("output");
>  
> I checked out the {{kafka-streams}} code and saw a strange thing. When the 
> {{RocksDBTimestampedStore}} store is created, we try to create two column 
> families for backward compatibility with a potential old key/value store.
> In this method, {{setDbAccessor(col1, col2)}}, if the first column family is 
> not valid, you close it 
> ([L102|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L102]).
>  But regarding the RocksDB instance, it seems that the column family is not 
> deleted completely, and the metrics exposed by [RocksDB continue to 
> aggregate 
> (L373)|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java#L373]
>  {{block-cache-capacity}} for both column families (default and 
> keyValueWithTimestamp).
> Maybe you have to explicitly drop the column family in 
> {{setDbAccessor(col1, col2)}} if the first column family is not valid (like 
> {{db.dropColumnFamily(noTimestampColumnFamily);}})
>  
> I tried to drop the {{noTimestampColumnFamily}} in setDbAccessor if the first 
> column family is not valid, like:
>  
> {code:java}
> private void setDbAccessor(final ColumnFamilyHandle noTimestampColumnFamily,
>final ColumnFamilyHandle 
> withTimestampColumnFamily) throws RocksDBException {
> final RocksIterator noTimestampsIter = 
> db.newIterator(noTimestampColumnFamily);
> noTimestampsIter.seekToFirst();
> if (noTimestampsIter.isValid()) {
> log.info("Opening store {} in upgrade mode", name);
> dbAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, 
> withTimestampColumnFamily);
> } else {
> log.info("Opening store {} in regular mode", name);
> dbAccessor = new 
> SingleColumnFamilyAccessor(withTimestampColumnFamily);
> noTimestampColumnFamily.close();
> db.dropColumnFamily(noTimestampColumnFamily); // try fix it
> }
> noTimestampsIter.close();
> }{code}
>  
>  
>  
> {{But it seems that you can't drop the default column family in RocksDB 
> (see screenshot).}}
> {{*So how can we get the real block-cache-capacity metrics value in Kafka 
> Streams monitoring?*}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-13973: Fix inflated block cache metrics [kafka]

2023-10-18 Thread via GitHub


mjsax commented on PR #14317:
URL: https://github.com/apache/kafka/pull/14317#issuecomment-1769829743

   Cherry-picked this to the `3.6` branch. Also tried to cherry-pick to `3.5`, 
but the newly added test fails on 3.5 -- not sure why yet.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15527: Add reverseRange and reverseAll query over kv-store in IQv2 [kafka]

2023-10-18 Thread via GitHub


hanyuzheng7 commented on PR #14477:
URL: https://github.com/apache/kafka/pull/14477#issuecomment-1769829820

   > LGTM. Two nits to improve variable names.
   > 
   > As discussed in person, we also should update JavaDocs for `RangeQuery`, 
and `ReadOnlyKeyValueStore` to make it more explicit what ordering guarantees 
are provided.
   > 
   > Furthermore, we also need to update `docs/streams/...`
   > 
   > We can either add all this to this PR, or you can do a follow-up PR. Just 
let me know.
   
   I believe it would be best to open a new PR dedicated solely to updating the 
Javadoc.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-18 Thread via GitHub


philipnee commented on code in PR #14532:
URL: https://github.com/apache/kafka/pull/14532#discussion_r1364830057


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java:
##
@@ -151,6 +152,36 @@ void testHardFailures(Exception exception) {
 }
 }
 
+@Test
+public void testNetworkTimeout() {

Review Comment:
   Per previous comment: NetworkTimeout isn't handled correctly in the 
CommitRequestManager, so I want to address it in that PR. I will check the 
other request managers to ensure this is tested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-18 Thread via GitHub


philipnee commented on code in PR #14532:
URL: https://github.com/apache/kafka/pull/14532#discussion_r1364829502


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -254,14 +254,14 @@ public NetworkClientDelegate.UnsentRequest 
toUnsentRequest() {
 new ArrayList<>(this.requestedPartitions),
 throwOnFetchStableOffsetUnsupported);
 return new NetworkClientDelegate.UnsentRequest(
-builder,
-coordinatorRequestManager.coordinator(),
-(r, t) -> onResponse(r.receivedTimeMs(), 
(OffsetFetchResponse) r.responseBody()));
+builder,
+coordinatorRequestManager.coordinator(),
+(r, t) -> onResponse(r.receivedTimeMs(), (OffsetFetchResponse) 
r.responseBody()));

Review Comment:
   In fact, this is a bug and I'm planning to address it in 
[KAFKA-15562](https://issues.apache.org/jira/browse/KAFKA-15562). Can we 
address the whole gap in a follow-up PR?
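
   To illustrate the gap, a hedged sketch of a callback that handles both 
outcomes (the names follow the diff above; onFailure(...) is hypothetical):
   
   ```java
   // The BiConsumer receives a response and a throwable; exactly one of the
   // two is meaningful. Handling only the response would NPE when the
   // request fails before any response arrives.
   (r, t) -> {
       if (t != null) {
           onFailure(t); // request failed: there is no response to read a time from
       } else {
           onResponse(r.receivedTimeMs(), (OffsetFetchResponse) r.responseBody());
       }
   }
   ```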



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15562) Ensure fetch offset and commit offset handler handles both timeout and various error types

2023-10-18 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-15562:
---
Description: 
Both the fetchOffsetRequest and commitOffsetRequest handlers lack sufficient 
logic to handle timeout exceptions.

The commitOffsetRequest handler also doesn't handle various server errors such 
as coordinator not found. We need to handle:

If Exception is non-null:

 - handle RetriableError in a way that respects requestTimeoutMs

 - handle NonRetriableError

If the response contains an error, ensure to:

 - mark the coordinator unknown if needed

 - retry if needed

 - fail the request (a sketch of this handling follows below)
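
A hedged skeleton of that handling (illustrative names only; requestStartMs, 
requestTimeoutMs, future, retry() and markCoordinatorUnknown() are assumed 
fields/helpers, not the actual CommitRequestManager code):

{code:java}
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.protocol.Errors;

void onCommitResponse(final Errors error, final Throwable exception, final long currentTimeMs) {
    if (exception != null) {
        if (exception instanceof RetriableException
                && currentTimeMs - requestStartMs < requestTimeoutMs) {
            retry();                                 // retriable and time budget left
        } else {
            future.completeExceptionally(exception); // non-retriable, or timed out
        }
        return;
    }
    if (error == Errors.NONE) {
        future.complete(null);                        // success
    } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
        markCoordinatorUnknown(error, currentTimeMs); // rediscover the coordinator
        retry();
    } else if (error.exception() instanceof RetriableException) {
        retry();
    } else {
        future.completeExceptionally(error.exception());
    }
}
{code}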

> Ensure fetch offset and commit offset handler handles both timeout and 
> various error types
> --
>
> Key: KAFKA-15562
> URL: https://issues.apache.org/jira/browse/KAFKA-15562
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-e2e, kip-848-preview
>
> Both the fetchOffsetRequest and commitOffsetRequest handlers lack sufficient 
> logic to handle timeout exceptions.
>  
> The commitOffsetRequest handler also doesn't handle various server errors 
> such as coordinator not found. We need to handle:
> If Exception is non-null:
>  - handle RetriableError in a way that respects requestTimeoutMs
>  - handle NonRetriableError
>  
> If the response contains an error, ensure to:
>  - mark the coordinator unknown if needed
>  - retry if needed
>  - fail the request



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15562) Ensure fetch offset and commit offset handler handles both timeout and various error types

2023-10-18 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-15562:
---
Summary: Ensure fetch offset and commit offset handler handles both timeout 
and various error types  (was: Ensure CommitRequestManager handles offset 
commit error handling)

> Ensure fetch offset and commit offset handler handles both timeout and 
> various error types
> --
>
> Key: KAFKA-15562
> URL: https://issues.apache.org/jira/browse/KAFKA-15562
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-e2e, kip-848-preview
>
> Review the code in Consumercoordinator#OffsetCommitResponseHandler 
> Implement the error handling and add tests for these errors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15562) Ensure fetch offset and commit offset handler handles both timeout and various error types

2023-10-18 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-15562:
---
Description: (was: Review the code in 
Consumercoordinator#OffsetCommitResponseHandler 

Implement the error handling and add tests for these errors.)

> Ensure fetch offset and commit offset handler handles both timeout and 
> various error types
> --
>
> Key: KAFKA-15562
> URL: https://issues.apache.org/jira/browse/KAFKA-15562
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-e2e, kip-848-preview
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15534: Inject request completion time when the request failed [kafka]

2023-10-18 Thread via GitHub


philipnee commented on code in PR #14532:
URL: https://github.com/apache/kafka/pull/14532#discussion_r1364820542


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -256,27 +256,39 @@ public String toString() {
 
 public static class FutureCompletionHandler implements 
RequestCompletionHandler {
 
+private long responseCompletionTimeMs;
 private final CompletableFuture future;
 
 FutureCompletionHandler() {
-this.future = new CompletableFuture<>();
+future = new CompletableFuture<>();
 }
 
-public void onFailure(final RuntimeException e) {
-future.completeExceptionally(e);
+public void onFailure(final long currentTimeMs, final RuntimeException 
e) {

Review Comment:
   You are right, I should add one there.
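
   For context, a sketch of the handler recording the completion time on both 
paths (the field and method names follow the diff above; the onComplete body 
is an assumption):
   
   ```java
   public void onFailure(final long currentTimeMs, final RuntimeException e) {
       this.responseCompletionTimeMs = currentTimeMs; // failure time injected by the caller
       future.completeExceptionally(e);
   }
   
   public void onComplete(final ClientResponse response) {
       this.responseCompletionTimeMs = response.receivedTimeMs(); // success path
       future.complete(response);
   }
   ```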



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Worker sink task threaded test mockito [kafka]

2023-10-18 Thread via GitHub


hgeraldino closed pull request #14580: Worker sink task threaded test mockito
URL: https://github.com/apache/kafka/pull/14580


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Worker sink task threaded test mockito [kafka]

2023-10-18 Thread via GitHub


hgeraldino opened a new pull request, #14580:
URL: https://github.com/apache/kafka/pull/14580

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] wip: investigate Flaky tests [kafka]

2023-10-18 Thread via GitHub


dengziming closed pull request #14512: wip: investigate Flaky tests
URL: https://github.com/apache/kafka/pull/14512


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-13988: Enable replicating from latest offset with MirrorMaker 2 [kafka]

2023-10-18 Thread via GitHub


hudeqi commented on PR #14567:
URL: https://github.com/apache/kafka/pull/14567#issuecomment-1769788407

   LGTM, thanks for this PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15566: Fix flaky tests in FetchRequestTest.scala in KRaft mode [kafka]

2023-10-18 Thread via GitHub


showuon commented on PR #14573:
URL: https://github.com/apache/kafka/pull/14573#issuecomment-1769750941

   @dengziming , do you want to have a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15633: Fix overwrite of meta.properties at startup to handle JBOD. [WIP] [kafka]

2023-10-18 Thread via GitHub


pprovenzano commented on PR #14578:
URL: https://github.com/apache/kafka/pull/14578#issuecomment-1769746613

   Adding the directory.id in any form will prevent the cluster from rolling 
back to a previous ZK version.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15479) Remote log segments should be considered once for retention breach

2023-10-18 Thread Henry Cai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17776960#comment-17776960
 ] 

Henry Cai commented on KAFKA-15479:
---

This is also related to https://issues.apache.org/jira/browse/KAFKA-15620; can 
the fix be backported to the Kafka 3.6 branch?

> Remote log segments should be considered once for retention breach
> --
>
> Key: KAFKA-15479
> URL: https://issues.apache.org/jira/browse/KAFKA-15479
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
> Fix For: 3.7.0
>
>
> When a remote log segment contains multiple epochs, it gets considered 
> multiple times during retention breach checks by size/time/start-offset. This 
> will affect deletion by remote log retention size, as it deletes fewer 
> segments than expected. This is a follow-up of KAFKA-15352



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-18 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1364758468


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##
@@ -0,0 +1,3573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.FetchMetadata;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
+import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import 

[jira] [Updated] (KAFKA-15641) Investigate CompletedFetch handleInitializeErrors for accuracy

2023-10-18 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15641:
--
Description: 
The {{FetcherTest}} and {{FetchRequestManagerTest}} classes have a test named 
testFetchedRecordsAfterSeek, which [upon closer 
inspection|https://github.com/apache/kafka/pull/14406#discussion_r1347894828] 
may reveal some incorrect logic in {{FetchCollector.handleInitializeErrors}}.

Here is the test code:

{code:java}
@Test
public void testFetchedRecordsAfterSeek() {
buildFetcher(OffsetResetStrategy.NONE,
 new ByteArrayDeserializer(),
 new ByteArrayDeserializer(),
 2,
 IsolationLevel.READ_UNCOMMITTED);

assignFromUser(singleton(tp0));

// Step 1: seek to offset 0 of our partition.
subscriptions.seek(tp0, 0);

// Step 2: issue a mock broker request to fetch data from the current 
offset in our local state,
// i.e. offset 0.
assertTrue(sendFetches() > 0);

// Step 3: mock an OFFSET_OUT_OF_RANGE response from the broker.
client.prepareResponse(fullFetchResponse(tidp0, records, 
Errors.OFFSET_OUT_OF_RANGE, 100L, 0));

// Step 4: process the network I/O to receive the response from the broker 
with the OFFSET_OUT_OF_RANGE
// that was injected. Note, however, that we haven't "collected" the fetch 
data included in the response.
networkClientDelegate.poll(time.timer(0));

// Step 5: validate that the partition is not marked as needing its offset 
reset. The response validation
// logic is performed during the fetch collection, which doesn't happen 
until assertEmptyFetch below.
assertFalse(subscriptions.isOffsetResetNeeded(tp0));

// Step 6: update the partition's position in our local state to offset 2. 
We still haven't collected the
// fetch, so we haven't performed any validation of the fetch response.
subscriptions.seek(tp0, 2);

// Step 7: perform the fetch collection. As part of that process, error 
handling is performed. Since
// we intentionally injected an error above, this error will be checked and 
handled in the
// FetchCollector.handleInitializeErrors method. When handling 
OFFSET_OUT_OF_RANGE, handleInitializeErrors
// will notice that the original requested offset (0) is different from the 
state of our current offset (2).
assertEmptyFetch("Should not return records or advance position after 
seeking to end of topic partition");
}
{code}

Here is the code from {{FetchCollector.handleInitializeErrors}}:

{code:java}
private void handleInitializeErrors(final CompletedFetch completedFetch, final 
Errors error) {
final TopicPartition tp = completedFetch.partition;
final long fetchOffset = completedFetch.nextFetchOffset();

. . .

if (error == Errors.OFFSET_OUT_OF_RANGE) {
Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);

if (!clearedReplicaId.isPresent()) {
// If there's no preferred replica to clear, we're fetching from 
the leader so handle
// this error normally
SubscriptionState.FetchPosition position = 
subscriptions.position(tp);

if (position == null || fetchOffset != position.offset) {
log.debug("Discarding stale fetch response for partition {} 
since the fetched offset {} " +
"does not match the current offset {}", tp, 
fetchOffset, position);
} else {
String errorMessage = "Fetch position " + position + " is out 
of range for partition " + tp;

if (subscriptions.hasDefaultOffsetResetPolicy()) {
log.info("{}, resetting offset", errorMessage);
subscriptions.requestOffsetReset(tp);
} else {
log.info("{}, raising error to the application since no 
reset policy is configured", 
errorMessage);
throw new OffsetOutOfRangeException(errorMessage,
Collections.singletonMap(tp, position.offset));
}
}
} else {
log.debug("Unset the preferred read replica {} for partition {} 
since we got {} when fetching {}",
clearedReplicaId.get(), tp, error, fetchOffset);
}
}

. . .

}
{code}

The question is: why is the {{OFFSET_OUT_OF_RANGE}} error ignored just because 
of the following code?

{code:java}
if (position == null || fetchOffset != position.offset) {
{code}

It's a bit weird that the above check is only done for the 
{{OFFSET_OUT_OF_RANGE}} error, instead of any error.

  was:
The {{FetcherTest}} and {{FetchRequestManagerTest}} classes have a test named 
testFetchedRecordsAfterSeek, which [upon closer 
inspection|https://github.com/apache/kafka/pull/14406#discussion_r1347894828] 
may reveal some incorrect logic in 
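
One way to make the behavior uniform, sketched below: factor the stale-offset 
check out so it runs before any per-error handling rather than only inside the 
OFFSET_OUT_OF_RANGE branch. This is illustrative only (the class and method 
names are hypothetical, not Kafka code):

{code:java}
import java.util.Optional;

final class StaleFetchGuardSketch {

    /**
     * @param fetchOffset   the offset the fetch request was originally sent with
     * @param currentOffset the consumer's current position, empty if unknown
     * @return true if the response is stale and should be discarded
     */
    static boolean isStale(long fetchOffset, Optional<Long> currentOffset) {
        // A seek() between sending the fetch and collecting it moves the
        // position, so the response no longer matches local state.
        return !currentOffset.isPresent() || fetchOffset != currentOffset.get();
    }
}
{code}

Applying such a guard for every error, not just OFFSET_OUT_OF_RANGE, would make 
the discard-stale-response behavior consistent, which is essentially what the 
question above is probing.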

[jira] [Updated] (KAFKA-15641) Investigate CompletedFetch handleInitializeErrors for accuracy

2023-10-18 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15641:
--
Issue Type: Bug  (was: Improvement)

> Investigate CompletedFetch handleInitializeErrors for accuracy
> --
>
> Key: KAFKA-15641
> URL: https://issues.apache.org/jira/browse/KAFKA-15641
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>
> The {{FetcherTest}} and {{FetchRequestManagerTest}} classes have a test named 
> testFetchedRecordsAfterSeek, which [upon closer 
> inspection|https://github.com/apache/kafka/pull/14406#discussion_r1347894828] 
> may reveal some incorrect logic in {{FetchCollector.handleInitializeErrors}}.
> Here is the test code:
> {code:java}
> @Test
> public void testFetchedRecordsAfterSeek() {
> buildFetcher(OffsetResetStrategy.NONE,
>  new ByteArrayDeserializer(),
>  new ByteArrayDeserializer(),
>  2,
>  IsolationLevel.READ_UNCOMMITTED);
> assignFromUser(singleton(tp0));
> // Step 1: seek to offset 0 of our partition.
> subscriptions.seek(tp0, 0);
> // Step 2: issue a mock broker request to fetch data from the current 
> offset in our local state,
> // i.e. offset 0.
> assertTrue(sendFetches() > 0);
> // Step 3: mock an OFFSET_OUT_OF_RANGE response from the broker.
> client.prepareResponse(fullFetchResponse(tidp0, records, 
> Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
> // Step 4: process the network I/O to receive the response from the 
> broker with the OFFSET_OUT_OF_RANGE
> // that was injected. Note, however, that we haven't "collected" the 
> fetch data included in the response.
> networkClientDelegate.poll(time.timer(0));
> // Step 5: validate that the partition is not marked as needing its 
> offset reset. The response validation
> // logic is performed during the fetch collection, which doesn't happen 
> until assertEmptyFetch below.
> assertFalse(subscriptions.isOffsetResetNeeded(tp0));
> // Step 6: update the partition's position in our local state to offset 
> 2. We still haven't collected the
> // fetch, so we haven't performed any validation of the fetch response.
> subscriptions.seek(tp0, 2);
> // Step 7: perform the fetch collection. As part of that process, error 
> handling is performed. Since
> // we intentionally injected an error above, this error will be checked 
> and handled in the
> // FetchCollector.handleInitializeErrors method. When handling 
> OFFSET_OUT_OF_RANGE, handleInitializeErrors
> // will notice that the original requested offset (0) is different from 
> the state of our current offset (2).
> assertEmptyFetch("Should not return records or advance position after 
> seeking to end of topic partition");
> }
> {code}
> Here is the code from {{FetchCollector.handleInitializeErrors}}:
> {code:java}
> private void handleInitializeErrors(final CompletedFetch completedFetch, 
> final Errors error) {
> final TopicPartition tp = completedFetch.partition;
> final long fetchOffset = completedFetch.nextFetchOffset();
> . . .
> if (error == Errors.OFFSET_OUT_OF_RANGE) {
> Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);
> if (!clearedReplicaId.isPresent()) {
> // If there's no preferred replica to clear, we're fetching from 
> the leader so handle
> // this error normally
> SubscriptionState.FetchPosition position = 
> subscriptions.position(tp);
> if (position == null || fetchOffset != position.offset) {
> log.debug("Discarding stale fetch response for partition {} 
> since the fetched offset {} " +
> "does not match the current offset {}", tp, 
> fetchOffset, position);
> } else {
> String errorMessage = "Fetch position " + position + " is out 
> of range for partition " + tp;
> if (subscriptions.hasDefaultOffsetResetPolicy()) {
> log.info("{}, resetting offset", errorMessage);
> subscriptions.requestOffsetReset(tp);
> } else {
> log.info("{}, raising error to the application since no 
> reset policy is configured", 
> errorMessage);
> throw new OffsetOutOfRangeException(errorMessage,
> Collections.singletonMap(tp, position.offset));
> }
> }
> } else {
> log.debug("Unset the preferred read replica {} for partition {} 
> since we got {} when fetching {}",
> clearedReplicaId.get(), tp, error, 

[jira] [Updated] (KAFKA-15641) Investigate CompletedFetch handleInitializeErrors for accuracy

2023-10-18 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15641:
--
Description: 
The {{FetcherTest}} and {{FetchRequestManagerTest}} classes have a test named 
testFetchedRecordsAfterSeek, which [upon closer 
inspection|https://github.com/apache/kafka/pull/14406#discussion_r1347894828] 
may reveal some incorrect logic in {{FetchCollector.handleInitializeErrors}}.

Here is the test code:

{code:java}
@Test
public void testFetchedRecordsAfterSeek() {
buildFetcher(OffsetResetStrategy.NONE,
 new ByteArrayDeserializer(),
 new ByteArrayDeserializer(),
 2,
 IsolationLevel.READ_UNCOMMITTED);

assignFromUser(singleton(tp0));

// Step 1: seek to offset 0 of our partition.
subscriptions.seek(tp0, 0);

// Step 2: issue a mock broker request to fetch data from the current 
offset in our local state,
// i.e. offset 0.
assertTrue(sendFetches() > 0);

// Step 3: mock an OFFSET_OUT_OF_RANGE response from the broker.
client.prepareResponse(fullFetchResponse(tidp0, records, 
Errors.OFFSET_OUT_OF_RANGE, 100L, 0));

// Step 4: process the network I/O to receive the response from the broker 
with the OFFSET_OUT_OF_RANGE
// that was injected. Note, however, that we haven't "collected" the fetch 
data included in the response.
networkClientDelegate.poll(time.timer(0));

// Step 5: validate that the partition is not marked as needing its offset 
reset. The response validation
// logic is performed during the fetch collection, which doesn't happen 
until we call assertEmptyFetch below.
assertFalse(subscriptions.isOffsetResetNeeded(tp0));

// Step 6: update the partition's position in our local state to offset 2. 
We still haven't collected the
// fetch, so we haven't performed any validation of the fetch response.
subscriptions.seek(tp0, 2);

// Step 7: perform the fetch collection. As part of that process, error 
handling is performed. Since
// we intentionally injected an error above, this error will be checked and 
handled in the
// FetchCollector.handleInitializeErrors method. When handling 
OFFSET_OUT_OF_RANGE, handleInitializeErrors
// will notice that the original requested offset (0) is different from the 
state of our current offset (2).
assertEmptyFetch("Should not return records or advance position after 
seeking to end of topic partition");
}
{code}

Here is the code from {{FetchCollector.handleInitializeErrors}}:

{code:java}
private void handleInitializeErrors(final CompletedFetch completedFetch, final 
Errors error) {
final TopicPartition tp = completedFetch.partition;
final long fetchOffset = completedFetch.nextFetchOffset();

. . .

if (error == Errors.OFFSET_OUT_OF_RANGE) {
Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);

if (!clearedReplicaId.isPresent()) {
// If there's no preferred replica to clear, we're fetching from 
the leader so handle
// this error normally
SubscriptionState.FetchPosition position = 
subscriptions.position(tp);

if (position == null || fetchOffset != position.offset) {
log.debug("Discarding stale fetch response for partition {} 
since the fetched offset {} " +
"does not match the current offset {}", tp, 
fetchOffset, position);
} else {
String errorMessage = "Fetch position " + position + " is out 
of range for partition " + tp;

if (subscriptions.hasDefaultOffsetResetPolicy()) {
log.info("{}, resetting offset", errorMessage);
subscriptions.requestOffsetReset(tp);
} else {
log.info("{}, raising error to the application since no 
reset policy is configured", 
errorMessage);
throw new OffsetOutOfRangeException(errorMessage,
Collections.singletonMap(tp, position.offset));
}
}
} else {
log.debug("Unset the preferred read replica {} for partition {} 
since we got {} when fetching {}",
clearedReplicaId.get(), tp, error, fetchOffset);
}
}

. . .

}
{code}

The question is: why is the {{OFFSET_OUT_OF_RANGE}} error ignored just because 
of the following code?

{code:java}
if (position == null || fetchOffset != position.offset) {
{code}

It's a bit weird that the above check is only done for the 
{{OFFSET_OUT_OF_RANGE}} error, instead of any error.

  was:
The {{FetcherTest}} and {{FetchRequestManagerTest}} classes have a test named 
testFetchedRecordsAfterSeek, which [upon closer 
inspection|https://github.com/apache/kafka/pull/14406#discussion_r1347894828] 
may reveal some incorrect logic in 

[jira] [Updated] (KAFKA-15641) Investigate CompletedFetch handleInitializeErrors for accuracy

2023-10-18 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15641:
--
Description: 
The {{FetcherTest}} and {{FetchRequestManagerTest}} classes have a test named 
testFetchedRecordsAfterSeek, which [upon closer 
inspection|https://github.com/apache/kafka/pull/14406#discussion_r1347894828] 
may reveal some incorrect logic in {{FetchCollector.handleInitializeErrors}}.

Here is the test code:

{code:java}
@Test
public void testFetchedRecordsAfterSeek() {
buildFetcher(OffsetResetStrategy.NONE,
 new ByteArrayDeserializer(),
 new ByteArrayDeserializer(),
 2,
 IsolationLevel.READ_UNCOMMITTED);

assignFromUser(singleton(tp0));

// Step 1: seek to offset 0 of our partition.
subscriptions.seek(tp0, 0);

// Step 2: issue a mock broker request to fetch data from the current 
offset in our local state, i.e. offset 0.
assertTrue(sendFetches() > 0);

// Step 3: mock an OFFSET_OUT_OF_RANGE response from the broker.
client.prepareResponse(fullFetchResponse(tidp0, records, 
Errors.OFFSET_OUT_OF_RANGE, 100L, 0));

// Step 4: process the network I/O to receive the response from the broker 
with the OFFSET_OUT_OF_RANGE
// that was injected. Note, however, that we haven't "collected" the fetch 
data included in the response.
networkClientDelegate.poll(time.timer(0));

// Step 5: validate that the partition is not marked as needing its offset 
reset. The response validation
// logic is performed during the fetch collection, which doesn't happen 
until we call assertEmptyFetch below.
assertFalse(subscriptions.isOffsetResetNeeded(tp0));

// Step 6: update the partition's position in our local state to offset 2. 
We still haven't collected the
// fetch, so we haven't performed any validation of the fetch response.
subscriptions.seek(tp0, 2);

// Step 7: perform the fetch collection. As part of that process, error 
handling is performed. Since
// we intentionally injected an error above, this error will be checked and 
handled in the
// FetchCollector.handleInitializeErrors method. When handling 
OFFSET_OUT_OF_RANGE, handleInitializeErrors
// will notice that the original requested offset (0) is different from the 
state of our current offset (2).
assertEmptyFetch("Should not return records or advance position after 
seeking to end of topic partition");
}
{code}

Here is the code from {{FetchCollector.handleInitializeErrors}}:

{code:java}
private void handleInitializeErrors(final CompletedFetch completedFetch, final 
Errors error) {
final TopicPartition tp = completedFetch.partition;
final long fetchOffset = completedFetch.nextFetchOffset();

. . .

if (error == Errors.OFFSET_OUT_OF_RANGE) {
Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);

if (!clearedReplicaId.isPresent()) {
// If there's no preferred replica to clear, we're fetching from 
the leader so handle
// this error normally
SubscriptionState.FetchPosition position = 
subscriptions.position(tp);

if (position == null || fetchOffset != position.offset) {
log.debug("Discarding stale fetch response for partition {} 
since the fetched offset {} " +
"does not match the current offset {}", tp, 
fetchOffset, position);
} else {
String errorMessage = "Fetch position " + position + " is out 
of range for partition " + tp;

if (subscriptions.hasDefaultOffsetResetPolicy()) {
log.info("{}, resetting offset", errorMessage);
subscriptions.requestOffsetReset(tp);
} else {
log.info("{}, raising error to the application since no 
reset policy is configured", 
errorMessage);
throw new OffsetOutOfRangeException(errorMessage,
Collections.singletonMap(tp, position.offset));
}
}
} else {
log.debug("Unset the preferred read replica {} for partition {} 
since we got {} when fetching {}",
clearedReplicaId.get(), tp, error, fetchOffset);
}
}

. . .

}
{code}

The question is: why is the {{OFFSET_OUT_OF_RANGE}} error ignored just because 
of the following code?

{code:java}
if (position == null || fetchOffset != position.offset) {
{code}

It's a bit weird that the above check is only done for the 
{{OFFSET_OUT_OF_RANGE}} error, instead of any error.

  was:
The {{FetcherTest}} and {{FetchRequestManagerTest}} classes have a test named 
testFetchedRecordsAfterSeek, which [upon closer 
inspection|https://github.com/apache/kafka/pull/14406#discussion_r1347894828] 
may reveal some incorrect logic in 

[jira] [Updated] (KAFKA-15641) Investigate CompletedFetch handleInitializeErrors for accuracy

2023-10-18 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15641:
--
Description: 
The {{FetcherTest}} and {{FetchRequestManagerTest}} classes have a test named 
testFetchedRecordsAfterSeek, which [upon closer 
inspection|https://github.com/apache/kafka/pull/14406#discussion_r1347894828] 
may reveal some incorrect logic in {{FetchCollector.handleInitializeErrors}}.

Here is the test code:

{code:java}
@Test
public void testFetchedRecordsAfterSeek() {
buildFetcher(OffsetResetStrategy.NONE,
 new ByteArrayDeserializer(),
 new ByteArrayDeserializer(),
 2,
 IsolationLevel.READ_UNCOMMITTED);

assignFromUser(singleton(tp0));

// Step 1: seek to offset 0 of our partition.
subscriptions.seek(tp0, 0);

// Step 2: issue a mock broker request to fetch data from the current 
offset in our local state, i.e. offset 0.
assertTrue(sendFetches() > 0);

// Step 3: mock an OFFSET_OUT_OF_RANGE response from the broker.
client.prepareResponse(fullFetchResponse(tidp0, records, 
Errors.OFFSET_OUT_OF_RANGE, 100L, 0));

// Step 4: process the network I/O to receive the response from the broker 
with the OFFSET_OUT_OF_RANGE
// that was injected. Note, however, that we haven't "collected" the fetch 
data included in the response.
networkClientDelegate.poll(time.timer(0));

// Step 5: validate that the partition is not marked as needing its offset 
reset. The response validation
// logic is performed during the fetch collection, which doesn't happen 
until we call assertEmptyFetch below.
assertFalse(subscriptions.isOffsetResetNeeded(tp0));

// Step 6: update the partition's position in our local state to offset 2. 
We still haven't collected the
// fetch, so we haven't performed any validation of the fetch response.
subscriptions.seek(tp0, 2);

// Step 7: perform the fetch collection. As part of that process, error 
handling is performed. Since
// we intentionally injected an error above, this error will be checked and 
handled in the
// FetchCollector.handleInitializeErrors method. When handling 
OFFSET_OUT_OF_RANGE, handleInitializeErrors
// will notice that the original requested offset (0) is different from the 
state of our current offset (2).
assertEmptyFetch("Should not return records or advance position after 
seeking to end of topic partition");
}
{code}

Here is the code from {{FetchCollector.handleInitializeErrors}}:

{code:java}
private void handleInitializeErrors(final CompletedFetch completedFetch, final 
Errors error) {
final TopicPartition tp = completedFetch.partition;
final long fetchOffset = completedFetch.nextFetchOffset();

. . .
if (error == Errors.OFFSET_OUT_OF_RANGE) {
Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);

if (!clearedReplicaId.isPresent()) {
// If there's no preferred replica to clear, we're fetching from 
the leader so handle
// this error normally
SubscriptionState.FetchPosition position = 
subscriptions.position(tp);

if (position == null || fetchOffset != position.offset) {
log.debug("Discarding stale fetch response for partition {} 
since the fetched offset {} " +
"does not match the current offset {}", tp, 
fetchOffset, position);
} else {
String errorMessage = "Fetch position " + position + " is out 
of range for partition " + tp;

if (subscriptions.hasDefaultOffsetResetPolicy()) {
log.info("{}, resetting offset", errorMessage);
subscriptions.requestOffsetReset(tp);
} else {
log.info("{}, raising error to the application since no 
reset policy is configured", 
errorMessage);
throw new OffsetOutOfRangeException(errorMessage,
Collections.singletonMap(tp, position.offset));
}
}
} else {
log.debug("Unset the preferred read replica {} for partition {} 
since we got {} when fetching {}",
clearedReplicaId.get(), tp, error, fetchOffset);
}
}

. . .
}
{code}

The question is: why is the {{OFFSET_OUT_OF_RANGE}} error ignored just because 
of the following code?

{code:java}
if (position == null || fetchOffset != position.offset) {
{code}

It's a bit weird that the above check is only done for the 
{{OFFSET_OUT_OF_RANGE}} error, instead of any error.

  was:
The {{FetcherTest}} and {{FetchRequestManagerTest}} classes have a test named 
testFetchedRecordsAfterSeek, which [upon closer 
inspection|https://github.com/apache/kafka/pull/14406#discussion_r1347894828] 
may reveal some incorrect logic in 

[jira] [Updated] (KAFKA-15641) Investigate CompletedFetch handleInitializeErrors for accuracy

2023-10-18 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15641:
--
Description: 
The {{FetcherTest}} and {{FetchRequestManagerTest}} classes have a test named 
testFetchedRecordsAfterSeek, which [upon closer 
inspection|https://github.com/apache/kafka/pull/14406#discussion_r1347894828] 
may reveal some incorrect logic in {{FetchCollector.handleInitializeErrors}}.

Here is the test code:

{code:java}
@Test
public void testFetchedRecordsAfterSeek() {
buildFetcher(OffsetResetStrategy.NONE,
 new ByteArrayDeserializer(),
 new ByteArrayDeserializer(),
 2,
 IsolationLevel.READ_UNCOMMITTED);

assignFromUser(singleton(tp0));

// Step 1: seek to offset 0 of our partition.
subscriptions.seek(tp0, 0);

// Step 2: issue a mock broker request to fetch data from the current 
offset in our local state, i.e. offset 0.
assertTrue(sendFetches() > 0);

// Step 3: mock an OFFSET_OUT_OF_RANGE response from the broker.
client.prepareResponse(fullFetchResponse(tidp0, records, 
Errors.OFFSET_OUT_OF_RANGE, 100L, 0));

// Step 4: process the network I/O to receive the response from the broker 
with the OFFSET_OUT_OF_RANGE
// that was injected. Note, however, that we haven't "collected" the fetch 
data included in the response.
networkClientDelegate.poll(time.timer(0));

// Step 5: validate that the partition is not marked as needing its offset 
reset. The response validation
// logic is performed during the fetch collection, which doesn't happen 
until we call assertEmptyFetch below.
assertFalse(subscriptions.isOffsetResetNeeded(tp0));

// Step 6: update the partition's position in our local state to offset 2. 
We still haven't collected the
// fetch, so we haven't performed any validation of the fetch response.
subscriptions.seek(tp0, 2);

// Step 7: perform the fetch collection. As part of that process, error 
handling is performed. Since
// we intentionally injected an error above, this error will be checked and 
handled in the
// FetchCollector.handleInitializeErrors method. When handling 
OFFSET_OUT_OF_RANGE, handleInitializeErrors
// will notice that the original requested offset (0) is different from the 
state of our current offset (2).
assertEmptyFetch("Should not return records or advance position after 
seeking to end of topic partition");
}
{code}

Here is the code from FetchCollector.handleInitializeErrors:

{code:java}
private void handleInitializeErrors(final CompletedFetch completedFetch, final 
Errors error) {
final TopicPartition tp = completedFetch.partition;
final long fetchOffset = completedFetch.nextFetchOffset();

if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
error == Errors.REPLICA_NOT_AVAILABLE ||
error == Errors.KAFKA_STORAGE_ERROR ||
error == Errors.FENCED_LEADER_EPOCH ||
error == Errors.OFFSET_NOT_AVAILABLE) {
. . .
} else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
. . .
} else if (error == Errors.UNKNOWN_TOPIC_ID) {
. . .
} else if (error == Errors.INCONSISTENT_TOPIC_ID) {
. . .
} else if (error == Errors.OFFSET_OUT_OF_RANGE) {
Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);

if (!clearedReplicaId.isPresent()) {
// If there's no preferred replica to clear, we're fetching from 
the leader so handle
// this error normally
SubscriptionState.FetchPosition position = 
subscriptions.position(tp);

if (position == null || fetchOffset != position.offset) {
log.debug("Discarding stale fetch response for partition {} 
since the fetched offset {} " +
"does not match the current offset {}", tp, 
fetchOffset, position);
} else {
String errorMessage = "Fetch position " + position + " is out 
of range for partition " + tp;

if (subscriptions.hasDefaultOffsetResetPolicy()) {
log.info("{}, resetting offset", errorMessage);
subscriptions.requestOffsetReset(tp);
} else {
log.info("{}, raising error to the application since no 
reset policy is configured", 
errorMessage);
throw new OffsetOutOfRangeException(errorMessage,
Collections.singletonMap(tp, position.offset));
}
}
} else {
log.debug("Unset the preferred read replica {} for partition {} 
since we got {} when fetching {}",
clearedReplicaId.get(), tp, error, fetchOffset);
}
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
. . .
} else if (error == 

[jira] [Updated] (KAFKA-15641) Investigate CompletedFetch handleInitializeErrors for accuracy

2023-10-18 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15641:
--
Description: 
The {{FetcherTest}} and {{FetchRequestManagerTest}} classes have a test named 
testFetchedRecordsAfterSeek, which [upon closer 
inspection|https://github.com/apache/kafka/pull/14406#discussion_r1347894828] 
may reveal some incorrect logic in 
{{{}FetchCollector.handleInitializeErrors{}}}.

Here is the test code:

 
{code:java}
@Test
public void testFetchedRecordsAfterSeek() {
buildFetcher(OffsetResetStrategy.NONE,
 new ByteArrayDeserializer(),
 new ByteArrayDeserializer(),
 2,
 IsolationLevel.READ_UNCOMMITTED);

assignFromUser(singleton(tp0));

// Step 1: seek to offset 0 of our partition.
subscriptions.seek(tp0, 0);

// Step 2: issue a mock broker request to fetch data from the current 
offset in our local state, i.e. offset 0.
assertTrue(sendFetches() > 0);

// Step 3: mock an OFFSET_OUT_OF_RANGE response from the broker.
client.prepareResponse(fullFetchResponse(tidp0, records, 
Errors.OFFSET_OUT_OF_RANGE, 100L, 0));

// Step 4: process the network I/O to receive the response from the broker 
with the OFFSET_OUT_OF_RANGE
// that was injected. Note, however, that we haven't "collected" the fetch 
data included in the response.
networkClientDelegate.poll(time.timer(0));

// Step 5: validate that the partition is not marked as needing its offset 
reset. The response validation
// logic is performed during the fetch collection, which doesn't happen 
until we call assertEmptyFetch below.
assertFalse(subscriptions.isOffsetResetNeeded(tp0));

// Step 6: update the partition's position in our local state to offset 2. 
We still haven't collected the
// fetch, so we haven't performed any validation of the fetch response.
subscriptions.seek(tp0, 2);

// Step 7: perform the fetch collection. As part of that process, error 
handling is performed. Since
// we intentionally injected an error above, this error will be checked and 
handled in the
// FetchCollector.handleInitializeErrors method. When handling 
OFFSET_OUT_OF_RANGE, handleInitializeErrors
// will notice that the original requested offset (0) is different from the 
state of our current offset (2).
assertEmptyFetch("Should not return records or advance position after 
seeking to end of topic partition");
}{code}

Here is the code from FetchCollector.handleInitializeErrors:

{code:java}
private void handleInitializeErrors(final CompletedFetch completedFetch, final 
Errors error) {
final TopicPartition tp = completedFetch.partition;
final long fetchOffset = completedFetch.nextFetchOffset();

if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
error == Errors.REPLICA_NOT_AVAILABLE ||
error == Errors.KAFKA_STORAGE_ERROR ||
error == Errors.FENCED_LEADER_EPOCH ||
error == Errors.OFFSET_NOT_AVAILABLE) {
. . .
} else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
. . .
} else if (error == Errors.UNKNOWN_TOPIC_ID) {
. . .
} else if (error == Errors.INCONSISTENT_TOPIC_ID) {
. . .
} else if (error == Errors.OFFSET_OUT_OF_RANGE) {
Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);

if (!clearedReplicaId.isPresent()) {
// If there's no preferred replica to clear, we're fetching from 
the leader so handle
// this error normally
SubscriptionState.FetchPosition position = 
subscriptions.position(tp);

if (position == null || fetchOffset != position.offset) {
log.debug("Discarding stale fetch response for partition {} 
since the fetched offset {} " +
"does not match the current offset {}", tp, 
fetchOffset, position);
} else {
String errorMessage = "Fetch position " + position + " is out 
of range for partition " + tp;

if (subscriptions.hasDefaultOffsetResetPolicy()) {
log.info("{}, resetting offset", errorMessage);
subscriptions.requestOffsetReset(tp);
} else {
log.info("{}, raising error to the application since no 
reset policy is configured", 
errorMessage);
throw new OffsetOutOfRangeException(errorMessage,
Collections.singletonMap(tp, position.offset));
}
}
} else {
log.debug("Unset the preferred read replica {} for partition {} 
since we got {} when fetching {}",
clearedReplicaId.get(), tp, error, fetchOffset);
}
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
. . .
} else if (error == 

[jira] [Updated] (KAFKA-15338) The metric group documentation for metrics added in KAFKA-13945 is incorrect

2023-10-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15338:

Affects Version/s: (was: 3.4.0)
   (was: 3.3.1)
   (was: 3.3.2)
   (was: 3.5.0)
   (was: 3.4.1)
   (was: 3.5.1)

> The metric group documentation for metrics added in KAFKA-13945 is incorrect
> 
>
> Key: KAFKA-15338
> URL: https://issues.apache.org/jira/browse/KAFKA-15338
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.0
>Reporter: Neil Buesing
>Assignee: Atul Sharma
>Priority: Trivial
>  Labels: beginner, newbie
> Fix For: 3.3.3, 3.6.0, 3.4.2, 3.5.2
>
>
> ops.html (docs/streams/ops.html) incorrectly states that the metrics type is 
> "stream-processor-node-metrics", but in looking at the metrics and inspecting 
> the code in TopicMetrics, these metrics have a type of "stream-topic-metrics".
> 4 metrics are in error: "bytes-consumed-total", "bytes-produced-total", 
> "records-consumed-total", and "records-produced-total".
> Looks like the type was changed from the KIP, and the documentation still 
> reflects the KIP.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
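
To confirm the corrected group name at runtime, the standard Kafka Streams 
metrics API can be used (a small sketch; the class wrapper is just for 
illustration):

{code:java}
import java.util.Map;

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public class TopicMetricsProbe {

    // Prints the topic-level metrics, which are registered under the
    // "stream-topic-metrics" group rather than "stream-processor-node-metrics".
    public static void printTopicMetrics(KafkaStreams streams) {
        Map<MetricName, ? extends Metric> metrics = streams.metrics();
        for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
            if ("stream-topic-metrics".equals(entry.getKey().group())) {
                System.out.println(entry.getKey().name() + " = " + entry.getValue().metricValue());
            }
        }
    }
}
{code}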


[jira] [Created] (KAFKA-15641) Investigate CompletedFetch handleInitializeErrors for accuracy

2023-10-18 Thread Kirk True (Jira)
Kirk True created KAFKA-15641:
-

 Summary: Investigate CompletedFetch handleInitializeErrors for 
accuracy
 Key: KAFKA-15641
 URL: https://issues.apache.org/jira/browse/KAFKA-15641
 Project: Kafka
  Issue Type: Improvement
  Components: clients, consumer
Reporter: Kirk True
Assignee: Kirk True


The interaction between {{{}FetchBuffer{}}}, {{{}FetchCollector{}}}, and 
{{CompletedFetch}} is awkward, to say the least. Per [~junrao]'s comments 
[here|https://github.com/apache/kafka/pull/14406#discussion_r1349361459] and 
[here|https://github.com/apache/kafka/pull/14406#discussion_r1350773132], there 
are three issues...

 

First:
{quote}{color:#172b4d}This is an existing issue. But the way we handle paused 
partitions in {{collectFetch}} seems problematic. The application thread first 
calls {{fetchBuffer.setNextInLineFetch(null)}} and then calls 
{{{}fetchBuffer.addAll(pausedCompletedFetches){}}}. This could leave a brief 
window where the paused partition is not included in either {{nextInLineFetch}} 
or {{{}completedFetches{}}}. If the background thread kicks in in that window, 
it could have fetched another chunk for that partition and added the response 
back to FetchBuffer. This would violate the assumption there is no more than 
one pending {{CompletedFetch}} per partition in FetchBuffer and could cause 
records returned not in offset order or duplicates to be returned.{color}
{quote}
 

{color:#172b4d}Second:{color}
{quote}{color:#172b4d}The second existing issue is on the 
{{fetchBuffer.setNextInLineFetch}} call in {{{}collectFetch{}}}. The issue is 
that after all records are drained from {{{}nextInLineFetch{}}}, we only call 
{{setNextInLineFetch}} when there is a new {{{}completedFetch{}}}. However, 
until the drained {{completedFetch}} is removed from {{{}nextInLineFetch{}}}, 
the background thread can't fetch the next chunk. So, it seems that we will 
just be stuck here.{color}
{quote}
 

{color:#172b4d}Third:{color}
{quote}{color:#172b4d}Currently, {{fetchBuffer.setNextInLineFetch}} and 
{{fetchBuffer.poll}} are separate operations and we expect the caller to call 
them in the right order to avoid a partition missing in FetchBuffer in the 
transition phase. It still leaves us with the situation that a partition could 
be in both completedFetches and nextInLineFetch at a particular time. It's not 
a problem for now, but it may be in the future. Could we make them an atomic 
operation? If not, could we add a comment to document the correct usage of the 
api and the impact on partition being duplicated in completedFetches and 
nextInLineFetch?{color}
{quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15338) The metric group documentation for metrics added in KAFKA-13945 is incorrect

2023-10-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15338:

Fix Version/s: 3.3.3
   3.4.2
   3.5.2

> The metric group documentation for metrics added in KAFKA-13945 is incorrect
> 
>
> Key: KAFKA-15338
> URL: https://issues.apache.org/jira/browse/KAFKA-15338
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.5.0, 3.4.1, 3.5.1
>Reporter: Neil Buesing
>Assignee: Atul Sharma
>Priority: Trivial
>  Labels: beginner, newbie
> Fix For: 3.3.3, 3.6.0, 3.4.2, 3.5.2
>
>
> ops.html (docs/streams/ops.html) incorrectly states that the metrics type is 
> "stream-processor-node-metrics", but in looking at the metrics and inspecting 
> the code in TopicMetrics, these metrics have a type of "stream-topic-metrics".
> 4 metrics are in error: "bytes-consumed-total", "bytes-produced-total", 
> "records-consumed-total", and "records-produced-total".
> Looks like the type was changed from the KIP, and the documentation still 
> reflects the KIP.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15378: fix streams upgrade system test [kafka]

2023-10-18 Thread via GitHub


mjsax commented on PR #14539:
URL: https://github.com/apache/kafka/pull/14539#issuecomment-1769719774

   The following system tests failed:
- test_upgrade_to_cooperative_rebalance
  - 0.10.1.1
  - 0.10.2.2
  - 1.0.2
  - 1.1.1
  - 2.0.1
  - 2.3.1
- test_app_upgrade
  - 2.6.3 / full
  - 2.7.2 / full
  - 3.3.2 / full 
- test_rolling_upgrade_with_2_bounces
  - 0.10.0.1
  - 0.10.1.1
  - 0.10.2.2
  - 0.11.0.3
  - 1.0.2
  - 2.6.3
  - 2.7.2
  - 3.3.2
- test_broker_type_bounce
  - "broker_type": "controller", "failure_mode": "hard_shutdown", 
"metadata_quorum": "ZK",
  - "broker_type": "leader", "failure_mode": "hard_shutdown", 
"metadata_quorum": "ISOLATED_KRAFT",
  - "broker_type": "leader", "failure_mode": "hard_shutdown", 
"metadata_quorum": "ZK",
- test_many_brokers_bounce
  - failure_mode": "clean_shutdown", "metadata_quorum": "ISOLATED_KRAFT",
  - "failure_mode": "clean_shutdown", "metadata_quorum": "ZK",
- test_compatible_brokers_eos_alpha_enabled
  - 2.6.3
  - 2.7.2
  - 3.3.2
- test_compatible_brokers_eos_disabled
  - 2.6.3
  - 2.7.2
  - 3.3.2
- test_compatible_brokers_eos_v2_enabled
  - 2.6.3
  - 2.7.2
  - 3.3.2
  
   Overall, we are not in good shape :(
   
   Triggered a re-run to see what is noise: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/5896/
   
   But I actually believe we might want to merge this PR as-is to unblock 
Mickael's PR, and tackle each of these tests one-by-one as follow-up work? 
Thoughts? @mimaison @guozhangwang 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: add KIP-941 to Kafka Streams upgrade docs [kafka]

2023-10-18 Thread via GitHub


mjsax commented on PR #14577:
URL: https://github.com/apache/kafka/pull/14577#issuecomment-1769695164

   Merged to `trunk` and cherry-picked to `3.6` branch.
   
   Also ported to `kafka-site`: https://github.com/apache/kafka-site/pull/563 
(merged right away).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: add KIP-941 to Kafka Streams upgrade docs [kafka]

2023-10-18 Thread via GitHub


mjsax merged PR #14577:
URL: https://github.com/apache/kafka/pull/14577


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15629) proposal to introduce IQv2 Query Types: TimeStampKeyQuery and TimeStampRangeQuery

2023-10-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15629:

Description: 
KIP-992: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-992%3A+Proposal+to+introduce+IQv2+Query+Types%3A+TimestampKeyQuery+and+TimestampRangeQuery]

In the current IQv2 code, there are noticeable differences when interfacing 
with plain-kv-store and ts-kv-store. Notably, the return type V acts as a 
simple value for plain-kv-store but evolves into ValueAndTimestamp for 
ts-kv-store, which presents type safety issues in the API.

Even if IQv2 hasn't gained widespread adoption, an immediate fix might bring 
compatibility concerns.

This brings us to the essence of our proposal: the introduction of distinct 
query types. One that returns a plain value, another for values accompanied by 
timestamps.

While querying a ts-kv-store for a plain value and then extracting it is 
feasible, it doesn't make sense to query a plain-kv-store for a 
ValueAndTimestamp.

Our vision is for plain-kv-store to always return V, while ts-kv-store should 
return ValueAndTimestamp.

  was:
In the current IQv2 code, there are noticeable differences when interfacing 
with plain-kv-store and ts-kv-store. Notably, the return type V acts as a 
simple value for plain-kv-store but evolves into ValueAndTimestamp for 
ts-kv-store, which presents type safety issues in the API.

Even if IQv2 hasn't gained widespread adoption, an immediate fix might bring 
compatibility concerns.

This brings us to the essence of our proposal: the introduction of distinct 
query types. One that returns a plain value, another for values accompanied by 
timestamps.

While querying a ts-kv-store for a plain value and then extracting it is 
feasible, it doesn't make sense to query a plain-kv-store for a 
ValueAndTimestamp.

Our vision is for plain-kv-store to always return V, while ts-kv-store should 
return ValueAndTimestamp.


> proposal to introduce IQv2 Query Types: TimeStampKeyQuery and 
> TimeStampRangeQuery
> -
>
> Key: KAFKA-15629
> URL: https://issues.apache.org/jira/browse/KAFKA-15629
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: kip
>
> KIP-992: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-992%3A+Proposal+to+introduce+IQv2+Query+Types%3A+TimestampKeyQuery+and+TimestampRangeQuery]
> In the current IQv2 code, there are noticeable differences when interfacing 
> with plain-kv-store and ts-kv-store. Notably, the return type V acts as a 
> simple value for plain-kv-store but evolves into ValueAndTimestamp for 
> ts-kv-store, which presents type safety issues in the API.
> Even if IQv2 hasn't gained widespread adoption, an immediate fix might bring 
> compatibility concerns.
> This brings us to the essence of our proposal: the introduction of distinct 
> query types. One that returns a plain value, another for values accompanied 
> by timestamps.
> While querying a ts-kv-store for a plain value and then extracting it is 
> feasible, it doesn't make sense to query a plain-kv-store for a 
> ValueAndTimestamp.
> Our vision is for plain-kv-store to always return V, while ts-kv-store should 
> return ValueAndTimestamp.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
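
For reference, a sketch of what such a timestamped key query could look like, 
mirroring the existing KeyQuery pattern in org.apache.kafka.streams.query 
(names follow the KIP draft and may differ from the final API):

{code:java}
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.state.ValueAndTimestamp;

// Sketch only: a key query that is explicit about returning
// ValueAndTimestamp<V>, so plain-kv-stores keep returning V from
// KeyQuery while ts-kv-stores serve this query type instead.
public final class TimestampedKeyQuery<K, V> implements Query<ValueAndTimestamp<V>> {

    private final K key;

    private TimestampedKeyQuery(final K key) {
        this.key = key;
    }

    /** Creates a query for the timestamped value of the given key. */
    public static <K, V> TimestampedKeyQuery<K, V> withKey(final K key) {
        return new TimestampedKeyQuery<>(key);
    }

    public K key() {
        return key;
    }
}
{code}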


[jira] [Updated] (KAFKA-15629) proposal to introduce IQv2 Query Types: TimeStampKeyQuery and TimeStampRangeQuery

2023-10-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15629:

Labels: kip  (was: )

> proposal to introduce IQv2 Query Types: TimeStampKeyQuery and 
> TimeStampRangeQuery
> -
>
> Key: KAFKA-15629
> URL: https://issues.apache.org/jira/browse/KAFKA-15629
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: kip
>
> In the current IQv2 code, there are noticeable differences when interfacing 
> with plain-kv-store and ts-kv-store. Notably, the return type V acts as a 
> simple value for plain-kv-store but evolves into ValueAndTimestamp for 
> ts-kv-store, which presents type safety issues in the API.
> Even if IQv2 hasn't gained widespread adoption, an immediate fix might bring 
> compatibility concerns.
> This brings us to the essence of our proposal: the introduction of distinct 
> query types. One that returns a plain value, another for values accompanied 
> by timestamps.
> While querying a ts-kv-store for a plain value and then extracting it is 
> feasible, it doesn't make sense to query a plain-kv-store for a 
> ValueAndTimestamp.
> Our vision is for plain-kv-store to always return V, while ts-kv-store should 
> return ValueAndTimestamp.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-18 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1364715414


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java:
##
@@ -162,7 +165,9 @@ private Fetch fetchRecords(final CompletedFetch 
nextInLineFetch) {
 throw new IllegalStateException("Missing position for 
fetchable partition " + tp);
 
 if (nextInLineFetch.nextFetchOffset() == position.offset) {
-List<ConsumerRecord<K, V>> partRecords = nextInLineFetch.fetchRecords(fetchConfig, fetchConfig.maxPollRecords);
+List<ConsumerRecord<K, V>> partRecords = nextInLineFetch.fetchRecords(fetchConfig,

Review Comment:
   I've added this to KAFKA-15640 for a dedicated effort to clean up the 
modifications to this data and make them atomic operations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15640) Refactor CompletedFetch initialization

2023-10-18 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15640:
--
Description: 
The interaction between {{{}FetchBuffer{}}}, {{{}FetchCollector{}}}, and 
{{CompletedFetch}} is awkward, to say the least. Per [~junrao]'s comments 
[here|https://github.com/apache/kafka/pull/14406#discussion_r1349361459] and 
[here|https://github.com/apache/kafka/pull/14406#discussion_r1350773132], there 
are three issues...

 

First:
{quote}{color:#172b4d}This is an existing issue. But the way we handle paused 
partitions in {{collectFetch}} seems problematic. The application thread first 
calls {{fetchBuffer.setNextInLineFetch(null)}} and then calls 
{{{}fetchBuffer.addAll(pausedCompletedFetches){}}}. This could leave a brief 
window where the paused partition is not included in either {{nextInLineFetch}} 
or {{{}completedFetches{}}}. If the background thread kicks in in that window, 
it could have fetched another chunk for that partition and added the response 
back to FetchBuffer. This would violate the assumption there is no more than 
one pending {{CompletedFetch}} per partition in FetchBuffer and could cause 
records returned not in offset order or duplicates to be returned.{color}
{quote}
 

{color:#172b4d}Second:{color}
{quote}{color:#172b4d}The second existing issue is on the 
{{fetchBuffer.setNextInLineFetch}} call in {{{}collectFetch{}}}. The issue is 
that after all records are drained from {{{}nextInLineFetch{}}}, we only call 
{{setNextInLineFetch}} when there is a new {{{}completedFetch{}}}. However, 
until the drained {{completedFetch}} is removed from {{{}nextInLineFetch{}}}, 
the background thread can't fetch the next chunk. So, it seems that we will 
just be stuck here.{color}
{quote}
 

{color:#172b4d}Third:{color}
{quote}{color:#172b4d}Currently, {{fetchBuffer.setNextInLineFetch}} and 
{{fetchBuffer.poll}} are separate operations and we expect the caller to call 
them in the right order to avoid a partition missing in FetchBuffer in the 
transition phase. It still leaves us with the situation that a partition could 
be in both completedFetches and nextInLineFetch at a particular time. It's not 
a problem for now, but it may be in the future. Could we make them an atomic 
operation? If not, could we add a comment to document the correct usage of the 
api and the impact on partition being duplicated in completedFetches and 
nextInLineFetch?{color}
{quote}

  was:
The interaction between {{{}FetchBuffer{}}}, {{{}FetchCollector{}}}, and 
{{CompletedFetch}} is awkward, to say the least.

Per [~junrao]'s comments: 
{quote}Currently, {{fetchBuffer.setNextInLineFetch}} and {{fetchBuffer.poll}} 
are separate operations and we expect the caller to call them in the right 
order to avoid a partition missing in FetchBuffer in the transition phase. It 
still leaves us with the situation that a partition could be in both 
completedFetches and nextInLineFetch at a particular time. It's not a problem 
for now, but it may be in the future. Could we make them an atomic operation? 
If not, could we add a comment to document the correct usage of the api and the 
impact on partition being duplicated in completedFetches and nextInLineFetch?
{quote}


> Refactor CompletedFetch initialization
> --
>
> Key: KAFKA-15640
> URL: https://issues.apache.org/jira/browse/KAFKA-15640
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>
> The interaction between {{{}FetchBuffer{}}}, {{{}FetchCollector{}}}, and 
> {{CompletedFetch}} is awkward, to say the least. Per [~junrao]'s comments 
> [here|https://github.com/apache/kafka/pull/14406#discussion_r1349361459] and 
> [here|https://github.com/apache/kafka/pull/14406#discussion_r1350773132], 
> there are three issues...
>  
> First:
> {quote}{color:#172b4d}This is an existing issue. But the way we handle paused 
> partitions in {{collectFetch}} seems problematic. The application thread 
> first calls {{fetchBuffer.setNextInLineFetch(null)}} and then calls 
> {{{}fetchBuffer.addAll(pausedCompletedFetches){}}}. This could leave a brief 
> window where the paused partition is not included in either 
> {{nextInLineFetch}} or {{{}completedFetches{}}}. If the background thread 
> kicks in in that window, it could have fetched another chunk for that 
> partition and added the response back to FetchBuffer. This would violate the 
> assumption there is no more than one pending {{CompletedFetch}} per partition 
> in FetchBuffer and could cause records returned not in offset order or 
> duplicates to be returned.{color}
> {quote}
>  
> {color:#172b4d}Second:{color}
> {quote}{color:#172b4d}The second existing issue is on the 
> {{fetchBuffer.setNextInLineFetch}} 

[jira] [Updated] (KAFKA-15640) Refactor CompletedFetch initialization

2023-10-18 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15640:
--
Priority: Major  (was: Minor)

> Refactor CompletedFetch initialization
> --
>
> Key: KAFKA-15640
> URL: https://issues.apache.org/jira/browse/KAFKA-15640
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>
> The interaction between {{{}FetchBuffer{}}}, {{{}FetchCollector{}}}, and 
> {{CompletedFetch}} is awkward, to say the least. Per [~junrao]'s comments 
> [here|https://github.com/apache/kafka/pull/14406#discussion_r1349361459] and 
> [here|https://github.com/apache/kafka/pull/14406#discussion_r1350773132], 
> there are three issues...
>  
> First:
> {quote}{color:#172b4d}This is an existing issue, but the way we handle paused 
> partitions in {{collectFetch}} seems problematic. The application thread 
> first calls {{fetchBuffer.setNextInLineFetch(null)}} and then calls 
> {{{}fetchBuffer.addAll(pausedCompletedFetches){}}}. This could leave a brief 
> window in which the paused partition is included in neither 
> {{nextInLineFetch}} nor {{{}completedFetches{}}}. If the background thread 
> kicks in during that window, it could fetch another chunk for that 
> partition and add the response back to the FetchBuffer. That would violate 
> the assumption that there is no more than one pending {{CompletedFetch}} per 
> partition in the FetchBuffer, and could cause records to be returned out of 
> offset order, or duplicates to be returned.{color}
> {quote}
>  
> {color:#172b4d}Second:{color}
> {quote}{color:#172b4d}The second existing issue is on the 
> {{fetchBuffer.setNextInLineFetch}} call in {{{}collectFetch{}}}. The issue is 
> that, after all records are drained from {{{}nextInLineFetch{}}}, we only call 
> {{setNextInLineFetch}} when there is a new {{{}completedFetch{}}}. However, 
> until the drained {{completedFetch}} is removed from {{{}nextInLineFetch{}}}, 
> the background thread can't fetch the next chunk. So it seems that we will 
> just be stuck here.{color}
> {quote}
>  
> {color:#172b4d}Third:{color}
> {quote}{color:#172b4d}Currently, {{fetchBuffer.setNextInLineFetch}} and 
> {{fetchBuffer.poll}} are separate operations, and we expect the caller to call 
> them in the right order to avoid a partition going missing from the 
> FetchBuffer during the transition phase. It still leaves us with the 
> situation that a partition could be in both completedFetches and 
> nextInLineFetch at a particular time. It's not a problem for now, but it may 
> be in the future. Could we make them an atomic operation? If not, could we 
> add a comment to document the correct usage of the API and the impact of a 
> partition being duplicated in completedFetches and nextInLineFetch?{color}
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15640) Refactor CompletedFetch initialization

2023-10-18 Thread Kirk True (Jira)
Kirk True created KAFKA-15640:
-

 Summary: Refactor CompletedFetch initialization
 Key: KAFKA-15640
 URL: https://issues.apache.org/jira/browse/KAFKA-15640
 Project: Kafka
  Issue Type: Improvement
  Components: clients, consumer
Reporter: Kirk True
Assignee: Kirk True


The interaction between {{{}FetchBuffer{}}}, {{{}FetchCollector{}}}, and 
{{CompletedFetch}} is awkward, to say the least.

Per [~junrao]'s comments: 
{quote}Currently, {{fetchBuffer.setNextInLineFetch}} and {{fetchBuffer.poll}} 
are separate operations, and we expect the caller to call them in the right 
order to avoid a partition going missing from the FetchBuffer during the 
transition phase. It still leaves us with the situation that a partition could 
be in both completedFetches and nextInLineFetch at a particular time. It's not 
a problem for now, but it may be in the future. Could we make them an atomic 
operation? If not, could we add a comment to document the correct usage of the 
API and the impact of a partition being duplicated in completedFetches and 
nextInLineFetch?
{quote}
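
To make the atomic-operation suggestion concrete, here is a minimal sketch of 
what a combined operation could look like. The method name and the {{lock}} 
field are hypothetical, not the actual FetchBuffer API:
{code:java}
// Hypothetical sketch: perform the poll and the nextInLineFetch swap under a
// single lock, so there is no window in which a partition is absent from (or
// present in) both completedFetches and nextInLineFetch.
public CompletedFetch pollAndSetNextInLineFetch() {
    synchronized (lock) {
        CompletedFetch next = completedFetches.poll();
        nextInLineFetch = next;
        return next;
    }
}
{code}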



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] MINOR: Don't log missing partitions at ERROR level [kafka]

2023-10-18 Thread via GitHub


splett2 opened a new pull request, #14579:
URL: https://github.com/apache/kafka/pull/14579

   ### What
   We are logging missing partitions as errors in the `state.change.logger`, 
which may be alarming. There are valid cases where the partition may not 
exist, e.g., during partition reassignment.
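   
   For illustration, a hedged sketch of the kind of change (the logger and 
message text here are assumptions, not the actual patch):
   
   ```java
   // Hypothetical sketch: treat a missing partition as an expected condition
   // and log it below ERROR level, since a partition can legitimately
   // disappear during reassignment.
   if (partition == null) {
       stateChangeLogger.info("Skipping state change for {}; the partition no longer exists", topicPartition);
   }
   ```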
   
   ### Testing
   Verified that ReassignPartitionsIntegrationTest logs partition reassignments 
at ERROR level without the patch and does not with the patch.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-18 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1364712500


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java:
##
@@ -35,19 +41,24 @@
  *
  * 
  *
- * Note: this class is not thread-safe and is intended to only be 
used from a single thread.
+ * Note: this class is thread-safe with the intention that {@link 
CompletedFetch the data} will be

Review Comment:
   I've filed KAFKA-15640 to look into this in more detail.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-18 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1364707771


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##
@@ -0,0 +1,3533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.FetchMetadata;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
+import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import 

Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-18 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1364706025


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##
@@ -0,0 +1,3541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.FetchMetadata;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
+import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import 

Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-18 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1364704339


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import 
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.FetchEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.TopicMetadataApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_REQUEST_TIMEOUT_MS;
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@SuppressWarnings("ClassDataAbstractionCoupling")
+public class ConsumerNetworkThreadTest {
+
+private ConsumerTestBuilder.ConsumerNetworkThreadTestBuilder testBuilder;
+private Time time;
+private ConsumerMetadata metadata;
+private NetworkClientDelegate networkClient;
+private BlockingQueue applicationEventsQueue;
+private ApplicationEventProcessor applicationEventProcessor;
+private OffsetsRequestManager offsetsRequestManager;
+private CommitRequestManager commitManager;
+private ConsumerNetworkThread consumerNetworkThread;
+private MockClient client;
+
+@BeforeEach
+public void setup() {
+testBuilder = new 
ConsumerTestBuilder.ConsumerNetworkThreadTestBuilder();
+time = testBuilder.time;
+metadata = testBuilder.metadata;
+networkClient = testBuilder.networkClientDelegate;
+client = testBuilder.client;
+applicationEventsQueue = testBuilder.applicationEventQueue;
+applicationEventProcessor = testBuilder.applicationEventProcessor;
+commitManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
+offsetsRequestManager = testBuilder.offsetsRequestManager;
+consumerNetworkThread = 

[jira] [Updated] (KAFKA-15639) Investigate ConsumerNetworkThread's testResetPositionsProcessFailureIsIgnored

2023-10-18 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15639:
--
Parent: (was: KAFKA-14246)
Issue Type: Task  (was: Sub-task)

> Investigate ConsumerNetworkThread's testResetPositionsProcessFailureIsIgnored
> -
>
> Key: KAFKA-15639
> URL: https://issues.apache.org/jira/browse/KAFKA-15639
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor
>
> The {{testResetPositionsProcessFailureIsIgnored}} test looks like this:
>  
> {code:java}
> @Test
> public void testResetPositionsProcessFailureIsIgnored() {
> doThrow(new 
> NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded();
> ResetPositionsApplicationEvent event = new 
> ResetPositionsApplicationEvent();
> applicationEventsQueue.add(event);
> assertDoesNotThrow(() -> consumerNetworkThread.runOnce());
> 
> verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
> }
>  {code}
>  
> [~junrao] asks:
>  
> {quote}Not sure if this is a useful test since 
> {{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly 
> throw an exception?
> {quote}
>  
> I commented out the {{doThrow}} line and it did not impact the test. 
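
One way the failure path could be exercised instead, assuming 
{{resetPositionsIfNeeded()}} reports errors through a returned 
{{CompletableFuture}} rather than by throwing (a sketch under that assumption; 
the actual signature may differ):
{code:java}
// Hypothetical sketch: stub the manager to fail asynchronously and assert
// that runOnce() still swallows the error rather than propagating it.
CompletableFuture<Void> failed = new CompletableFuture<>();
failed.completeExceptionally(new NullPointerException());
when(offsetsRequestManager.resetPositionsIfNeeded()).thenReturn(failed);

applicationEventsQueue.add(new ResetPositionsApplicationEvent());
assertDoesNotThrow(() -> consumerNetworkThread.runOnce());
{code}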



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15639) Investigate ConsumerNetworkThread's testResetPositionsProcessFailureIsIgnored

2023-10-18 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15639:
--
Description: 
The {{testResetPositionsProcessFailureIsIgnored}} test looks like this:

 
{code:java}
@Test
public void testResetPositionsProcessFailureIsIgnored() {
doThrow(new 
NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded();

ResetPositionsApplicationEvent event = new 
ResetPositionsApplicationEvent();
applicationEventsQueue.add(event);
assertDoesNotThrow(() -> consumerNetworkThread.runOnce());


verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
}
 {code}
 

[~junrao] asks:

 
{quote}Not sure if this is a useful test since 
{{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly 
throw an exception?
{quote}
 

I commented out the {{doThrow}} line and it did not impact the test. 

  was:
The {{testResetPositionsProcessFailureIsIgnored}} test looks like this:

 
{code:java}
@Test
public void testResetPositionsProcessFailureIsIgnored() {
doThrow(new 
NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded();

ResetPositionsApplicationEvent event = new 
ResetPositionsApplicationEvent();
applicationEventsQueue.add(event);
assertDoesNotThrow(() -> consumerNetworkThread.runOnce());


verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
}
 {code}
 

[~junrao] asks:

 
{quote}Not sure if this is a useful test since 
{{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly 
throw an exception?
{quote}
 

I commented out the doThrow line and it did not impact the test.


> Investigate ConsumerNetworkThread's testResetPositionsProcessFailureIsIgnored
> -
>
> Key: KAFKA-15639
> URL: https://issues.apache.org/jira/browse/KAFKA-15639
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor
>
> The {{testResetPositionsProcessFailureIsIgnored}} test looks like this:
>  
> {code:java}
> @Test
> public void testResetPositionsProcessFailureIsIgnored() {
> doThrow(new 
> NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded();
> ResetPositionsApplicationEvent event = new 
> ResetPositionsApplicationEvent();
> applicationEventsQueue.add(event);
> assertDoesNotThrow(() -> consumerNetworkThread.runOnce());
> 
> verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
> }
>  {code}
>  
> [~junrao] asks:
>  
> {quote}Not sure if this is a useful test since 
> {{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly 
> throw an exception?
> {quote}
>  
> I commented out the {{doThrow}} line and it did not impact the test. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15639) Investigate ConsumerNetworkThread's testResetPositionsProcessFailureIsIgnored

2023-10-18 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15639:
--
Description: 
The {{testResetPositionsProcessFailureIsIgnored}} test looks like this:

 
{code:java}
@Test
public void testResetPositionsProcessFailureIsIgnored() {
doThrow(new 
NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded();

ResetPositionsApplicationEvent event = new 
ResetPositionsApplicationEvent();
applicationEventsQueue.add(event);
assertDoesNotThrow(() -> consumerNetworkThread.runOnce());


verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
}
 {code}
 

[~junrao] asks:

 
{quote}Not sure if this is a useful test since 
{{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly 
throw an exception?
{quote}
 

I commented out the doThrow line and it did not impact the test.

  was:
The testResetPositionsProcessFailureIsIgnored test looks like this:

 
{code:java}
@Test
public void testResetPositionsProcessFailureIsIgnored() {
doThrow(new 
NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded();

ResetPositionsApplicationEvent event = new 
ResetPositionsApplicationEvent();
applicationEventsQueue.add(event);
assertDoesNotThrow(() -> consumerNetworkThread.runOnce());


verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
}
 {code}
 

I commented out the doThrow

 

Not sure if this is a useful test since 
{{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly 
throw an exception?


> Investigate ConsumerNetworkThread's testResetPositionsProcessFailureIsIgnored
> -
>
> Key: KAFKA-15639
> URL: https://issues.apache.org/jira/browse/KAFKA-15639
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor
>
> The {{testResetPositionsProcessFailureIsIgnored}} test looks like this:
>  
> {code:java}
> @Test
> public void testResetPositionsProcessFailureIsIgnored() {
> doThrow(new 
> NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded();
> ResetPositionsApplicationEvent event = new 
> ResetPositionsApplicationEvent();
> applicationEventsQueue.add(event);
> assertDoesNotThrow(() -> consumerNetworkThread.runOnce());
> 
> verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
> }
>  {code}
>  
> [~junrao] asks:
>  
> {quote}Not sure if this is a useful test since 
> {{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly 
> throw an exception?
> {quote}
>  
> I commented out the doThrow line and it did not impact the test.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15639) Investigate ConsumerNetworkThread's testResetPositionsProcessFailureIsIgnored

2023-10-18 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15639:
--
Description: 
The testResetPositionsProcessFailureIsIgnored test looks like this:

 
{code:java}
@Test
public void testResetPositionsProcessFailureIsIgnored() {
doThrow(new 
NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded();

ResetPositionsApplicationEvent event = new 
ResetPositionsApplicationEvent();
applicationEventsQueue.add(event);
assertDoesNotThrow(() -> consumerNetworkThread.runOnce());


verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
}
 {code}
 

I commented out the doThrow

 

Not sure if this is a useful test since 
{{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly 
throw an exception?

  was:
Regarding this comment in {{{}testPollResultTimer{}}}...
{code:java}
// purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE 
upon success|
{code}
[~junrao] asked:
{quote}Which call is returning Long.MAX_VALUE?
{quote}


> Investigate ConsumerNetworkThread's testResetPositionsProcessFailureIsIgnored
> -
>
> Key: KAFKA-15639
> URL: https://issues.apache.org/jira/browse/KAFKA-15639
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor
>
> The testResetPositionsProcessFailureIsIgnored test looks like this:
>  
> {code:java}
> @Test
> public void testResetPositionsProcessFailureIsIgnored() {
> doThrow(new 
> NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded();
> ResetPositionsApplicationEvent event = new 
> ResetPositionsApplicationEvent();
> applicationEventsQueue.add(event);
> assertDoesNotThrow(() -> consumerNetworkThread.runOnce());
> 
> verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
> }
>  {code}
>  
> I commented out the doThrow
>  
> Not sure if this is a useful test since 
> {{offsetsRequestManager.resetPositionsIfNeeded()}} seems to never directly 
> throw an exception?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15639) Investigate ConsumerNetworkThread's testResetPositionsProcessFailureIsIgnored

2023-10-18 Thread Kirk True (Jira)
Kirk True created KAFKA-15639:
-

 Summary: Investigate ConsumerNetworkThread's 
testResetPositionsProcessFailureIsIgnored
 Key: KAFKA-15639
 URL: https://issues.apache.org/jira/browse/KAFKA-15639
 Project: Kafka
  Issue Type: Sub-task
  Components: clients, consumer
Reporter: Kirk True
Assignee: Philip Nee


Regarding this comment in {{{}testPollResultTimer{}}}...
{code:java}
// purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE 
upon success|
{code}
[~junrao] asked:
{quote}Which call is returning Long.MAX_VALUE?
{quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-18 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1364700996


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.LogTruncationException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import 
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.FetchEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.TopicMetadataApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@SuppressWarnings("ClassDataAbstractionCoupling")
+public class ConsumerNetworkThreadTest {
+
+private ConsumerTestBuilder.ConsumerNetworkThreadTestBuilder testBuilder;
+private Time time;
+private ConsumerMetadata metadata;
+private NetworkClientDelegate networkClient;
+private BlockingQueue applicationEventsQueue;
+private ApplicationEventProcessor applicationEventProcessor;
+private CoordinatorRequestManager coordinatorManager;
+private OffsetsRequestManager offsetsRequestManager;
+private CommitRequestManager commitManager;
+private ConsumerNetworkThread consumerNetworkThread;
+private MockClient client;
+
+@BeforeEach
+public void setup() {
+testBuilder = new 
ConsumerTestBuilder.ConsumerNetworkThreadTestBuilder();
+time = testBuilder.time;
+metadata = testBuilder.metadata;
+networkClient = testBuilder.networkClientDelegate;
+client = testBuilder.client;
+applicationEventsQueue = testBuilder.applicationEventQueue;
+applicationEventProcessor = testBuilder.applicationEventProcessor;
+coordinatorManager = testBuilder.coordinatorRequestManager;
+commitManager = testBuilder.commitRequestManager;
+offsetsRequestManager = testBuilder.offsetsRequestManager;
+consumerNetworkThread = 

[jira] [Updated] (KAFKA-15638) Investigate ConsumerNetworkThread's testPollResultTimer

2023-10-18 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15638:
--
Description: 
Regarding this comment in {{{}testPollResultTimer{}}}...
{code:java}
// purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE 
upon success|
{code}
[~junrao] asked:
{quote}Which call is returning Long.MAX_VALUE?
{quote}

  was:
Thanks for the reply. I still don't quite understand the test. Why do we 
duplicate the following code both inside and outside of {{{}setWakeupHook{}}}?
 
{code:java}
networkClientDelegate.disconnectAsync(readReplica);
networkClientDelegate.poll(time.timer(0));
{code}
 
MockClient is only woken up through 
{{{}networkClientDelegate.disconnectAsync{}}}.


> Investigate ConsumerNetworkThread's testPollResultTimer
> ---
>
> Key: KAFKA-15638
> URL: https://issues.apache.org/jira/browse/KAFKA-15638
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
>
> Regarding this comment in {{{}testPollResultTimer{}}}...
> {code:java}
> // purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE 
> upon success|
> {code}
> [~junrao] asked:
> {quote}Which call is returning Long.MAX_VALUE?
> {quote}
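
For context, a minimal sketch of the kind of check the comment seems to 
describe, assuming {{NetworkClientDelegate.PollResult}} exposes the time until 
the next required poll (the constructor and field names here are assumptions):
{code:java}
// Hypothetical sketch: a successful poll with no unsent requests is expected
// to report Long.MAX_VALUE as the time until the next required poll.
NetworkClientDelegate.PollResult result =
        new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList());
assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs);
{code}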



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15638) Investigate ConsumerNetworkThread's testPollResultTimer

2023-10-18 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15638:
-

Assignee: Philip Nee  (was: Kirk True)

> Investigate ConsumerNetworkThread's testPollResultTimer
> ---
>
> Key: KAFKA-15638
> URL: https://issues.apache.org/jira/browse/KAFKA-15638
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor
>
> Regarding this comment in {{{}testPollResultTimer{}}}...
> {code:java}
> // purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE 
> upon success|
> {code}
> [~junrao] asked:
> {quote}Which call is returning Long.MAX_VALUE?
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15638) Investigate ConsumerNetworkThread's testPollResultTimer

2023-10-18 Thread Kirk True (Jira)
Kirk True created KAFKA-15638:
-

 Summary: Investigate ConsumerNetworkThread's testPollResultTimer
 Key: KAFKA-15638
 URL: https://issues.apache.org/jira/browse/KAFKA-15638
 Project: Kafka
  Issue Type: Sub-task
  Components: clients, consumer
Reporter: Kirk True
Assignee: Kirk True


Thanks for the reply. I still don't quite understand the test. Why do we 
duplicate the following code both inside and outside of {{{}setWakeupHook{}}}?
 
{code:java}
networkClientDelegate.disconnectAsync(readReplica);
networkClientDelegate.poll(time.timer(0));
{code}
 
MockClient is only woken up through 
{{{}networkClientDelegate.disconnectAsync{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-18 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1364699488


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##
@@ -0,0 +1,3573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.FetchMetadata;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
+import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import 

[jira] [Updated] (KAFKA-15637) Investigate FetcherTest's/FetchRequestManager's testFetchCompletedBeforeHandlerAdded

2023-10-18 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15637:
--
Summary: Investigate FetcherTest's/FetchRequestManager's 
testFetchCompletedBeforeHandlerAdded  (was: CLONE - Investigate 
FetcherTest's/FetchRequestManager's testFetchCompletedBeforeHandlerAdded)

> Investigate FetcherTest's/FetchRequestManager's 
> testFetchCompletedBeforeHandlerAdded
> 
>
> Key: KAFKA-15637
> URL: https://issues.apache.org/jira/browse/KAFKA-15637
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
>
> Thanks for the reply. I still don't quite understand the test. Why do we 
> duplicate the following code both inside and outside of {{{}setWakeupHook{}}}?
>  
> {code:java}
> networkClientDelegate.disconnectAsync(readReplica);
> networkClientDelegate.poll(time.timer(0));
> {code}
>  
> MockClient is only woken up through 
> {{{}networkClientDelegate.disconnectAsync{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15637) CLONE - Investigate FetcherTest's/FetchRequestManager's testFetchCompletedBeforeHandlerAdded

2023-10-18 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15637:
--
Description: 
Thanks for the reply. I still don't quite understand the test. Why do we 
duplicate the following code both inside and outside of {{{}setWakeupHook{}}}?
 
{code:java}
networkClientDelegate.disconnectAsync(readReplica);
networkClientDelegate.poll(time.timer(0));
{code}
 
MockClient is only woken up through 
{{{}networkClientDelegate.disconnectAsync{}}}.

  was:
Thanks for the reply. I still don't quite understand the test. Why do we 
duplicate the following code both inside and outside of setWakeupHook?
 
{code:java}
networkClientDelegate.disconnectAsync(readReplica);
networkClientDelegate.poll(time.timer(0));
{code}
 
MockClient is only woken up through 
{{{}networkClientDelegate.disconnectAsync{}}}.


> CLONE - Investigate FetcherTest's/FetchRequestManager's 
> testFetchCompletedBeforeHandlerAdded
> 
>
> Key: KAFKA-15637
> URL: https://issues.apache.org/jira/browse/KAFKA-15637
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
>
> Thanks for the reply. I still don't quite understand the test. Why do we 
> duplicate the following code both inside and outside of {{{}setWakeupHook{}}}?
>  
> {code:java}
> networkClientDelegate.disconnectAsync(readReplica);
> networkClientDelegate.poll(time.timer(0));
> {code}
>  
> MockClient is only woken up through 
> {{{}networkClientDelegate.disconnectAsync{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15637) CLONE - Investigate FetcherTest's/FetchRequestManager's testFetchCompletedBeforeHandlerAdded

2023-10-18 Thread Kirk True (Jira)
Kirk True created KAFKA-15637:
-

 Summary: CLONE - Investigate FetcherTest's/FetchRequestManager's 
testFetchCompletedBeforeHandlerAdded
 Key: KAFKA-15637
 URL: https://issues.apache.org/jira/browse/KAFKA-15637
 Project: Kafka
  Issue Type: Sub-task
  Components: clients, consumer
Reporter: Kirk True
Assignee: Kirk True


{{expectedBytes}} is calculated as a total, instead of an average. Is this 
correct?
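
For what it's worth, the distinction in question (a sketch; the variable names 
are hypothetical):
{code:java}
// Total bytes across all fetched records:
long totalBytes = records.stream().mapToLong(ConsumerRecord::serializedValueSize).sum();
// Average bytes per record, which the metric may be intended to report instead:
double avgBytes = records.isEmpty() ? 0.0 : (double) totalBytes / records.size();
{code}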



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

