Re: [PR] MINOR: Fix toString method of IsolationLevel [kafka]

2024-01-03 Thread via GitHub


ashwinpankaj commented on code in PR #14782:
URL: https://github.com/apache/kafka/pull/14782#discussion_r1441389074


##
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java:
##
@@ -234,7 +233,7 @@ public void start() {
 throw new ConnectException(
 "Must provide a TopicAdmin to KafkaBasedLog when consumer is configured with "
 + ConsumerConfig.ISOLATION_LEVEL_CONFIG + " set to "
-+ IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)
++ IsolationLevel.READ_COMMITTED

Review Comment:
   ```suggestion
   + IsolationLevel.READ_COMMITTED.toString()
   ```



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##
@@ -248,7 +247,7 @@ public void start() {
 + "support for source connectors, or upgrade to a 
newer Kafka broker version.";
 } else {
 message = "When " + ConsumerConfig.ISOLATION_LEVEL_CONFIG + 
"is set to "
-+ 
IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)
++ IsolationLevel.READ_COMMITTED

Review Comment:
   ```suggestion
   + IsolationLevel.READ_COMMITTED.toString()
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16055) Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders

2024-01-03 Thread Kohei Nozaki (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17802401#comment-17802401
 ] 

Kohei Nozaki commented on KAFKA-16055:
--

Pull request: [https://github.com/apache/kafka/pull/15121]

> Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders
> 
>
> Key: KAFKA-16055
> URL: https://issues.apache.org/jira/browse/KAFKA-16055
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Kohei Nozaki
>Priority: Minor
>  Labels: newbie, newbie++
>
> This was originally raised in [a kafka-users 
> post|https://lists.apache.org/thread/gpct1275bfqovlckptn3lvf683qpoxol].
> There is a HashMap stored in QueryableStoreProvider#storeProviders ([code 
> link|https://github.com/apache/kafka/blob/3.6.1/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L39])
>  which can be mutated by a KafkaStreams#removeStreamThread() call. This can 
> be problematic when KafkaStreams#store is called from a separate thread.
> We need to somehow make this part of the code thread-safe by replacing it with 
> a ConcurrentHashMap and/or using an existing locking mechanism.
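
For illustration, a minimal sketch (class and field names are simplified placeholders, not the actual QueryableStoreProvider code) of the kind of change being proposed:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for the store-provider map: it is read by threads calling
// KafkaStreams#store while KafkaStreams#removeStreamThread may remove entries
// concurrently, so a ConcurrentHashMap avoids the unsafe shared HashMap.
class StoreProvidersSketch {
    private final Map<String, Object> storeProvidersByThread = new ConcurrentHashMap<>();

    void addStreamThread(String threadName, Object storeProvider) {
        storeProvidersByThread.put(threadName, storeProvider);
    }

    void removeStreamThread(String threadName) {
        storeProvidersByThread.remove(threadName);   // safe alongside concurrent readers
    }

    Object storeProviderFor(String threadName) {
        return storeProvidersByThread.get(threadName);
    }
}
```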



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-16055: Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders [kafka]

2024-01-03 Thread via GitHub


nuzayats opened a new pull request, #15121:
URL: https://github.com/apache/kafka/pull/15121

   This PR replaces a HashMap with a ConcurrentHashMap so that local state store queries can be made from multiple threads. This is based on a discussion in the kafka-users mailing list. See this thread for additional context: 
https://lists.apache.org/thread/gpct1275bfqovlckptn3lvf683qpoxol
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16072: JUnit 5 extension to detect thread leak [kafka]

2024-01-03 Thread via GitHub


ashwinpankaj commented on code in PR #15101:
URL: https://github.com/apache/kafka/pull/15101#discussion_r1441364275


##
core/src/test/java/kafka/test/junit/LeakTestingExtension.java:
##
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.junit;
+
+import kafka.utils.TestUtils;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ExtensionContext.Namespace;
+import org.junit.jupiter.api.extension.ExtensionContext.Store;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import scala.Tuple2;
+
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class LeakTestingExtension implements BeforeEachCallback, AfterEachCallback {
+    private static final Set<String> EXPECTED_THREAD_NAMES = new HashSet<>(
+            Arrays.asList("junit-", "JMX", "feature-zk-node-event-process-thread", "ForkJoinPool", "executor-",
+                    "metrics-meter-tick-thread", "scala-", "pool-")
+    );
+    private static final String THREADS_KEY = "threads";
+
+    @Override
+    public void beforeEach(ExtensionContext context) {
+        getStore(context).put(THREADS_KEY, Thread.getAllStackTraces().keySet());
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void afterEach(ExtensionContext context) {
+        Set<Thread> initialThreads = getStore(context).remove(THREADS_KEY, Set.class);
+        Tuple2<Set<Thread>, Object> unexpectedThreads = TestUtils.computeUntilTrue(
+                () -> unexpectedThreads(initialThreads),
+                DEFAULT_MAX_WAIT_MS,
+                100L,
+                Set::isEmpty
+        );
+
+        assertTrue(unexpectedThreads._1.isEmpty(), "Found unexpected threads after executing test: " +
+                unexpectedThreads._1.stream().map(Objects::toString).collect(Collectors.joining(", ")));
+    }
+
+    private Set<Thread> unexpectedThreads(Set<Thread> initialThreads) {
+        Set<Thread> finalThreads = Thread.getAllStackTraces().keySet();
+
+        if (initialThreads.size() != finalThreads.size()) {

Review Comment:
   initialThreads and finalThreads could have the same size but contain different threads. I think we should always match the names.
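
   A hedged sketch of what name-based matching could look like, reusing the fields from the quoted class above (Collectors is already imported there); this is illustrative only, not the PR's implementation:

```java
// Compare by thread name instead of by set size: any thread present after the test
// whose name was not present before, and does not match an expected-name prefix,
// is reported as a potential leak.
private Set<Thread> unexpectedThreads(Set<Thread> initialThreads) {
    Set<String> initialNames = initialThreads.stream()
            .map(Thread::getName)
            .collect(Collectors.toSet());

    return Thread.getAllStackTraces().keySet().stream()
            .filter(thread -> !initialNames.contains(thread.getName()))
            .filter(thread -> EXPECTED_THREAD_NAMES.stream().noneMatch(name -> thread.getName().contains(name)))
            .collect(Collectors.toSet());
}
```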






Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-03 Thread via GitHub


iit2009060 commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1876338999

   > Thanks @iit2009060 for the PR.
   > 
   > Let us say there are two segments in remote storage and subsequent segments in local storage: remote-seg-10[10, 20], remote-seg-21[21, 30] (offsets 25 to 30 are compacted), local-seg-31[31, 40].
   > 
   > When a fetch request comes for offsets within [25, 30], it should move to the local segment, as those offsets might have been compacted earlier. Did you also cover this scenario in this PR?
   
   @satishd I have not tested this case explicitly.
   In this case RemoteLogManager would return firstBatch as null, and the controller (the class that invokes the RemoteLogManager read) should take care of reading the next segment locally. Let me reproduce this issue locally and update the behaviour.





[jira] [Comment Edited] (KAFKA-16073) Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion

2024-01-03 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17802024#comment-17802024
 ] 

Satish Duggana edited comment on KAFKA-16073 at 1/4/24 3:50 AM:


That was a good catch, [~hzh0425@apache]!

I think it is better to avoid holding a lock for local-log-start-offset updates or fetches, as that can introduce other side effects.

One possible solution we discussed is to update local-log-start-offset before the segments are removed from the in-memory state and scheduled for deletion, but we need to think through the end-to-end scenarios. 
cc [~Kamal C]


was (Author: satish.duggana):
We discussed one possible solution is to address it by updating 
local-log-start-offset before the segments are removed from inmemory and 
scheduled for deletion but we need to think through the end to end scenarios. 
cc [~Kamal C]

> Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed 
> localLogStartOffset Update During Segment Deletion
> 
>
> Key: KAFKA-16073
> URL: https://issues.apache.org/jira/browse/KAFKA-16073
> Project: Kafka
>  Issue Type: Bug
>  Components: core, Tiered-Storage
>Affects Versions: 3.6.1
>Reporter: hzh0425
>Assignee: hzh0425
>Priority: Major
>  Labels: KIP-405, kip-405, tiered-storage
> Fix For: 3.6.1
>
>
> The identified bug in Apache Kafka's tiered storage feature involves a 
> delayed update of {{localLogStartOffset}} in the 
> {{UnifiedLog.deleteSegments}} method, impacting consumer fetch operations. 
> When segments are deleted from the log's memory state, the 
> {{localLogStartOffset}} isn't promptly updated. Concurrently, 
> {{ReplicaManager.handleOffsetOutOfRangeError}} checks if a consumer's fetch 
> offset is less than the {{{}localLogStartOffset{}}}. If it's greater, Kafka 
> erroneously sends an {{OffsetOutOfRangeException}} to the consumer.
> In a specific concurrent scenario, imagine sequential offsets: {{{}offset1 < 
> offset2 < offset3{}}}. A client requests data at {{{}offset2{}}}. While a 
> background deletion process removes segments from memory, it hasn't yet 
> updated the {{LocalLogStartOffset}} from {{offset1}} to {{{}offset3{}}}. 
> Consequently, when the fetch offset ({{{}offset2{}}}) is evaluated against 
> the stale {{offset1}} in {{{}ReplicaManager.handleOffsetOutOfRangeError{}}}, 
> it incorrectly triggers an {{{}OffsetOutOfRangeException{}}}. This issue 
> arises from the out-of-sync update of {{{}localLogStartOffset{}}}, leading to 
> incorrect handling of consumer fetch requests and potential data access 
> errors.
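
For illustration, a minimal toy model in Java (not Kafka's actual UnifiedLog/ReplicaManager code) of the ordering discussed above: publish the new local-log-start-offset before dropping segments from the in-memory state, so a concurrent fetch-offset check never compares against a stale start offset after the segments are already gone.

```java
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

// Toy model only: by advancing localLogStartOffset *before* the segments are removed
// from the in-memory map, a concurrent fetch either sees the old start offset while the
// segments are still present, or the new start offset -- never a stale offset with the
// segments already gone.
class ToyLocalLog {
    private final ConcurrentSkipListMap<Long, Object> segmentsByBaseOffset = new ConcurrentSkipListMap<>();
    private final AtomicLong localLogStartOffset = new AtomicLong(0L);

    void deleteSegmentsUpTo(long newLocalLogStartOffset) {
        localLogStartOffset.set(newLocalLogStartOffset);                 // 1. publish the new start offset first
        segmentsByBaseOffset.headMap(newLocalLogStartOffset).clear();    // 2. then drop the segments from memory
    }

    // Simplified stand-in for the out-of-range check on the fetch path.
    boolean fetchOffsetIsInLocalLog(long fetchOffset) {
        return fetchOffset >= localLogStartOffset.get();
    }
}
```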





Re: [PR] MINOR: Fixed typos in docker readme documentation [kafka]

2024-01-03 Thread via GitHub


showuon merged PR #15120:
URL: https://github.com/apache/kafka/pull/15120





Re: [PR] KAFKA-16074: close leaking threads in replica manager tests [kafka]

2024-01-03 Thread via GitHub


showuon commented on PR #15077:
URL: https://github.com/apache/kafka/pull/15077#issuecomment-1876239029

   @satishd , please take another look when available. Thanks.





Re: [PR] KAFKA-15373: fix exception thrown in Admin#describeTopics for unknown ID [kafka]

2024-01-03 Thread via GitHub


jolshan merged PR #14599:
URL: https://github.com/apache/kafka/pull/14599





Re: [PR] KAFKA-15556: Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect [kafka]

2024-01-03 Thread via GitHub


Phuc-Hong-Tran commented on PR #15020:
URL: https://github.com/apache/kafka/pull/15020#issuecomment-1876148310

   I understand, will get another PR out ASAP.





Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-03 Thread via GitHub


tinaselenge commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1441077572


##
clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java:
##
@@ -98,8 +117,91 @@ public void testServiceLoaderDiscovery() {
 public static class TestFileConfigProvider extends FileConfigProvider {
 
 @Override
-protected Reader reader(String path) throws IOException {
+protected Reader reader(Path path) throws IOException {
 return new StringReader("testKey=testResult\ntestKey2=testResult2");
 }
 }
+
+@Test
+public void testAllowedDirPath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(dirFile);
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testAllowedFilePath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(dirFile);
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testMultipleAllowedPaths() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir + "," + siblingDir);
+configProvider.configure(configs);
+
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+
+ConfigData configData = configProvider.get(dirFile);
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+
+configData = configProvider.get(siblingDirFile);
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNotAllowedDirPath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(siblingDirFile);
+assertTrue(configData.data().isEmpty());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNotAllowedFilePath() throws IOException {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+//another file under the same directory
+Path dirFile2 = Files.createFile(Paths.get(dir, "dirFile2"));
+ConfigData configData = configProvider.get(dirFile2.toString());
+assertTrue(configData.data().isEmpty());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNoTraversal() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+// Check we can't escape outside the path directory
+ConfigData configData = configProvider.get(dirFile + Paths.get("/../siblingdir/siblingdirFile"));

Review Comment:
   I have also added tests for ConfigProviderUtils, so I guess we could remove the duplicated traversal tests, but I wonder whether we should still keep the allowed-paths tests in these classes, as they test the full flow. What do you think?






[jira] [Updated] (KAFKA-15467) Kafka broker returns offset out of range for topic/partitions on restart from unclean shutdown

2024-01-03 Thread Steve Jacobs (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steve Jacobs updated KAFKA-15467:
-
Component/s: log

> Kafka broker returns offset out of range for topic/partitions on restart from 
> unclean shutdown
> --
>
> Key: KAFKA-15467
> URL: https://issues.apache.org/jira/browse/KAFKA-15467
> Project: Kafka
>  Issue Type: Bug
>  Components: core, log
>Affects Versions: 3.5.1
> Environment: Apache Kafka 3.5.1 with Strimzi on kubernetes.
>Reporter: Steve Jacobs
>Priority: Major
>
> So this started with me thinking this was a mirrormaker2 issue because here 
> are the symptoms I am seeing:
> I'm encountering an odd issue with mirrormaker2 with our remote replication 
> setup to high latency remote sites (satellite).
> Every few days we get several topics completely re-replicated, this appears 
> to happen after a network connectivity outage. It doesn't matter if it's a 
> long outage (hours) or a short one (minutes). And it only seems to affect a 
> few topics.
> I was finally able to track down some logs showing the issue. This was after 
> an hour-ish long outage where connectivity went down. There were lots of logs 
> about connection timeouts, etc. Here is the relevant part when the connection 
> came back up:
> {code:java}
> 2023-09-08 16:52:45,380 INFO [scbi->gcp.MirrorSourceConnector|worker] 
> [AdminClient 
> clientId=mm2-admin-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector|replication-source-admin]
>  Disconnecting from node 0 due to socket connection setup timeout. The 
> timeout value is 63245 ms. (org.apache.kafka.clients.NetworkClient) 
> [kafka-admin-client-thread | 
> mm2-admin-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector|replication-source-admin]
> 2023-09-08 16:52:45,380 INFO [scbi->gcp.MirrorSourceConnector|worker] 
> [AdminClient 
> clientId=mm2-admin-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector|replication-source-admin]
>  Metadata update failed 
> (org.apache.kafka.clients.admin.internals.AdminMetadataManager) 
> [kafka-admin-client-thread | 
> mm2-admin-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector|replication-source-admin]
> 2023-09-08 16:52:47,029 INFO [scbi->gcp.MirrorSourceConnector|task-1] 
> [Consumer 
> clientId=mm2-consumer-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector-1|replication-consumer,
>  groupId=null] Disconnecting from node 0 due to socket connection setup 
> timeout. The timeout value is 52624 ms. 
> (org.apache.kafka.clients.NetworkClient) 
> [task-thread-scbi->gcp.MirrorSourceConnector-1]
> 2023-09-08 16:52:47,029 INFO [scbi->gcp.MirrorSourceConnector|task-1] 
> [Consumer 
> clientId=mm2-consumer-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector-1|replication-consumer,
>  groupId=null] Error sending fetch request (sessionId=460667411, 
> epoch=INITIAL) to node 0: (org.apache.kafka.clients.FetchSessionHandler) 
> [task-thread-scbi->gcp.MirrorSourceConnector-1]
> 2023-09-08 16:52:47,336 INFO [scbi->gcp.MirrorSourceConnector|worker] 
> refreshing topics took 67359 ms (org.apache.kafka.connect.mirror.Scheduler) 
> [Scheduler for MirrorSourceConnector: 
> scbi->gcp|scbi->gcp.MirrorSourceConnector-refreshing topics]
> 2023-09-08 16:52:48,413 INFO [scbi->gcp.MirrorSourceConnector|task-1] 
> [Consumer 
> clientId=mm2-consumer-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector-1|replication-consumer,
>  groupId=null] Fetch position FetchPosition{offset=4918131, 
> offsetEpoch=Optional[0], 
> currentLeader=LeaderAndEpoch{leader=Optional[kafka.scbi.eng.neoninternal.org:9094
>  (id: 0 rack: null)], epoch=0}} is out of range for partition 
> reading.sensor.hfp01sc-0, resetting offset 
> (org.apache.kafka.clients.consumer.internals.AbstractFetch) 
> [task-thread-scbi->gcp.MirrorSourceConnector-1]
> (Repeats for 11 more topics)
> 2023-09-08 16:52:48,479 INFO [scbi->gcp.MirrorSourceConnector|task-1] 
> [Consumer 
> clientId=mm2-consumer-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector-1|replication-consumer,
>  groupId=null] Resetting offset for partition reading.sensor.hfp01sc-0 to 
> position FetchPosition{offset=3444977, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[kafka.scbi.eng.neoninternal.org:9094
>  (id: 0 rack: null)], epoch=0}}. 
> (org.apache.kafka.clients.consumer.internals.SubscriptionState) 
> [task-thread-scbi->gcp.MirrorSourceConnector-1]
> (Repeats for 11 more topics) {code}
> The consumer reports that offset 4918131 is out of range for this 
> topic/partition, but that offset still exists on the remote cluster. I can go 
> pull it up with a consumer right now. The earliest offset in that topic that 
> still exists is 3444977 as of yesterday. We have 30 day retention configured 
> so pulling in 30 days of duplicate data is very not good. It 

Re: [PR] KAFKA-16067 Refactoring ConsumerGroupListing + add test [kafka]

2024-01-03 Thread via GitHub


hachikuji commented on code in PR #15092:
URL: https://github.com/apache/kafka/pull/15092#discussion_r1441070583


##
clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java:
##
@@ -104,8 +104,8 @@ public boolean equals(Object obj) {
 return false;
 if (isSimpleConsumerGroup != other.isSimpleConsumerGroup)
 return false;
-if (state == null) {
-if (other.state != null)
+if (!state.isPresent()) {

Review Comment:
   I think the expectation in the previous code was that `Optional.equals` 
would handle this case. So can we just get rid of the null check? Note that 
this depends on the `requireNonNull` in the constructor. Perhaps we should have 
a unit test to protect it?
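
   For reference, a small self-contained JUnit 5 sketch of the Optional behavior being relied on here (not the suggested ConsumerGroupListing test itself; class and method names are illustrative):

```java
import java.util.Optional;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;

// Shows that Optional.equals already distinguishes empty from present values, so the
// explicit isPresent()/null branches add nothing once the field is guaranteed non-null.
class OptionalEqualsSketchTest {
    @Test
    void optionalEqualsHandlesEmptyAndPresentValues() {
        assertEquals(Optional.empty(), Optional.empty());
        assertEquals(Optional.of("Stable"), Optional.of("Stable"));
        assertNotEquals(Optional.empty(), Optional.of("Stable"));
        assertNotEquals(Optional.of("Stable"), Optional.of("Dead"));
    }
}
```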






[jira] [Updated] (KAFKA-15467) Kafka broker returns offset out of range for topic/partitions on restart from unclean shutdown

2024-01-03 Thread Steve Jacobs (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steve Jacobs updated KAFKA-15467:
-
Component/s: core

> Kafka broker returns offset out of range for topic/partitions on restart from 
> unclean shutdown
> --
>
> Key: KAFKA-15467
> URL: https://issues.apache.org/jira/browse/KAFKA-15467
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.5.1
> Environment: Apache Kafka 3.5.1 with Strimzi on kubernetes.
>Reporter: Steve Jacobs
>Priority: Major
>
> So this started with me thinking this was a mirrormaker2 issue because here 
> are the symptoms I am seeing:
> I'm encountering an odd issue with mirrormaker2 with our remote replication 
> setup to high latency remote sites (satellite).
> Every few days we get several topics completely re-replicated, this appears 
> to happen after a network connectivity outage. It doesn't matter if it's a 
> long outage (hours) or a short one (minutes). And it only seems to affect a 
> few topics.
> I was finally able to track down some logs showing the issue. This was after 
> an hour-ish long outage where connectivity went down. There were lots of logs 
> about connection timeouts, etc. Here is the relevant part when the connection 
> came back up:
> {code:java}
> 2023-09-08 16:52:45,380 INFO [scbi->gcp.MirrorSourceConnector|worker] 
> [AdminClient 
> clientId=mm2-admin-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector|replication-source-admin]
>  Disconnecting from node 0 due to socket connection setup timeout. The 
> timeout value is 63245 ms. (org.apache.kafka.clients.NetworkClient) 
> [kafka-admin-client-thread | 
> mm2-admin-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector|replication-source-admin]
> 2023-09-08 16:52:45,380 INFO [scbi->gcp.MirrorSourceConnector|worker] 
> [AdminClient 
> clientId=mm2-admin-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector|replication-source-admin]
>  Metadata update failed 
> (org.apache.kafka.clients.admin.internals.AdminMetadataManager) 
> [kafka-admin-client-thread | 
> mm2-admin-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector|replication-source-admin]
> 2023-09-08 16:52:47,029 INFO [scbi->gcp.MirrorSourceConnector|task-1] 
> [Consumer 
> clientId=mm2-consumer-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector-1|replication-consumer,
>  groupId=null] Disconnecting from node 0 due to socket connection setup 
> timeout. The timeout value is 52624 ms. 
> (org.apache.kafka.clients.NetworkClient) 
> [task-thread-scbi->gcp.MirrorSourceConnector-1]
> 2023-09-08 16:52:47,029 INFO [scbi->gcp.MirrorSourceConnector|task-1] 
> [Consumer 
> clientId=mm2-consumer-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector-1|replication-consumer,
>  groupId=null] Error sending fetch request (sessionId=460667411, 
> epoch=INITIAL) to node 0: (org.apache.kafka.clients.FetchSessionHandler) 
> [task-thread-scbi->gcp.MirrorSourceConnector-1]
> 2023-09-08 16:52:47,336 INFO [scbi->gcp.MirrorSourceConnector|worker] 
> refreshing topics took 67359 ms (org.apache.kafka.connect.mirror.Scheduler) 
> [Scheduler for MirrorSourceConnector: 
> scbi->gcp|scbi->gcp.MirrorSourceConnector-refreshing topics]
> 2023-09-08 16:52:48,413 INFO [scbi->gcp.MirrorSourceConnector|task-1] 
> [Consumer 
> clientId=mm2-consumer-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector-1|replication-consumer,
>  groupId=null] Fetch position FetchPosition{offset=4918131, 
> offsetEpoch=Optional[0], 
> currentLeader=LeaderAndEpoch{leader=Optional[kafka.scbi.eng.neoninternal.org:9094
>  (id: 0 rack: null)], epoch=0}} is out of range for partition 
> reading.sensor.hfp01sc-0, resetting offset 
> (org.apache.kafka.clients.consumer.internals.AbstractFetch) 
> [task-thread-scbi->gcp.MirrorSourceConnector-1]
> (Repeats for 11 more topics)
> 2023-09-08 16:52:48,479 INFO [scbi->gcp.MirrorSourceConnector|task-1] 
> [Consumer 
> clientId=mm2-consumer-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector-1|replication-consumer,
>  groupId=null] Resetting offset for partition reading.sensor.hfp01sc-0 to 
> position FetchPosition{offset=3444977, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[kafka.scbi.eng.neoninternal.org:9094
>  (id: 0 rack: null)], epoch=0}}. 
> (org.apache.kafka.clients.consumer.internals.SubscriptionState) 
> [task-thread-scbi->gcp.MirrorSourceConnector-1]
> (Repeats for 11 more topics) {code}
> The consumer reports that offset 4918131 is out of range for this 
> topic/partition, but that offset still exists on the remote cluster. I can go 
> pull it up with a consumer right now. The earliest offset in that topic that 
> still exists is 3444977 as of yesterday. We have 30 day retention configured 
> so pulling in 30 days of duplicate data is very not good. It almost 

Re: [PR] KAFKA-16067 Refactoring ConsumerGroupListing + add test [kafka]

2024-01-03 Thread via GitHub


hachikuji commented on code in PR #15092:
URL: https://github.com/apache/kafka/pull/15092#discussion_r1441070183


##
clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListingTest.java:
##
@@ -0,0 +1,27 @@
+package org.apache.kafka.clients.admin;

Review Comment:
   We are missing the license. Also, we need to move this into 
`clients/src/test/java`.



##
clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java:
##
@@ -104,8 +104,8 @@ public boolean equals(Object obj) {
 return false;
 if (isSimpleConsumerGroup != other.isSimpleConsumerGroup)
 return false;
-if (state == null) {
-if (other.state != null)
+if (!state.isPresent()) {

Review Comment:
   I think the expectation in the previous code was that `Optional.equals` 
would handle this case. So can we just get rid of the null check? This depends 
on the `requireNonNull` in the constructor. Perhaps we should have a unit test 
to protect it?






[jira] [Closed] (KAFKA-10133) Cannot compress messages in destination cluster with MM2

2024-01-03 Thread Steve Jacobs (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steve Jacobs closed KAFKA-10133.


> Cannot compress messages in destination cluster with MM2
> 
>
> Key: KAFKA-10133
> URL: https://issues.apache.org/jira/browse/KAFKA-10133
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
> Environment: kafka 2.5.0 deployed via the strimzi operator 0.18
>Reporter: Steve Jacobs
>Assignee: Ning Zhang
>Priority: Minor
> Fix For: 2.7.0
>
>
> When configuring mirrormaker2 using kafka connect, it is not possible to 
> configure things such that messages are compressed in the destination 
> cluster. Dump Log shows that batching is occuring, but no compression. If 
> this is possible, then this is a documentation bug, because I can find no 
> documentation on how to do this.
> baseOffset: 4208 lastOffset: 4492 count: 285 baseSequence: -1 lastSequence: 
> -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: 
> false isControl: false position: 239371 CreateTime: 1591745894859 size: 16362 
> magic: 2 compresscodec: NONE crc: 1811507259 isvalid: true
>  
>  





Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]

2024-01-03 Thread via GitHub


jolshan commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1441068240


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -749,9 +749,11 @@ class ReplicaManager(val config: KafkaConfig,
    * @param responseCallback              callback for sending the response
    * @param delayedProduceLock            lock for the delayed actions
    * @param recordValidationStatsCallback callback for updating stats on record conversions
-   * @param requestLocal                  container for the stateful instances scoped to this request
-   * @param transactionalId               transactional ID if the request is from a producer and the producer is transactional
+   * @param requestLocal                  container for the stateful instances scoped to this request -- this must correspond to the
+   *                                      thread calling this method
    * @param actionQueue                   the action queue to use. ReplicaManager#defaultActionQueue is used by default.
+   * @param verificationGuards            the mapping from topic partition to verification guards if transaction verification is used
+   * @param preAppendErrors               the mapping from topic partition to LogAppendResult for errors that occurred before appending

Review Comment:
   Oh, I see what you are saying -- basically the appendCallback should be defined so that the verification errors are joined at the point where the callback is defined (ideally after we get the verification errors).
   
   So the callback for verification would include the definition of the append callback. I think this could work.
   
   Time to change all the mock code.






Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]

2024-01-03 Thread via GitHub


jolshan commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1441066328


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -749,9 +749,11 @@ class ReplicaManager(val config: KafkaConfig,
    * @param responseCallback              callback for sending the response
    * @param delayedProduceLock            lock for the delayed actions
    * @param recordValidationStatsCallback callback for updating stats on record conversions
-   * @param requestLocal                  container for the stateful instances scoped to this request
-   * @param transactionalId               transactional ID if the request is from a producer and the producer is transactional
+   * @param requestLocal                  container for the stateful instances scoped to this request -- this must correspond to the
+   *                                      thread calling this method
    * @param actionQueue                   the action queue to use. ReplicaManager#defaultActionQueue is used by default.
+   * @param verificationGuards            the mapping from topic partition to verification guards if transaction verification is used
+   * @param preAppendErrors               the mapping from topic partition to LogAppendResult for errors that occurred before appending

Review Comment:
   I was considering a write where some partitions were verified and some were 
not. We would allow the ones that succeeded to still write and not the ones 
that failed verification.
   
   Not sure if we support multi-partition writes in the produce API, but it appears we do.






Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]

2024-01-03 Thread via GitHub


hachikuji commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1441051073


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -749,9 +749,11 @@ class ReplicaManager(val config: KafkaConfig,
    * @param responseCallback              callback for sending the response
    * @param delayedProduceLock            lock for the delayed actions
    * @param recordValidationStatsCallback callback for updating stats on record conversions
-   * @param requestLocal                  container for the stateful instances scoped to this request
-   * @param transactionalId               transactional ID if the request is from a producer and the producer is transactional
+   * @param requestLocal                  container for the stateful instances scoped to this request -- this must correspond to the
+   *                                      thread calling this method
    * @param actionQueue                   the action queue to use. ReplicaManager#defaultActionQueue is used by default.
+   * @param verificationGuards            the mapping from topic partition to verification guards if transaction verification is used
+   * @param preAppendErrors               the mapping from topic partition to LogAppendResult for errors that occurred before appending

Review Comment:
   This seems a little strange. If the partitions have already failed, then why 
do we need to pass them through `appendRecords`? I would expect instead that 
the caller would just join the pre-append failures with the append failures.
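
   For illustration, a tiny Java sketch (placeholder String types instead of TopicPartition/LogAppendResult; not the actual ReplicaManager code) of what joining the two maps in the caller could look like:

```java
import java.util.HashMap;
import java.util.Map;

// Partitions that failed verification never enter the append path; the caller simply
// merges their pre-append errors with the append results before completing the request.
class JoinAppendResultsSketch {
    static Map<String, String> join(Map<String, String> preAppendErrors,
                                    Map<String, String> appendResults) {
        Map<String, String> combined = new HashMap<>(appendResults);
        combined.putAll(preAppendErrors);   // pre-append failures take precedence for their partitions
        return combined;
    }
}
```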






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-03 Thread via GitHub


tinaselenge commented on PR #14995:
URL: https://github.com/apache/kafka/pull/14995#issuecomment-1876030053

   > Thanks for the PR. I made a second quick pass and left a few more comments.
   
   @mimaison Thank you very much for reviewing the PR again. I think I 
addressed your comments except the one I had a question on. Please let me know 
if I missed anything. Thanks. 





Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-03 Thread via GitHub


tinaselenge commented on PR #14995:
URL: https://github.com/apache/kafka/pull/14995#issuecomment-1876028036

   > I came up with another path traversal, but this one is less severe.
   > 
   > If you try to get `/arbitrary/path/../..//dir/dirFile` you can 
perform "exists" and "isDirectory" checks for `/arbitrary/path` depending on 
the error returned.
   > 
   > (edit): This attack isn't reproducible in the tests, because they mock out 
`reader(Path)` 
   > 
   > This would allow an attacker to map out the whole filesystem and 
permissions of the current process. This could inform another attack, such as: 
finding vulnerable software, finding user accounts, finding active processes, 
etc.
   > 
   > This attack works because the normalized form is used in pathIsAllowed(), 
but the non-normalized form is used in the actual read. I think we need to make 
it so that the normalized path is used instead of the given path.
   > 
   > I noticed this caveat in the documentation for Path.normalize:
   > 
   > > This method does not access the file system; the path may not locate a 
file that exists. Eliminating ".." and a preceding name from a path may result 
in the path that locates a different file than the original path. This can 
arise when the preceding name is a symbolic link.
   > 
   > I think this would cause a backwards-incompatible change if we were to 
apply it in the default case, so we should only use the normalized form when 
`allowed.paths` is specified.
   > 
   > I tried and failed to come up with a arbitrary file-read attack using the 
symlink behavior. If the symlink is within the allowed-paths (and possibly lets 
the user escape the allowed-paths) it should probably work normally. If the 
symlink is outside of the allowed paths, we can exploit it with the same 
prefixing attack from above, but the allowed part of the path prevents actually 
reading arbitrary files. Regardless, I think that using the normalized form for 
file accesses prevents accessing external symlinks completely.
   > 
   > Thanks so much for working on this feature!
   
   
   Thank you very much @gharris1727 for the detailed comment on this issue. These are really good points and I agree; I hadn't put enough thought into this.
   
   I took the suggestions and updated it to use the normalised path in the actual read when `allowed.paths` is specified. Please let me know what you think.
   
   I also tried addressing your other comments. Thank you for reviewing the PR. 
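
   For illustration, a minimal sketch of the normalise-then-check idea described above (helper and field names are illustrative, not the PR's actual implementation): the requested path is normalised once, the normalised form is checked against the allowed paths, and that same normalised form is used for the read.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

// Illustrative only: because the same normalized path is used for both the allow-list
// check and the actual file access, ".." segments cannot probe or read outside the
// allowed paths.
class AllowedPathsSketch {
    private final List<Path> allowedPaths;

    AllowedPathsSketch(List<Path> allowedPaths) {
        this.allowedPaths = allowedPaths;
    }

    // Returns the path to read from, or null if it falls outside every allowed path.
    Path resolve(String requestedPath) {
        Path normalized = Paths.get(requestedPath).toAbsolutePath().normalize();
        for (Path allowed : allowedPaths) {
            if (normalized.startsWith(allowed.toAbsolutePath().normalize())) {
                return normalized;   // read using the normalized form, not the raw input
            }
        }
        return null;
    }
}
```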





Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-03 Thread via GitHub


tinaselenge commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1440996476


##
clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java:
##
@@ -98,8 +117,91 @@ public void testServiceLoaderDiscovery() {
 public static class TestFileConfigProvider extends FileConfigProvider {
 
 @Override
-protected Reader reader(String path) throws IOException {
+protected Reader reader(Path path) throws IOException {
 return new StringReader("testKey=testResult\ntestKey2=testResult2");
 }
 }
+
+@Test
+public void testAllowedDirPath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(dirFile);
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testAllowedFilePath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(dirFile);
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testMultipleAllowedPaths() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir + "," + siblingDir);
+configProvider.configure(configs);
+
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+
+ConfigData configData = configProvider.get(dirFile);
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+
+configData = configProvider.get(siblingDirFile);
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNotAllowedDirPath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(siblingDirFile);
+assertTrue(configData.data().isEmpty());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNotAllowedFilePath() throws IOException {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+//another file under the same directory
+Path dirFile2 = Files.createFile(Paths.get(dir, "dirFile2"));
+ConfigData configData = configProvider.get(dirFile2.toString());
+assertTrue(configData.data().isEmpty());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNoTraversal() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+// Check we can't escape outside the path directory
+ConfigData configData = configProvider.get(dirFile + Paths.get("/../siblingdir/siblingdirFile"));

Review Comment:
   Good point! I fixed the path logic. However, I'm not sure about deduplicating the test classes, as they are set up slightly differently: one works with a directory and the other with files.
   
   I did, however, deduplicate the implementation classes by adding a ConfigProviderUtils class with the common methods.






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-03 Thread via GitHub


tinaselenge commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1440994447


##
clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java:
##
@@ -40,7 +44,21 @@ public class FileConfigProvider implements ConfigProvider {
 
 private static final Logger log = LoggerFactory.getLogger(FileConfigProvider.class);
 
+public static final String ALLOWED_PATHS_CONFIG = "allowed.paths";
+public static final String ALLOWED_PATHS_DOC = "Path that this config provider is allowed to access";
+private List allowedPaths;
+
 public void configure(Map<String, ?> configs) {
+if (configs.containsKey(ALLOWED_PATHS_CONFIG)) {
+String configValue = (String) configs.get(ALLOWED_PATHS_CONFIG);
+
+if (configValue != null && !configValue.isEmpty()) {

Review Comment:
   If the user configures this with a null or empty value, should that be considered a bad value, or should it be treated the same as not setting it, therefore allowing access to all paths?






[PR] MINOR: Fixed typos in docker readme documentation [kafka]

2024-01-03 Thread via GitHub


kumarlokesh opened a new pull request, #15120:
URL: https://github.com/apache/kafka/pull/15120

   Fixed minor typos in `docker/README` documentation.
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16078 IBP defaults to latest production MetadataVersion [kafka]

2024-01-03 Thread via GitHub


mumrah commented on PR #15118:
URL: https://github.com/apache/kafka/pull/15118#issuecomment-1875988796

   All production usages now call `latestProduction` and test usages (including benchmark classes) call `latestTesting`. One exception is `QuorumFeatures#defaultFeatureMap(boolean enableUnstable)`, which calls either depending on its argument.





Re: [PR] KAFKA-16072: JUnit 5 extension to detect thread leak [kafka]

2024-01-03 Thread via GitHub


gharris1727 commented on code in PR #15101:
URL: https://github.com/apache/kafka/pull/15101#discussion_r1440917179


##
core/src/test/java/kafka/test/junit/LeakTestingExtension.java:
##
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.junit;
+
+import kafka.utils.TestUtils;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ExtensionContext.Namespace;
+import org.junit.jupiter.api.extension.ExtensionContext.Store;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import scala.Tuple2;
+
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class LeakTestingExtension implements BeforeEachCallback, AfterEachCallback {
+    private static final Set<String> EXPECTED_THREAD_NAMES = new HashSet<>(

Review Comment:
   Oh, I didn't notice that this is a set of threads that are allowed to leak, whereas the original TestUtils.verifyNoUnexpectedThreads used a set of threads which weren't allowed to leak. I assumed the mechanism was copy-pasted.
   
   I don't know if this set needs to exist, especially now that the assertion is stateful. Perhaps we can empty this set completely to see what the effect is, and then add in what is necessary. I did see some flakiness with an early prototype of #14783, which sometimes detected the Gradle runner opening sockets. Perhaps there are similar things which create background threads.






Re: [PR] KAFKA-16072: JUnit 5 extension to detect thread leak [kafka]

2024-01-03 Thread via GitHub


divijvaidya commented on code in PR #15101:
URL: https://github.com/apache/kafka/pull/15101#discussion_r1440893773


##
core/src/test/java/kafka/test/junit/LeakTestingExtension.java:
##
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.junit;
+
+import kafka.utils.TestUtils;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ExtensionContext.Namespace;
+import org.junit.jupiter.api.extension.ExtensionContext.Store;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import scala.Tuple2;
+
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class LeakTestingExtension implements BeforeEachCallback, AfterEachCallback {
+    private static final Set<String> EXPECTED_THREAD_NAMES = new HashSet<>(

Review Comment:
   Hi @gharris1727 @wernerdv,
   Can you please help me understand why it is expected to have kafka-scheduler, ExpirationReaper, or ReplicaFetcherThread threads at the beginning or end of a test? Isn't the presence of these threads an indication of a thread leak? (For example, expiration reaper threads may mean that we don't close controllerAPIs correctly.)






Re: [PR] KAFKA-16078 IBP defaults to latest production MetadataVersion [kafka]

2024-01-03 Thread via GitHub


cmccabe commented on PR #15118:
URL: https://github.com/apache/kafka/pull/15118#issuecomment-1875906638

   Alternatively, if we wanted to make this literally impossible to invoke from a non-test context, we could move `MetadataVersion.latest` into `MetadataVersionTest.latest`.
   
   The downside is that it would require all the other test modules to depend on `:server-common:test` (although a lot of them already do?). That might extend the build time? Not sure.





Re: [PR] KAFKA-16078 IBP defaults to latest production MetadataVersion [kafka]

2024-01-03 Thread via GitHub


cmccabe commented on PR #15118:
URL: https://github.com/apache/kafka/pull/15118#issuecomment-1875904378

   I agree with @ijuma that just having a boolean isn't very clear.
   
   How about renaming `MetadataVersion.latest()` to 
`MetadataVersion.latestTesting()`, and then creating a separate function 
`MetadataVersion.latestProduction()`?
   
   That seems like the clearest we're going to get with this. Devs won't want to invoke something with "testing" in the name in a non-test context.
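
   For illustration, a hedged sketch of the proposed naming (placeholder versions and method bodies, not Kafka's actual MetadataVersion enum):

```java
// Placeholder enum: "unstable" marks versions that only tests should pick up.
enum MetadataVersionSketch {
    IBP_3_6_IV2(false),   // stable, production-ready
    IBP_3_7_IV0(true);    // unstable, testing only

    private final boolean unstable;

    MetadataVersionSketch(boolean unstable) {
        this.unstable = unstable;
    }

    static MetadataVersionSketch latestTesting() {
        MetadataVersionSketch[] all = values();
        return all[all.length - 1];                   // newest version, stable or not
    }

    static MetadataVersionSketch latestProduction() {
        MetadataVersionSketch[] all = values();
        for (int i = all.length - 1; i >= 0; i--) {
            if (!all[i].unstable) {
                return all[i];                        // newest stable version
            }
        }
        throw new IllegalStateException("no production metadata version defined");
    }
}
```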





Re: [PR] KAFKA-16078 IBP defaults to latest production MetadataVersion [kafka]

2024-01-03 Thread via GitHub


ijuma commented on code in PR #15118:
URL: https://github.com/apache/kafka/pull/15118#discussion_r1440881725


##
core/src/main/java/kafka/server/builders/LogManagerBuilder.java:
##
@@ -48,7 +48,7 @@ public class LogManagerBuilder {
 private int maxTransactionTimeoutMs = 15 * 60 * 1000;
 private ProducerStateManagerConfig producerStateManagerConfig = new 
ProducerStateManagerConfig(6, false);
 private int producerIdExpirationCheckIntervalMs = 60;
-private MetadataVersion interBrokerProtocolVersion = 
MetadataVersion.latest();
+private MetadataVersion interBrokerProtocolVersion = 
MetadataVersion.latest(true);

Review Comment:
   An enum instead of boolean would make this a lot more readable.






[PR] MINOR: Move Raft io thread implementation to Java [kafka]

2024-01-03 Thread via GitHub


hachikuji opened a new pull request, #15119:
URL: https://github.com/apache/kafka/pull/15119

   This patch moves the `RaftIOThread` implementation into Java. I changed the 
name to `KafkaRaftClientDriver` since the main thing it does is drive the calls 
to `poll()`. There shouldn't be any changes to the logic.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16078 IBP defaults to latest production MetadataVersion [kafka]

2024-01-03 Thread via GitHub


mumrah opened a new pull request, #15118:
URL: https://github.com/apache/kafka/pull/15118

   This patch introduces a boolean flag to MetadataVersion#latest which 
controls whether or not "unstable" MetadataVersions can be returned. For 
testing purposes, we want to be able to automatically pick up the latest 
version regardless of stability. For production, we should only be using 
LATEST_PRODUCTION.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]

2024-01-03 Thread via GitHub


philipnee commented on PR #15000:
URL: https://github.com/apache/kafka/pull/15000#issuecomment-1875876548

   Hi @lucasbru - I think we might need to loosen the thread access restriction 
for the interceptor, because we also need to trigger interceptors during 
autocommit.  There are 2 ways to achieve this:
   1. pass an event to the main thread to execute the interceptor on poll: the 
interceptor might never get triggered
   2. loosen the thread access restriction - I think we might need to do that 
to guarantee the interceptors can always be invoked
   
   #2 is a bit annoying because it goes against the contract, as you mentioned in 
one of the comments.  What do you think?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16025: Fix orphaned locks when rebalancing and store cleanup race on unassigned task directories [kafka]

2024-01-03 Thread via GitHub


sanepal commented on code in PR #15088:
URL: https://github.com/apache/kafka/pull/15088#discussion_r1440860832


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1229,10 +1229,21 @@ private void tryToLockAllNonEmptyTaskDirectories() {
 final String namedTopology = taskDir.namedTopology();
 try {
 final TaskId id = parseTaskDirectoryName(dir.getName(), 
namedTopology);
-if (stateDirectory.lock(id)) {
-lockedTaskDirectories.add(id);
-if (!allTasks.containsKey(id)) {
-log.debug("Temporarily locked unassigned task {} for 
the upcoming rebalance", id);
+boolean lockedEmptyDirectory = false;
+try {
+if (stateDirectory.lock(id)) {
+if (stateDirectory.directoryForTaskIsEmpty(id)) {
+lockedEmptyDirectory = true;

Review Comment:
   Thank you for the feedback! I updated to simplify the control flow and add a 
comment and log around the behavior. Please let me know how that looks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15556: Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect [kafka]

2024-01-03 Thread via GitHub


philipnee commented on PR #15020:
URL: https://github.com/apache/kafka/pull/15020#issuecomment-1875867893

   hi @Phuc-Hong-Tran - yes, the abstractFetch implementation is based on the 
LegacyKafkaConsumer and therefore requires connection probing.  We don't need 
that in the AsyncKafkaConsumer as it is being done right before sending out the 
requests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16078) InterBrokerProtocolVersion defaults to non-production MetadataVersion

2024-01-03 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17802308#comment-17802308
 ] 

Justine Olshan commented on KAFKA-16078:


We should really get to writing the KIP for the "non-production" vs 
"production" MVs.

> InterBrokerProtocolVersion defaults to non-production MetadataVersion
> -
>
> Key: KAFKA-16078
> URL: https://issues.apache.org/jira/browse/KAFKA-16078
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16078) InterBrokerProtocolVersion defaults to non-production MetadataVersion

2024-01-03 Thread David Arthur (Jira)
David Arthur created KAFKA-16078:


 Summary: InterBrokerProtocolVersion defaults to non-production 
MetadataVersion
 Key: KAFKA-16078
 URL: https://issues.apache.org/jira/browse/KAFKA-16078
 Project: Kafka
  Issue Type: Bug
Reporter: David Arthur
Assignee: David Arthur






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-16077: Streams fails to close task after restoration [kafka]

2024-01-03 Thread via GitHub


lucasbru opened a new pull request, #15117:
URL: https://github.com/apache/kafka/pull/15117

   Streams fails to close a task after restoration when its input partitions are 
updated by a new assignment that arrives at the same time.
   
   There is a race condition in the state updater that can cause the following:
   
1. We have an active task in the state updater
2. We get fenced. We recreate the producer, transactions now uninitialized. 
We ask the state updater to give back the task, add a pending action to close 
the task clean once it’s handed back
3. We get a new assignment with updated input partitions. The task is still 
owned by the state updater, so we ask the state updater again to hand it back 
and add a pending action to update its input partition
4. The task is handed back by the state updater. We update its input 
partitions but forget to close it clean (pending action was overwritten)
5. Now the task is in an initialized state, but the underlying producer 
does not have transactions initialized
   
   This can cause an IllegalStateException: `Invalid transition attempted from 
state UNINITIALIZED to state IN_TRANSACTION` when running in EOSv2.
   
   To fix this, we introduce a new pending action 
CloseReviveAndUpdateInputPartitions that is added when we handle a new 
assignment with updated input partitions, but we still need to close the task 
before reopening it.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16077) Streams fails to close task after restoration when input partitions are updated

2024-01-03 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy updated KAFKA-16077:
---
Summary: Streams fails to close task after restoration when input 
partitions are updated  (was: State updater fails to close task when input 
partitions are updated)

> Streams fails to close task after restoration when input partitions are 
> updated
> ---
>
> Key: KAFKA-16077
> URL: https://issues.apache.org/jira/browse/KAFKA-16077
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Lucas Brutschy
>Priority: Critical
>
> There is a race condition in the state updater that can cause the following:
>  # We have an active task in the state updater
>  # We get fenced. We recreate the producer, transactions now uninitialized. 
> We ask the state updater to give back the task, add a pending action to close 
> the task clean once it’s handed back
>  # We get a new assignment with updated input partitions. The task is still 
> owned by the state updater, so we ask the state updater again to hand it back 
> and add a pending action to update its input partition
>  # The task is handed back by the state updater. We update its input 
> partitions but forget to close it clean (pending action was overwritten)
>  # Now the task is in an initialized state, but the underlying producer does 
> not have transactions initialized
> This can lead to an exception like this:
> {code:java}
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log-org.apache.kafka.streams.errors.StreamsException:
>  Exception caught in process. taskId=1_0, 
> processor=KSTREAM-SOURCE-05, topic=node-name-repartition, 
> partition=0, offset=618798, stacktrace=java.lang.IllegalStateException: 
> TransactionalId stream-soak-test-d647640a-12e5-4e74-a0af-e105d0d0cb67-2: 
> Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:999)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:985)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:311)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:660)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.maybeBeginTransaction(StreamsProducer.java:240)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:258)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:253)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.doJoin(KStreamKTableJoinProcessor.java:130)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:99)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
> 

[jira] [Created] (KAFKA-16077) State updater fails to close task when input partitions are updated

2024-01-03 Thread Lucas Brutschy (Jira)
Lucas Brutschy created KAFKA-16077:
--

 Summary: State updater fails to close task when input partitions 
are updated
 Key: KAFKA-16077
 URL: https://issues.apache.org/jira/browse/KAFKA-16077
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.7.0
Reporter: Lucas Brutschy


There is a race condition in the state updater that can cause the following:
 # We have an active task in the state updater
 # We get fenced. We recreate the producer, transactions now uninitialized. We 
ask the state updater to give back the task, add a pending action to close the 
task clean once it’s handed back
 # We get a new assignment with updated input partitions. The task is still 
owned by the state updater, so we ask the state updater again to hand it back 
and add a pending action to update its input partition
 # The task is handed back by the state updater. We update its input partitions 
but forget to close it clean (pending action was overwritten)
 # Now the task is in an initialized state, but the underlying producer does 
not have transactions initialized

This can lead to an exception like this:
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log-org.apache.kafka.streams.errors.StreamsException:
 Exception caught in process. taskId=1_0, processor=KSTREAM-SOURCE-05, 
topic=node-name-repartition, partition=0, offset=618798, 
stacktrace=java.lang.IllegalStateException: TransactionalId 
stream-soak-test-d647640a-12e5-4e74-a0af-e105d0d0cb67-2: Invalid transition 
attempted from state UNINITIALIZED to state IN_TRANSACTION
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:999)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:985)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:311)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:660)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.processor.internals.StreamsProducer.maybeBeginTransaction(StreamsProducer.java:240)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:258)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:253)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.doJoin(KStreamKTableJoinProcessor.java:130)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:99)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 

[jira] [Updated] (KAFKA-16077) State updater fails to close task when input partitions are updated

2024-01-03 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy updated KAFKA-16077:
---
Description: 
There is a race condition in the state updater that can cause the following:
 # We have an active task in the state updater
 # We get fenced. We recreate the producer, transactions now uninitialized. We 
ask the state updater to give back the task, add a pending action to close the 
task clean once it’s handed back
 # We get a new assignment with updated input partitions. The task is still 
owned by the state updater, so we ask the state updater again to hand it back 
and add a pending action to update its input partition
 # The task is handed back by the state updater. We update its input partitions 
but forget to close it clean (pending action was overwritten)
 # Now the task is in an initialized state, but the underlying producer does 
not have transactions initialized

This can lead to an exception like this:
{code:java}
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log-org.apache.kafka.streams.errors.StreamsException:
 Exception caught in process. taskId=1_0, processor=KSTREAM-SOURCE-05, 
topic=node-name-repartition, partition=0, offset=618798, 
stacktrace=java.lang.IllegalStateException: TransactionalId 
stream-soak-test-d647640a-12e5-4e74-a0af-e105d0d0cb67-2: Invalid transition 
attempted from state UNINITIALIZED to state IN_TRANSACTION
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:999)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:985)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:311)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:660)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.processor.internals.StreamsProducer.maybeBeginTransaction(StreamsProducer.java:240)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:258)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:253)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.doJoin(KStreamKTableJoinProcessor.java:130)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:99)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:847)
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 

Re: [PR] KAFKA-16070: Extract the setReadOnly method into Headers [kafka]

2024-01-03 Thread via GitHub


hachikuji commented on PR #15113:
URL: https://github.com/apache/kafka/pull/15113#issuecomment-1875758227

   Closing as a duplicate of #15097 . Please reopen if mistaken.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16070: Extract the setReadOnly method into Headers [kafka]

2024-01-03 Thread via GitHub


hachikuji closed pull request #15113: KAFKA-16070: Extract the setReadOnly 
method into Headers
URL: https://github.com/apache/kafka/pull/15113


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16070: move setReadOnly to Headers [kafka]

2024-01-03 Thread via GitHub


hachikuji commented on code in PR #15097:
URL: https://github.com/apache/kafka/pull/15097#discussion_r1440744523


##
clients/src/main/java/org/apache/kafka/common/header/Headers.java:
##
@@ -69,4 +69,10 @@ public interface Headers extends Iterable<Header> {
  */
 Header[] toArray();
 
+/**
+ * Set Header to readonly.
+ */
+default void setReadOnly() {

Review Comment:
   Hmm, I don't think `setReadOnly` was ever intended to be public. The class 
`RecordHeaders` is in `internals`. We just use this method as a way to prevent 
modification once the headers have been passed to the producer.
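   For context, a simplified sketch of how the producer side can freeze headers 
while keeping the read-only switch on the internal RecordHeaders class rather 
than on the public Headers interface. This is an approximation written for 
illustration, not a verbatim copy of KafkaProducer:
   
   ```java
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

import java.nio.charset.StandardCharsets;

public class ReadOnlyHeadersSketch {
    // Only code that knows about the internal RecordHeaders type can freeze headers;
    // callers holding just the public Headers interface cannot.
    static void freezeIfPossible(Headers headers) {
        if (headers instanceof RecordHeaders) {
            ((RecordHeaders) headers).setReadOnly();
        }
    }

    public static void main(String[] args) {
        Headers headers = new RecordHeaders();
        headers.add("trace-id", "abc123".getBytes(StandardCharsets.UTF_8));
        freezeIfPossible(headers);
        try {
            headers.add("late", new byte[0]); // rejected once the headers are read-only
        } catch (IllegalStateException e) {
            System.out.println("headers are read-only: " + e.getMessage());
        }
        for (Header header : headers) {
            System.out.println(header.key());
        }
    }
}
   ```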



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add reviewers GitHub action [kafka]

2024-01-03 Thread via GitHub


divijvaidya commented on code in PR #15115:
URL: https://github.com/apache/kafka/pull/15115#discussion_r1440702318


##
.github/workflows/pr_reviews.yml:
##
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: Adding Reviewers
+
+on:
+  pull_request_review:
+types: [submitted]
+
+jobs:
+  add_reviewers:
+runs-on: ubuntu-latest
+steps:
+- uses: actions/checkout@v3
+- name: Add Reviewers
+  run: |
+user_json=$(gh api users/${{ github.event.review.user.login }})

Review Comment:
   we should probably explicitly specify the `accept` in the header to `-H 
"Accept: application/vnd.github+json" \` as per 
https://docs.github.com/en/rest/users/users?apiVersion=2022-11-28
   
   Also maybe the version to `-H "X-GitHub-Api-Version: 2022-11-28" \`



##
.github/workflows/pr_reviews.yml:
##
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: Adding Reviewers
+
+on:
+  pull_request_review:
+types: [submitted]
+
+jobs:
+  add_reviewers:
+runs-on: ubuntu-latest
+steps:
+- uses: actions/checkout@v3
+- name: Add Reviewers
+  run: |
+user_json=$(gh api users/${{ github.event.review.user.login }})
+user_name=$(echo "$user_json" | jq -r '.name')
+user_email=$(echo "$user_json" | jq -r '.email')
+if [[ "${{ github.event.pull_request.body }}" =~ ^.*Reviewers:\ 
.*${user_name}.*$ ]]; then

Review Comment:
   nit
   
   You can extract out pull_request.body into a variable since it's used 
multiple times. The code becomes
   
   ```
   run: |
 user_json=$(gh api users/${{ github.event.review.user.login }})
 user_name=$(echo "$user_json" | jq -r '.name')
 user_email=$(echo "$user_json" | jq -r '.email')
   
 pr_body="${{ github.event.pull_request.body }}"
 if [[ "$pr_body" =~ ^.*Reviewers:\ .*${user_name}.*$ ]]; then
   echo "Reviewer already added: ${user_name} <${user_email}>"
 else
   if [[ "$pr_body" =~ ^.*Reviewers:\ .*$ ]]; then
 pr_body+=", ${user_name} <${user_email}>"
   else
 pr_body+="Reviewers: ${user_name} <${user_email}>"
   fi
   
   gh pr edit ${{ github.event.pull_request.number }} --body "$pr_body"
   echo "Added reviewer: ${user_name} <${user_email}>"
 fi
 ```



##
.github/workflows/pr_reviews.yml:
##
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: Adding Reviewers
+
+on:
+  pull_request_review:
+types: [submitted]
+
+jobs:
+  add_reviewers:
+runs-on: ubuntu-latest
+steps:
+- uses: actions/checkout@v3
+- name: Add Reviewers
+  run: |
+user_json=$(gh api users/${{ github.event.review.user.login }})
+

Re: [PR] MINOR: Bump year to 2024 in NOTICE file [kafka]

2024-01-03 Thread via GitHub


stanislavkozlovski commented on PR #15111:
URL: https://github.com/apache/kafka/pull/15111#issuecomment-1875709048

   cherry-picked to 3.7


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Bump year to 2024 in NOTICE file [kafka]

2024-01-03 Thread via GitHub


stanislavkozlovski merged PR #15111:
URL: https://github.com/apache/kafka/pull/15111


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add reviewers GitHub action [kafka]

2024-01-03 Thread via GitHub


mumrah commented on PR #15115:
URL: https://github.com/apache/kafka/pull/15115#issuecomment-1875698491

   Thanks @viktorsomogyi, this is a nice little automation.
   
   Does this script edit the commit or just the PR body?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15742) KRaft support in GroupCoordinatorIntegrationTest

2024-01-03 Thread Dmitry Werner (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitry Werner updated KAFKA-15742:
--
Fix Version/s: 3.8.0

> KRaft support in GroupCoordinatorIntegrationTest
> 
>
> Key: KAFKA-15742
> URL: https://issues.apache.org/jira/browse/KAFKA-15742
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Sameer Tejani
>Assignee: Dmitry Werner
>Priority: Minor
>  Labels: kraft, kraft-test, newbie
> Fix For: 3.8.0
>
>
> The following tests in GroupCoordinatorIntegrationTest in 
> core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
>  need to be updated to support KRaft
> 41 : def testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec(): Unit = 
> {
> Scanned 63 lines. Found 0 KRaft tests out of 1 tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15742: KRaft support in GroupCoordinatorIntegrationTest [kafka]

2024-01-03 Thread via GitHub


jolshan merged PR #15086:
URL: https://github.com/apache/kafka/pull/15086


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16072: JUnit 5 extension to detect thread leak [kafka]

2024-01-03 Thread via GitHub


gharris1727 commented on PR #15101:
URL: https://github.com/apache/kafka/pull/15101#issuecomment-1875661177

   > It might be worth adding more names to the set of expected thread names.
   
   I think increasing the scope of this assertion by adding patterns/removing 
the pattern check/applying to tests outside of core are good ideas. We can land 
this change first and then explore those in follow-ups.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16072: JUnit 5 extension to detect thread leak [kafka]

2024-01-03 Thread via GitHub


gharris1727 commented on code in PR #15101:
URL: https://github.com/apache/kafka/pull/15101#discussion_r1440661862


##
core/src/test/java/kafka/test/junit/LeakTestingExtension.java:
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.junit;
+
+import kafka.utils.TestUtils;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import scala.Tuple2;
+
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class LeakTestingExtension implements BeforeEachCallback, 
AfterEachCallback {
+private static final Set<String> EXPECTED_THREAD_NAMES = new HashSet<>(
+Arrays.asList("junit-", "JMX", 
"feature-zk-node-event-process-thread", "ForkJoinPool", "executor-",
+"kafka-scheduler-", "metrics-meter-tick-thread", 
"ReplicaFetcherThread", "scala-", "pool-")
+);
+private Set initialThreads;

Review Comment:
   I think using a state variable here is essentially a `static` variable, and 
is shared between tests running concurrently.
   We don't run concurrent tests in the same JVM by default, but it is easy to 
change that with configuration. Can you use the extensionContext to store the 
state to avoid this breaking in the future? 
https://junit.org/junit5/docs/current/user-guide/#extensions-keeping-state
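   
   A minimal sketch of the store-based approach, assuming placeholder class and 
key names; the expected-thread-name filtering and waiting logic from the PR are 
omitted so only the state handling is shown:
   
   ```java
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ExtensionContext.Namespace;

import java.util.HashSet;
import java.util.Set;

// Sketch: keep the captured thread names in the per-test ExtensionContext store
// instead of an instance field, so concurrently running tests don't share state.
public class ThreadSnapshotExtension implements BeforeEachCallback, AfterEachCallback {
    private static final Namespace NAMESPACE = Namespace.create(ThreadSnapshotExtension.class);
    private static final String KEY = "initialThreads";

    private static Set<String> currentThreadNames() {
        Set<String> names = new HashSet<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            names.add(t.getName());
        }
        return names;
    }

    @Override
    public void beforeEach(ExtensionContext context) {
        context.getStore(NAMESPACE).put(KEY, currentThreadNames());
    }

    @Override
    @SuppressWarnings("unchecked")
    public void afterEach(ExtensionContext context) {
        Set<String> before = (Set<String>) context.getStore(NAMESPACE).get(KEY, Set.class);
        Set<String> leaked = currentThreadNames();
        leaked.removeAll(before);
        if (!leaked.isEmpty()) {
            throw new AssertionError("Potentially leaked threads: " + leaked);
        }
    }
}
   ```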



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]

2024-01-03 Thread via GitHub


gharris1727 commented on PR #15080:
URL: https://github.com/apache/kafka/pull/15080#issuecomment-1875639893

   > just invoking methods in order is not enough to trigger the deadlock. I 
believe it is possible to reliably reproduce the deadlock with two countdown 
latches, one countdown in the ConfigBackingStore#snapshot and another in 
ConfigBackingStore#putTaskConfigs. This requires a mock for the config backing 
store. If you have a better idea I am happy to analyze it.
   
   Yeah I understand. I think the cost of deterministically reproducing the 
deadlock is too high. I did it in #8259 because I didn't know what 
synchronization was missing and needed a repro case to debug.
   
   I would be satisfied with a test which non-deterministically reproduces the 
deadlock but is less brittle and includes fewer mocks. Currently we only have 
two connectors calling task reconfiguration (mirror checkpoint and source) and 
one test in the DistributedHerder. There is zero coverage in StandaloneHerder, 
which is part of why we never found this bug :)
   
   > Unrelated to this PR's issue, it may be that the wait operations in 
StandaloneHerderTest are by mistake 1000 seconds instead of milliseconds. Isn't 
it?
   
   Yeah that timeout seems a bit absurd, but if there aren't deadlocks in the 
test it should never incur that timeout. It looks like the test suite is very 
well behaved in practice, so I'm inclined to keep it as-is: 
https://ge.apache.org/scans/tests?search.names=Git%20branch=P90D=kafka=America%2FLos_Angeles=trunk=*StandaloneHerderTest=FLAKY


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-10875) offsetsForTimes returns null for some partitions when it shouldn't?

2024-01-03 Thread Hugo Abreu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17802246#comment-17802246
 ] 

Hugo Abreu commented on KAFKA-10875:


Hello!

After some digging around the code, it seems that this behaviour is expected in 
topics/partitions with a low amount of data flowing.


!image-2024-01-03-16-14-10-246.png|width=867,height=205!

[~gongyifei], could this be what you are experiencing? That is, by the time of 
the second search, a new message matching the requested timestamp had arrived.
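
For anyone else hitting this, a minimal sketch of the documented contract: 
offsetsForTimes returns null for a partition when it contains no record with a 
timestamp greater than or equal to the requested one. Bootstrap servers, topic 
name and group id below are placeholders:

{code:java}
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class OffsetsForTimesDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "offsets-for-times-demo");    // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        long targetTimestamp = 1607922415534L; // timestamp used in the report above

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            Map<TopicPartition, Long> query = new HashMap<>();
            query.put(new TopicPartition("My.Data.Topic", 5), targetTimestamp);
            query.put(new TopicPartition("My.Data.Topic", 6), targetTimestamp);

            Map<TopicPartition, OffsetAndTimestamp> result =
                    consumer.offsetsForTimes(query, Duration.ofSeconds(30));

            result.forEach((tp, offsetAndTimestamp) -> {
                if (offsetAndTimestamp == null) {
                    // No record in this partition has a timestamp >= targetTimestamp (yet).
                    System.out.println(tp + ": no matching offset");
                } else {
                    System.out.println(tp + ": offset " + offsetAndTimestamp.offset()
                            + " at " + offsetAndTimestamp.timestamp());
                }
            });
        }
    }
}
{code}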

> offsetsForTimes returns null for some partitions when it shouldn't?
> ---
>
> Key: KAFKA-10875
> URL: https://issues.apache.org/jira/browse/KAFKA-10875
> Project: Kafka
>  Issue Type: Bug
>Reporter: Yifei Gong
>Priority: Minor
> Attachments: image-2024-01-03-16-14-10-246.png
>
>
> I use spring-boot 2.2.11, spring-kafka 2.4.11 and apache kafka-clients 2.4.1
> I have my consumer {{implements ConsumerAwareRebalanceListener}}, and I am 
> trying to seek to offsets after certain timestamp inside 
> {{onPartitionsAssigned}} method by calling {{offsetsForTimes}}.
> I found this strange behavior of method {{offsetsForTimes}}:
> When I seek an earlier timestamp {{1607922415534L}} (GMT December 14, 2020 
> 5:06:55.534 AM) like below:
> {code:java}
> @Override
> public void onPartitionsAssigned(Consumer consumer, 
> Collection partitions) {
> // calling assignment just to ensure my consumer is actually assigned the 
> partitions
> Set tps = consumer.assignment();
> Map offsetsForTimes = new HashMap<>();
> offsetsForTimes.putAll(consumer.offsetsForTimes(partitions.stream()
> .collect(Collectors.toMap(tp -> tp, epoch -> 1607922415534L;
> }
> {code}
> By setting breakpoint, I can see I got below map:
> {noformat}
> {TopicPartition@5492} "My.Data.Topic-1" -> {OffsetAndTimestamp@5493} 
> "(timestamp=1607922521082, leaderEpoch=282, offset=22475886)"
> {TopicPartition@5495} "My.Data.Topic-0" -> {OffsetAndTimestamp@5496} 
> "(timestamp=1607922523035, leaderEpoch=328, offset=25587551)"
> {TopicPartition@5498} "My.Data.Topic-5" -> null
> {TopicPartition@5500} "My.Data.Topic-4" -> {OffsetAndTimestamp@5501} 
> "(timestamp=1607924819752, leaderEpoch=323, offset=24578937)"
> {TopicPartition@5503} "My.Data.Topic-3" -> {OffsetAndTimestamp@5504} 
> "(timestamp=1607922522143, leaderEpoch=299, offset=23439914)" 
> {TopicPartition@5506} "My.Data.Topic-2" -> {OffsetAndTimestamp@5507} 
> "(timestamp=1607938218461, leaderEpoch=318, offset=23415078)" 
> {TopicPartition@5509} "My.Data.Topic-9" -> {OffsetAndTimestamp@5510} 
> "(timestamp=1607922521019, leaderEpoch=298, offset=22002124)" 
> {TopicPartition@5512} "My.Data.Topic-8" -> {OffsetAndTimestamp@5513} 
> "(timestamp=1607922520780, leaderEpoch=332, offset=23406692)" 
> {TopicPartition@5515} "My.Data.Topic-7" -> {OffsetAndTimestamp@5516} 
> "(timestamp=1607922522800, leaderEpoch=285, offset=22215781)" 
> {TopicPartition@5518} "My.Data.Topic-6" -> null
> {noformat}
> As you can see some of the partitions (5 and 6) it returned null.
> However, if I seek a more recent timestamp like {{1607941818423L}} (GMT 
> December 14, 2020 10:30:18.423 AM), I got offsets for all partitions:
> {noformat}
> {TopicPartition@5492} "My.Data.Topic-1" -> {OffsetAndTimestamp@5493} 
> "(timestamp=1607942934371, leaderEpoch=282, offset=22568732)"
> {TopicPartition@5495} "My.Data.Topic-0" -> {OffsetAndTimestamp@5496} 
> "(timestamp=1607941818435, leaderEpoch=328, offset=25685999)"
> {TopicPartition@5498} "My.Data.Topic-5" -> {OffsetAndTimestamp@5499} 
> "(timestamp=1607941818424, leaderEpoch=309, offset=24333860)"
> {TopicPartition@5501} "My.Data.Topic-4" -> {OffsetAndTimestamp@5502} 
> "(timestamp=1607941818424, leaderEpoch=323, offset=24666385)"
> {TopicPartition@5504} "My.Data.Topic-3" -> {OffsetAndTimestamp@5505} 
> "(timestamp=1607941818433, leaderEpoch=299, offset=23529597)"
> {TopicPartition@5507} "My.Data.Topic-2" -> {OffsetAndTimestamp@5508} 
> "(timestamp=1607941818423, leaderEpoch=318, offset=23431817)"
> {TopicPartition@5510} "My.Data.Topic-9" -> {OffsetAndTimestamp@5511} 
> "(timestamp=1607941818517, leaderEpoch=298, offset=22082849)"
> {TopicPartition@5513} "My.Data.Topic-8" -> {OffsetAndTimestamp@5514} 
> "(timestamp=1607941818423, leaderEpoch=332, offset=23491462)"
> {TopicPartition@5516} "My.Data.Topic-7" -> {OffsetAndTimestamp@5517} 
> "(timestamp=1607942934371, leaderEpoch=285, offset=22306422)"
> {TopicPartition@5519} "My.Data.Topic-6" -> {OffsetAndTimestamp@5520} 
> "(timestamp=1607941818424, leaderEpoch=317, offset=24677423)"
> {noformat}
> So I am confused why seeking to an older timestamp gave me nulls when there 
> are indeed messages with later timestamp as I tried the 2nd time? 



--
This message was sent by Atlassian Jira

[jira] [Updated] (KAFKA-10875) offsetsForTimes returns null for some partitions when it shouldn't?

2024-01-03 Thread Hugo Abreu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hugo Abreu updated KAFKA-10875:
---
Attachment: image-2024-01-03-16-14-10-246.png

> offsetsForTimes returns null for some partitions when it shouldn't?
> ---
>
> Key: KAFKA-10875
> URL: https://issues.apache.org/jira/browse/KAFKA-10875
> Project: Kafka
>  Issue Type: Bug
>Reporter: Yifei Gong
>Priority: Minor
> Attachments: image-2024-01-03-16-14-10-246.png
>
>
> I use spring-boot 2.2.11, spring-kafka 2.4.11 and apache kafka-clients 2.4.1
> I have my consumer {{implements ConsumerAwareRebalanceListener}}, and I am 
> trying to seek to offsets after certain timestamp inside 
> {{onPartitionsAssigned}} method by calling {{offsetsForTimes}}.
> I found this strange behavior of method {{offsetsForTimes}}:
> When I seek an earlier timestamp {{1607922415534L}} (GMT December 14, 2020 
> 5:06:55.534 AM) like below:
> {code:java}
> @Override
> public void onPartitionsAssigned(Consumer consumer, 
> Collection partitions) {
> // calling assignment just to ensure my consumer is actually assigned the 
> partitions
> Set tps = consumer.assignment();
> Map offsetsForTimes = new HashMap<>();
> offsetsForTimes.putAll(consumer.offsetsForTimes(partitions.stream()
> .collect(Collectors.toMap(tp -> tp, epoch -> 1607922415534L;
> }
> {code}
> By setting breakpoint, I can see I got below map:
> {noformat}
> {TopicPartition@5492} "My.Data.Topic-1" -> {OffsetAndTimestamp@5493} 
> "(timestamp=1607922521082, leaderEpoch=282, offset=22475886)"
> {TopicPartition@5495} "My.Data.Topic-0" -> {OffsetAndTimestamp@5496} 
> "(timestamp=1607922523035, leaderEpoch=328, offset=25587551)"
> {TopicPartition@5498} "My.Data.Topic-5" -> null
> {TopicPartition@5500} "My.Data.Topic-4" -> {OffsetAndTimestamp@5501} 
> "(timestamp=1607924819752, leaderEpoch=323, offset=24578937)"
> {TopicPartition@5503} "My.Data.Topic-3" -> {OffsetAndTimestamp@5504} 
> "(timestamp=1607922522143, leaderEpoch=299, offset=23439914)" 
> {TopicPartition@5506} "My.Data.Topic-2" -> {OffsetAndTimestamp@5507} 
> "(timestamp=1607938218461, leaderEpoch=318, offset=23415078)" 
> {TopicPartition@5509} "My.Data.Topic-9" -> {OffsetAndTimestamp@5510} 
> "(timestamp=1607922521019, leaderEpoch=298, offset=22002124)" 
> {TopicPartition@5512} "My.Data.Topic-8" -> {OffsetAndTimestamp@5513} 
> "(timestamp=1607922520780, leaderEpoch=332, offset=23406692)" 
> {TopicPartition@5515} "My.Data.Topic-7" -> {OffsetAndTimestamp@5516} 
> "(timestamp=1607922522800, leaderEpoch=285, offset=22215781)" 
> {TopicPartition@5518} "My.Data.Topic-6" -> null
> {noformat}
> As you can see some of the partitions (5 and 6) it returned null.
> However, if I seek a more recent timestamp like {{1607941818423L}} (GMT 
> December 14, 2020 10:30:18.423 AM), I got offsets for all partitions:
> {noformat}
> {TopicPartition@5492} "My.Data.Topic-1" -> {OffsetAndTimestamp@5493} 
> "(timestamp=1607942934371, leaderEpoch=282, offset=22568732)"
> {TopicPartition@5495} "My.Data.Topic-0" -> {OffsetAndTimestamp@5496} 
> "(timestamp=1607941818435, leaderEpoch=328, offset=25685999)"
> {TopicPartition@5498} "My.Data.Topic-5" -> {OffsetAndTimestamp@5499} 
> "(timestamp=1607941818424, leaderEpoch=309, offset=24333860)"
> {TopicPartition@5501} "My.Data.Topic-4" -> {OffsetAndTimestamp@5502} 
> "(timestamp=1607941818424, leaderEpoch=323, offset=24666385)"
> {TopicPartition@5504} "My.Data.Topic-3" -> {OffsetAndTimestamp@5505} 
> "(timestamp=1607941818433, leaderEpoch=299, offset=23529597)"
> {TopicPartition@5507} "My.Data.Topic-2" -> {OffsetAndTimestamp@5508} 
> "(timestamp=1607941818423, leaderEpoch=318, offset=23431817)"
> {TopicPartition@5510} "My.Data.Topic-9" -> {OffsetAndTimestamp@5511} 
> "(timestamp=1607941818517, leaderEpoch=298, offset=22082849)"
> {TopicPartition@5513} "My.Data.Topic-8" -> {OffsetAndTimestamp@5514} 
> "(timestamp=1607941818423, leaderEpoch=332, offset=23491462)"
> {TopicPartition@5516} "My.Data.Topic-7" -> {OffsetAndTimestamp@5517} 
> "(timestamp=1607942934371, leaderEpoch=285, offset=22306422)"
> {TopicPartition@5519} "My.Data.Topic-6" -> {OffsetAndTimestamp@5520} 
> "(timestamp=1607941818424, leaderEpoch=317, offset=24677423)"
> {noformat}
> So I am confused why seeking to an older timestamp gave me nulls when there 
> are indeed messages with later timestamp as I tried the 2nd time? 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Add reviewers GitHub action [kafka]

2024-01-03 Thread via GitHub


viktorsomogyi commented on PR #15115:
URL: https://github.com/apache/kafka/pull/15115#issuecomment-1875609641

   One more thing is that the current script places the "Reviewers:" section 
after "Committer checklist" so it's not automatically included in the merge 
template. I'll see if that can be improved.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15147) Measure pending and outstanding Remote Segment operations

2024-01-03 Thread Christo Lolov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17802238#comment-17802238
 ] 

Christo Lolov commented on KAFKA-15147:
---

Heya [~enether], after a very quick search the metric documentation appears to 
actually be in the kafka codebase itself so I will open a pull request against 
the 3.7 branch tomorrow morning and tag you as a reviewer!

> Measure pending and outstanding Remote Segment operations
> -
>
> Key: KAFKA-15147
> URL: https://issues.apache.org/jira/browse/KAFKA-15147
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Christo Lolov
>Priority: Major
>  Labels: tiered-storage
> Fix For: 3.7.0
>
>
>  
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-963%3A+Upload+and+delete+lag+metrics+in+Tiered+Storage
>  
> KAFKA-15833: RemoteCopyLagBytes 
> KAFKA-16002: RemoteCopyLagSegments, RemoteDeleteLagBytes, 
> RemoteDeleteLagSegments
> KAFKA-16013: ExpiresPerSec
> KAFKA-16014: RemoteLogSizeComputationTime, RemoteLogSizeBytes, 
> RemoteLogMetadataCount
> KAFKA-15158: RemoteDeleteRequestsPerSec, RemoteDeleteErrorsPerSec, 
> BuildRemoteLogAuxStateRequestsPerSec, BuildRemoteLogAuxStateErrorsPerSec
> 
> Remote Log Segment operations (copy/delete) are executed by the Remote 
> Storage Manager, and recorded by Remote Log Metadata Manager (e.g. default 
> TopicBasedRLMM writes to the internal Kafka topic state changes on remote log 
> segments).
> As executions run, fail, and retry; it will be important to know how many 
> operations are pending and outstanding over time to alert operators.
> Pending operations are not enough to alert, as values can oscillate closer to 
> zero. An additional condition needs to apply (running time > threshold) to 
> consider an operation outstanding.
> Proposal:
> RemoteLogManager could be extended with 2 concurrent maps 
> (pendingSegmentCopies, pendingSegmentDeletes) `Map[Uuid, Long]` to measure 
> segmentId time when operation started, and based on this expose 2 metrics per 
> operation:
>  * pendingSegmentCopies: gauge of pendingSegmentCopies map
>  * outstandingSegmentCopies: loop over pending ops, and if now - startedTime 
> > timeout, then outstanding++ (maybe on debug level?)
> Is this a valuable metric to add to Tiered Storage? or better to solve on a 
> custom RLMM implementation?
> Also, does it require a KIP?
> Thanks!
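
A rough sketch of the pending/outstanding idea described above (illustrative 
only; class and method names are invented here and this is not claimed to match 
the implementation that eventually shipped):

{code:java}
import org.apache.kafka.common.Uuid;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the proposal: record when each segment copy started, expose the map
// size as a "pending" gauge, and count entries older than a threshold as "outstanding".
public class SegmentCopyTracker {
    private final Map<Uuid, Long> pendingSegmentCopies = new ConcurrentHashMap<>();
    private final long outstandingThresholdMs;

    public SegmentCopyTracker(long outstandingThresholdMs) {
        this.outstandingThresholdMs = outstandingThresholdMs;
    }

    public void copyStarted(Uuid segmentId) {
        pendingSegmentCopies.put(segmentId, System.currentTimeMillis());
    }

    public void copyFinished(Uuid segmentId) {
        pendingSegmentCopies.remove(segmentId);
    }

    // Value for a "pendingSegmentCopies" gauge.
    public int pending() {
        return pendingSegmentCopies.size();
    }

    // Value for an "outstandingSegmentCopies" gauge.
    public long outstanding() {
        long now = System.currentTimeMillis();
        return pendingSegmentCopies.values().stream()
                .filter(startedMs -> now - startedMs > outstandingThresholdMs)
                .count();
    }
}
{code}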



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14133: Migrate storeMetadata mock in StoreChangelogReaderTest to Mockito [kafka]

2024-01-03 Thread via GitHub


clolov commented on code in PR #15116:
URL: https://github.com/apache/kafka/pull/15116#discussion_r1440615357


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##
@@ -453,17 +456,17 @@ private void shouldPollWithRightTimeout(final boolean 
stateUpdaterEnabled) {
 @Test
 public void shouldPollWithRightTimeoutWithStateUpdaterDefault() {
 setupStateManagerMock();
+setupStoreMetadata();
 final Properties properties = new Properties();
 shouldPollWithRightTimeout(properties);
 }
 
 private void shouldPollWithRightTimeout(final Properties properties) {
 final TaskId taskId = new TaskId(0, 0);
 
-
EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
-EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes();

Review Comment:
   Ditto



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##
@@ -257,15 +255,16 @@ public void 
shouldSupportUnregisterChangelogBeforeInitialization() {
 @Test
 public void shouldSupportUnregisterChangelogBeforeCompletion() {
 setupStateManagerMock();
+setupStoreMetadata();
 final Map mockTasks = mock(Map.class);
 
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
 
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
-EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes();
-EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes();
+when(storeMetadata.offset()).thenReturn(9L);
 if (type == STANDBY) {
+when(storeMetadata.endOffset()).thenReturn(10L);

Review Comment:
   Only used on the STANDBY path



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##
@@ -821,11 +831,11 @@ public Map 
committed(final Set mockTasks = mock(Map.class);
 
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
-EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes();

Review Comment:
   Reported as unused by Mockito



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##
@@ -363,12 +363,12 @@ public void 
shouldSupportUnregisterChangelogAfterCompletion() {
 @Test
 public void shouldInitializeChangelogAndCheckForCompletion() {
 setupStateManagerMock();
+setupStoreMetadata();
 final Map mockTasks = mock(Map.class);
 
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
 
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
-EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes();
-EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes();

Review Comment:
   Unused according to Mockito



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##
@@ -862,10 +873,9 @@ public void 
shouldRequestCommittedOffsetsAndHandleTimeoutException() {
 EasyMock.expectLastCall();
 
 when(stateManager.changelogAsSource(tp)).thenReturn(true);
-EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
-EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes();

Review Comment:
   Ditto



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##
@@ -924,13 +934,13 @@ public synchronized ListConsumerGroupOffsetsResult 
listConsumerGroupOffsets(fina
 @Test
 public void shouldThrowIfCommittedOffsetsFail() {
 setupStateManagerMock();
+when(storeMetadata.changelogPartition()).thenReturn(tp);
 
 final TaskId taskId = new TaskId(0, 0);
 
 when(stateManager.taskId()).thenReturn(taskId);
 when(stateManager.changelogAsSource(tp)).thenReturn(true);
-EasyMock.expect(storeMetadata.offset()).andReturn(10L).anyTimes();

Review Comment:
   Ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-14133: Migrate storeMetadata mock in StoreChangelogReaderTest to Mockito [kafka]

2024-01-03 Thread via GitHub


clolov opened a new pull request, #15116:
URL: https://github.com/apache/kafka/pull/15116

   This pull request takes a similar approach to how TaskManagerTest is being 
migrated to Mockito mock by mock for easier reviews.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add reviewers GitHub action [kafka]

2024-01-03 Thread via GitHub


viktorsomogyi commented on PR #15115:
URL: https://github.com/apache/kafka/pull/15115#issuecomment-1875576165

   One caveat with the "Name \<email\>" format is that \<\> is markup 
syntax and means superscript. This doesn't work, however, in the case of emails, 
so GitHub will just display "Name name@email", i.e. without \<\>.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Add reviewers GitHub action [kafka]

2024-01-03 Thread via GitHub


viktorsomogyi opened a new pull request, #15115:
URL: https://github.com/apache/kafka/pull/15115

   This PR adds a github action that is triggered when a GitHub review is 
submitted.
   The action does the following:
   - if there is no "Reviewers" line at the end of the description, it appends one with the current reviewer's name and email as set in their GitHub account.
   - if there is already a "Reviewers" line at the end, it appends the reviewer's name and email to it. It uses the "Name \<email\>" format.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Test2 [kafka]

2024-01-03 Thread via GitHub


viktorsomogyi closed pull request #15114: Test2
URL: https://github.com/apache/kafka/pull/15114


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Test2 [kafka]

2024-01-03 Thread via GitHub


viktorsomogyi commented on PR #15114:
URL: https://github.com/apache/kafka/pull/15114#issuecomment-1875553120

   Meant to create this in my repo.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Test2 [kafka]

2024-01-03 Thread via GitHub


viktorsomogyi opened a new pull request, #15114:
URL: https://github.com/apache/kafka/pull/15114

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Migrate consumer mock in TaskManagerTest to Mockito [kafka]

2024-01-03 Thread via GitHub


clolov commented on code in PR #15112:
URL: https://github.com/apache/kafka/pull/15112#discussion_r1440569384


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -197,6 +196,8 @@ public class TaskManagerTest {
 @Mock(type = MockType.STRICT)
 private Consumer consumer;
 @org.mockito.Mock
+private Consumer mockitoConsumer;

Review Comment:
   I ran into quite a lot of problems when I tried migrating the whole mock, so 
I decided to do the migration test-by-test. This way problems could be flushed 
out. By introducing this mock and using `setMainConsumer` on a test-by-test 
basis, things are easier to go through (at least in my opinion).
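
   For illustration, a minimal, self-contained sketch of that per-test pattern (the `TaskManagerLike` class below is a hypothetical stand-in, not the real `TaskManager`):

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public class PerTestConsumerMockSketch {
    // Stand-in for the class under test; the real TaskManager exposes a similar
    // setMainConsumer(...) hook, as used in the diff above.
    static class TaskManagerLike {
        private Consumer<byte[], byte[]> mainConsumer;
        void setMainConsumer(final Consumer<byte[], byte[]> consumer) { this.mainConsumer = consumer; }
        void resumeAssignment() { mainConsumer.resume(mainConsumer.assignment()); }
    }

    @SuppressWarnings("unchecked")
    public static void main(final String[] args) {
        final Consumer<byte[], byte[]> mockitoConsumer = mock(Consumer.class);
        when(mockitoConsumer.assignment()).thenReturn(Collections.<TopicPartition>emptySet());

        final TaskManagerLike taskManager = new TaskManagerLike();
        taskManager.setMainConsumer(mockitoConsumer); // swap the Mockito mock in for this one test

        taskManager.resumeAssignment();

        // EasyMock-style recorded expectations become explicit verifications
        verify(mockitoConsumer).resume(Collections.<TopicPartition>emptySet());
    }
}
```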



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -4809,11 +4768,14 @@ public void 
shouldNotFailForTimeoutExceptionOnCommitWithEosAlpha() {
 exception.corruptedTasks(),
 equalTo(Collections.singleton(taskId00))
 );
+
+Mockito.verify(mockitoConsumer, times(2)).groupMetadata();

Review Comment:
   Mockito claims the method is called twice. This wasn't specified in EasyMock 
world, but I decided to make it explicit now.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -3010,13 +2993,9 @@ public void initializeIfNeeded() {
 }
 };
 
-consumer.commitSync(Collections.emptyMap());
-expectLastCall();
-expect(consumer.assignment()).andReturn(emptySet());
-consumer.resume(eq(emptySet()));
-expectLastCall();

Review Comment:
   Same as above, Mockito reported these are unused. I added an assertion in 
Mockito world at the end (...I should probably add one in the first test as 
well, but I can do this in part 2 of this pull request)



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -1168,13 +1166,10 @@ public void 
shouldHandleMultipleRemovedTasksFromStateUpdater() {
 when(stateUpdater.drainRemovedTasks())
 .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, 
taskToUpdateInputPartitions));
 when(stateUpdater.restoresActiveTasks()).thenReturn(true);
-when(activeTaskCreator.createActiveTaskFromStandby(taskToRecycle1, 
taskId01Partitions, consumer))
+when(activeTaskCreator.createActiveTaskFromStandby(taskToRecycle1, 
taskId01Partitions, mockitoConsumer))
 .thenReturn(convertedTask1);
 when(standbyTaskCreator.createStandbyTaskFromActive(taskToRecycle0, 
taskId00Partitions))
 .thenReturn(convertedTask0);
-expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
-consumer.resume(anyObject());
-expectLastCall().anyTimes();

Review Comment:
   According to Mockito these were unused. I deleted them in EasyMock world and 
the test still passed. Hence I removed them.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -3047,14 +3027,9 @@ public void completeRestoration(final 
java.util.function.Consumer> assignment = 
singletonMap(taskId00, taskId00Partitions);
 final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, 
false, stateManager);
 
+taskManager.setMainConsumer(mockitoConsumer);
+
 // `handleAssignment`
 
when(standbyTaskCreator.createTasks(assignment)).thenReturn(singletonList(task00));
 
-// `tryToCompleteRestoration`
-expect(consumer.assignment()).andReturn(emptySet());
-consumer.resume(eq(emptySet()));
-expectLastCall();
-
-// `shutdown`
-consumer.commitSync(Collections.emptyMap());
-expectLastCall();

Review Comment:
   Ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16070: extract the setReadOnly method into Headers [kafka]

2024-01-03 Thread via GitHub


Joker-5 opened a new pull request, #15113:
URL: https://github.com/apache/kafka/pull/15113

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   TODO unit tests.
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-14133: Migrate consumer mock in TaskManagerTest to Mockito [kafka]

2024-01-03 Thread via GitHub


clolov opened a new pull request, #15112:
URL: https://github.com/apache/kafka/pull/15112

   This pull request migrates the consumer mock in TaskManagerTest test by test 
for easier reviews.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Prevent java.lang.UnsupportedOperationException in MockAdminClient [kafka]

2024-01-03 Thread via GitHub


jamespfaulkner commented on PR #14955:
URL: https://github.com/apache/kafka/pull/14955#issuecomment-1875517539

   > Looks good! Out of curiosity, do you happen to know tests which are 
failing with this UnsupportedOperationException?
   
   Ah, sorry I missed this question the first time round. I have a small app that constantly polls `describeLogDirs`. When I saw that `MockAdminClient` had changed its implementation of `describeLogDirs` to no longer immediately throw an `UnsupportedOperationException`, I took the opportunity to improve my tests and spotted this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Enable Gradle Remote Build Cache [kafka]

2024-01-03 Thread via GitHub


divijvaidya commented on PR #15109:
URL: https://github.com/apache/kafka/pull/15109#issuecomment-1875475685

   https://github.com/apache/beam/pull/27015 is how Apache Beam enabled this. Seems like we need to request a build node from Apache Infra and change our Jenkins to pick up creds similar to how Beam did it (from env vars which are deployed by Infra). I have created a ticket to talk to Infra at https://issues.apache.org/jira/browse/INFRA-25336


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15556: Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect [kafka]

2024-01-03 Thread via GitHub


Phuc-Hong-Tran commented on PR #15020:
URL: https://github.com/apache/kafka/pull/15020#issuecomment-1875430964

   @philipnee, when you said I should "rethink" the approach, did you mean I should change the approach, or can I just fix the code where you commented?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Enable Gradle Remote Build Cache [kafka]

2024-01-03 Thread via GitHub


nicktelford commented on PR #15109:
URL: https://github.com/apache/kafka/pull/15109#issuecomment-1875405401

   Looks like Jenkins doesn't have permission (by default at least) to write 
entries to the ASF Gradle Enterprise Build Cache:
   ```
   Could not store entry 744bd685bdd1d149ec93934b276b7698 in remote build 
cache: Storing entry at 
'https://ge.apache.org/cache/744bd685bdd1d149ec93934b276b7698' response status 
403: Forbidden
   ```
   
   It might require some explicit authentication.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Enable Gradle Remote Build Cache [kafka]

2024-01-03 Thread via GitHub


nicktelford commented on PR #15109:
URL: https://github.com/apache/kafka/pull/15109#issuecomment-1875356669

   @ijuma I've temporarily reconfigured it to enable the cache for this branch 
(`gradle-remote-build-cache`) instead of `trunk`, so we can test that it works 
as expected using the CI builds on this PR.
   
   Pushing to the cache from these builds should be fine, since there are no 
code changes, so the task outputs should be the same as in `trunk` anyway.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14412: Decouple RocksDB access from CF [kafka]

2024-01-03 Thread via GitHub


nicktelford commented on PR #15105:
URL: https://github.com/apache/kafka/pull/15105#issuecomment-1875335559

   @lucasbru OK, my bad. It turns out I did a minor refactoring _after_ I ran 
the test suite yesterday that was so insignificant I didn't think I needed to 
run the tests again... Turns out I was wrong :see_no_evil: 
   
   I've fixed the bug now and the tests pass locally for real now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15867) Should ConsumerNetworkThread wrap the exception and notify the polling thread?

2024-01-03 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17802158#comment-17802158
 ] 

Phuc Hong Tran commented on KAFKA-15867:


[~pnee] I happened to come across this comment: 
[https://github.com/apache/kafka/blob/60c445bdd51c608d1212f5cab83b65533739bd61/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java#L61]



[~kirktrue], could you give some insight into why we don't want to propagate the error to the caller? Thanks

> Should ConsumerNetworkThread wrap the exception and notify the polling thread?
> --
>
> Key: KAFKA-15867
> URL: https://issues.apache.org/jira/browse/KAFKA-15867
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Phuc Hong Tran
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> The ConsumerNetworkThread runs a tight loop infinitely.  However, when 
> encountering an unexpected exception, it logs an error and continues.
>  
> I think this might not be ideal because user can run blind for a long time 
> before discovering there's something wrong with the code; so I believe we 
> should propagate the throwable back to the polling thread. 
>  
> cc [~lucasbru] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Fix toString method of IsolationLevel [kafka]

2024-01-03 Thread via GitHub


PzaThief commented on PR #14782:
URL: https://github.com/apache/kafka/pull/14782#issuecomment-1875332142

   > Build failed again -- can you maybe rebase this PR on `trunk` to ensure 
it's on the latest version.
   
   Sorry, I missed it. Can you please retrigger it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]

2024-01-03 Thread via GitHub


developster commented on PR #15080:
URL: https://github.com/apache/kafka/pull/15080#issuecomment-1875320392

   @gharris1727 , sure. I wrote the test but just invoking methods in order is 
not enough to trigger the deadlock. The requestTaskReconfiguration method is 
always executed before putConnectorConfig.
   
   I believe it is possible to reliably reproduce the deadlock with two 
countdown latches, one countdown in the `ConfigBackingStore#snapshot` and 
another in `ConfigBackingStore#putTaskConfigs`. This requires a mock for the 
config backing store. If you have a better idea I am happy to analyze it.
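
   To make the idea concrete, here is a minimal sketch of coordinating the two threads with latches so both are inside the two methods at once. The `StoreLike` interface is a hypothetical stand-in; the real `ConfigBackingStore` methods have different signatures:

```java
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.concurrent.CountDownLatch;

public class LatchedInterleavingSketch {
    // Stand-in interface; not the real ConfigBackingStore API.
    interface StoreLike {
        Object snapshot();
        void putTaskConfigs(String connector);
    }

    public static void main(final String[] args) throws InterruptedException {
        final CountDownLatch inSnapshot = new CountDownLatch(1);
        final CountDownLatch inPutTaskConfigs = new CountDownLatch(1);

        final StoreLike store = mock(StoreLike.class);
        when(store.snapshot()).thenAnswer(invocation -> {
            inSnapshot.countDown();   // signal: the snapshot path has been entered
            inPutTaskConfigs.await(); // ... and wait until the other path is inside putTaskConfigs
            return new Object();
        });
        doAnswer(invocation -> {
            inPutTaskConfigs.countDown();
            inSnapshot.await();
            return null;
        }).when(store).putTaskConfigs(anyString());

        // With the herder's real locking, forcing both threads inside these two methods
        // at the same time should reproduce the reported deadlock deterministically.
        final Thread configPath = new Thread(store::snapshot, "putConnectorConfig-path");
        final Thread reconfigPath = new Thread(() -> store.putTaskConfigs("connector"), "requestTaskReconfiguration-path");
        configPath.start();
        reconfigPath.start();
        configPath.join();
        reconfigPath.join();
    }
}
```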
   
   Unrelated to this PR's issue, it looks like the wait operations in StandaloneHerderTest are mistakenly 1000 seconds instead of milliseconds. Is that right?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix toString method of IsolationLevel [kafka]

2024-01-03 Thread via GitHub


mjsax commented on PR #14782:
URL: https://github.com/apache/kafka/pull/14782#issuecomment-1875305566

   Build failed again -- can you maybe rebase this PR on `trunk` to ensure it's 
on the latest version.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Prevent java.lang.UnsupportedOperationException in MockAdminClient [kafka]

2024-01-03 Thread via GitHub


vamossagar12 commented on PR #14955:
URL: https://github.com/apache/kafka/pull/14955#issuecomment-1875226073

   > Looks good! Out of curiosity, do you happen to know tests which are 
failing with this UnsupportedOperationException?
   
   yeah even I was interested to know this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16016: Add docker wrapper in core and remove docker utility script [kafka]

2024-01-03 Thread via GitHub


viktorsomogyi commented on code in PR #15048:
URL: https://github.com/apache/kafka/pull/15048#discussion_r1440302423


##
core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.docker
+
+import kafka.tools.StorageTool
+import kafka.utils.Exit
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths, StandardCopyOption, StandardOpenOption}
+
+object KafkaDockerWrapper {
+  def main(args: Array[String]): Unit = {
+if (args.length == 0) {
+  throw new RuntimeException(s"Error: No operation input provided. " +
+s"Please provide a valid operation: 'setup'.")
+}
+val operation = args.head
+val arguments = args.tail
+
+operation match {
+  case "setup" =>
+if (arguments.length != 3) {
+  val errMsg = "not enough arguments passed. Usage: " +
+"setup  , 
"
+  System.err.println(errMsg)
+  Exit.exit(1, Some(errMsg))
+}
+val defaultConfigsDir = arguments(0)
+val mountedConfigsDir = arguments(1)
+val finalConfigsDir = arguments(2)

Review Comment:
   Parsing these 3 arguments would also be a good opportunity to validate them and convert them into `Path` objects. Using the `Path` API also makes your code more robust down the line when you append the config file names in `prepareConfigs`.
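
   Roughly what that could look like — shown in Java for brevity, since `java.nio.file.Path` is the same API the Scala wrapper would use; names and validation rules here are illustrative only:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public final class ConfigDirArgsSketch {

    static Path requireDirectory(final String arg, final String name) {
        final Path path = Paths.get(arg);
        if (!Files.isDirectory(path)) {
            throw new IllegalArgumentException(name + " '" + arg + "' is not an existing directory");
        }
        return path;
    }

    public static void main(final String[] args) {
        if (args.length != 3) {
            throw new IllegalArgumentException("Usage: setup <default-dir> <mounted-dir> <final-dir>");
        }
        final Path defaultConfigsDir = requireDirectory(args[0], "default configs dir");
        final Path mountedConfigsDir = requireDirectory(args[1], "mounted configs dir");
        final Path finalConfigsDir = Paths.get(args[2]); // may not exist yet; created later

        // Path#resolve replaces manual string concatenation when appending file names
        System.out.println(defaultConfigsDir.resolve("server.properties"));
        System.out.println(mountedConfigsDir.resolve("log4j.properties"));
        System.out.println(finalConfigsDir.toAbsolutePath());
    }
}
```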



##
core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.docker
+
+import kafka.tools.StorageTool
+import kafka.utils.Exit
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths, StandardCopyOption, StandardOpenOption}
+
+object KafkaDockerWrapper {
+  def main(args: Array[String]): Unit = {
+if (args.length == 0) {
+  throw new RuntimeException(s"Error: No operation input provided. " +
+s"Please provide a valid operation: 'setup'.")
+}
+val operation = args.head
+val arguments = args.tail
+
+operation match {
+  case "setup" =>
+if (arguments.length != 3) {
+  val errMsg = "not enough arguments passed. Usage: " +
+"setup  , 
"
+  System.err.println(errMsg)
+  Exit.exit(1, Some(errMsg))
+}
+val defaultConfigsDir = arguments(0)
+val mountedConfigsDir = arguments(1)
+val finalConfigsDir = arguments(2)
+try {
+  prepareConfigs(defaultConfigsDir, mountedConfigsDir, finalConfigsDir)
+} catch {
+  case e: Throwable =>
+val errMsg = s"error while preparing configs: ${e.getMessage}"
+System.err.println(errMsg)
+Exit.exit(1, Some(errMsg))
+}
+
+val formatCmd = formatStorageCmd(finalConfigsDir, envVars)
+StorageTool.main(formatCmd)
+  case _ =>
+throw new RuntimeException(s"Unknown operation $operation. " +
+  s"Please provide a valid operation: 'setup'.")
+}
+  }
+
+  import Constants._
+
+  private def formatStorageCmd(configsDir: String, env: Map[String, String]): 
Array[String] = {

Review Comment:
   While I agree that it works, it's not a nice solution. Would it be a big 
refactor to use a specific method from `StorageTool` instead of `main`?



##
core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala:
##
@@ -0,0 +1,218 @@

Re: [PR] MINOR: Improve code style about producer [kafka]

2024-01-03 Thread via GitHub


divijvaidya merged PR #15107:
URL: https://github.com/apache/kafka/pull/15107


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15816: Fix leaked sockets in trogdor tests [kafka]

2024-01-03 Thread via GitHub


divijvaidya merged PR #14771:
URL: https://github.com/apache/kafka/pull/14771


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15816: Fix leaked sockets in trogdor tests [kafka]

2024-01-03 Thread via GitHub


divijvaidya commented on PR #14771:
URL: https://github.com/apache/kafka/pull/14771#issuecomment-1875169735

   The test modified in this PR is successful - 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14771/4/testReport/org.apache.kafka.trogdor.agent/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-03 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1440285636


##
storage/src/main/java/org/apache/kafka/storage/internals/log/SegmentDeletionReason.java:
##
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import java.util.List;
+
+public interface SegmentDeletionReason {

Review Comment:
   `SegmentDeletionReason` has other implementations like `RetentionMsBreach`, 
`RetentionSizeBreach`, `StartOffsetBreach` which use `UnifiedLog`. We can take 
a relook at this when we refactor `UnifiedLog`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-03 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1440207191


##
storage/src/main/java/org/apache/kafka/storage/internals/log/SegmentDeletionReason.java:
##
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import java.util.List;
+
+public interface SegmentDeletionReason {

Review Comment:
   SegmentDeletionReason has other implementations like `RetentionMsBreach`, 
`RetentionSizeBreach`, `StartOffsetBreach` which use UnifiedLog. We can take a 
relook at this when we refactor UnifiedLog. 



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1441,9 +1442,9 @@ class UnifiedLog(@volatile var logStartOffset: Long,
*  (if there is one). It returns true iff the segment is 
deletable.
* @return the segments ready to be deleted
*/
-  private[log] def deletableSegments(predicate: (LogSegment, 
Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
-def isSegmentEligibleForDeletion(nextSegmentOpt: Option[LogSegment], 
upperBoundOffset: Long): Boolean = {
-  val allowDeletionDueToLogStartOffsetIncremented = 
nextSegmentOpt.isDefined && logStartOffset >= nextSegmentOpt.get.baseOffset
+  private[log] def deletableSegments(predicate: (LogSegment, 
Optional[LogSegment]) => Boolean): Iterable[LogSegment] = {

Review Comment:
   I thought these could be addressed when `UnifiedLog` is refactored and moved 
to storage module.



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1511,7 +1512,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 }
 localLog.checkIfMemoryMappedBufferClosed()
 // remove the segments for lookups
-localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, 
reason)
+localLog.removeAndDeleteSegments(segmentsToDelete.toList.asJava,  
true, reason)

Review Comment:
   I thought these could be addressed when `UnifiedLog` is refactored and moved 
to storage module.



##
checkstyle/import-control-core.xml:
##
@@ -37,6 +37,8 @@
   
   
   
+  
+  

Review Comment:
   Right, it is not needed. I guess it was needed when that class was kept 
inside the core module. 



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1245,7 +1246,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   private[log] def collectAbortedTransactions(startOffset: Long, 
upperBoundOffset: Long): List[AbortedTxn] = {

Review Comment:
   I thought these could be addressed when `UnifiedLog` is refactored and moved 
to storage module.
   



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -2265,11 +2268,12 @@ object UnifiedLog extends Logging {
 def deleteProducerSnapshots(): Unit = {
   LocalLog.maybeHandleIOException(logDirFailureChannel,
 parentDir,
-s"Error while deleting producer state snapshots for $topicPartition in 
dir $parentDir") {
+s"Error while deleting producer state snapshots for $topicPartition in 
dir $parentDir", {
 snapshotsToDelete.foreach { snapshot =>
   snapshot.deleteIfExists()
 }
-  }
+  return;

Review Comment:
   It was expecting a return for the inline declaration; it throws a type mismatch error without it.



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java:
##
@@ -0,0 +1,1146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the 

Re: [PR] MINOR: Increase parallelism for Jenkins [kafka]

2024-01-03 Thread via GitHub


divijvaidya commented on PR #15099:
URL: https://github.com/apache/kafka/pull/15099#issuecomment-1875126583

   > @divijvaidya did you accidentally include the KafkaApisTest change here?
   
   Hi @jolshan 
   No, it was intentional since it wasn't merged to trunk at that time. Note 
that this is a draft PR at this stage. Surprisingly, we have more test failures 
when we increase parallelism for tests. I am trying to investigate that. I will 
tag you for review when it is ready.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-16073) Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion

2024-01-03 Thread hzh0425 (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17802095#comment-17802095
 ] 

hzh0425 edited comment on KAFKA-16073 at 1/3/24 9:54 AM:
-

Yes, this is another good solution; localLogStartOffset will always be >= baseSegment.startOffset(). [~satish.duggana]


was (Author: JIRAUSER298236):
Yes, this is another good solution, localLogStartOffset will always >= 
baseSegment.startOffset().

> Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed 
> localLogStartOffset Update During Segment Deletion
> 
>
> Key: KAFKA-16073
> URL: https://issues.apache.org/jira/browse/KAFKA-16073
> Project: Kafka
>  Issue Type: Bug
>  Components: core, Tiered-Storage
>Affects Versions: 3.6.1
>Reporter: hzh0425
>Assignee: hzh0425
>Priority: Major
>  Labels: KIP-405, kip-405, tiered-storage
> Fix For: 3.6.1
>
>
> The identified bug in Apache Kafka's tiered storage feature involves a 
> delayed update of {{localLogStartOffset}} in the 
> {{UnifiedLog.deleteSegments}} method, impacting consumer fetch operations. 
> When segments are deleted from the log's memory state, the 
> {{localLogStartOffset}} isn't promptly updated. Concurrently, 
> {{ReplicaManager.handleOffsetOutOfRangeError}} checks if a consumer's fetch 
> offset is less than the {{{}localLogStartOffset{}}}. If it's greater, Kafka 
> erroneously sends an {{OffsetOutOfRangeException}} to the consumer.
> In a specific concurrent scenario, imagine sequential offsets: {{{}offset1 < 
> offset2 < offset3{}}}. A client requests data at {{{}offset2{}}}. While a 
> background deletion process removes segments from memory, it hasn't yet 
> updated the {{LocalLogStartOffset}} from {{offset1}} to {{{}offset3{}}}. 
> Consequently, when the fetch offset ({{{}offset2{}}}) is evaluated against 
> the stale {{offset1}} in {{{}ReplicaManager.handleOffsetOutOfRangeError{}}}, 
> it incorrectly triggers an {{{}OffsetOutOfRangeException{}}}. This issue 
> arises from the out-of-sync update of {{{}localLogStartOffset{}}}, leading to 
> incorrect handling of consumer fetch requests and potential data access 
> errors.
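
   A simplified, hedged illustration of the timing window described above (stand-in fields and methods, not the actual UnifiedLog/ReplicaManager code):

```java
public class StaleStartOffsetSketch {
    private volatile long localLogStartOffset = 100L; // "offset1"
    private volatile boolean segmentsDeleted = false;

    // Simplified decision made while handling an out-of-range fetch.
    String handleOffsetOutOfRange(final long fetchOffset) {
        if (fetchOffset < localLogStartOffset) {
            return "serve " + fetchOffset + " from tiered storage";
        }
        // Window: segments already deleted, but the start offset still reads 100.
        return segmentsDeleted
                ? "OffsetOutOfRangeException for " + fetchOffset + " (the reported bug)"
                : "serve " + fetchOffset + " from local log";
    }

    void deleteSegments() {
        segmentsDeleted = true;      // segments removed from the in-memory state first ...
        localLogStartOffset = 300L;  // ... localLogStartOffset ("offset3") advanced only afterwards
    }

    public static void main(final String[] args) {
        final StaleStartOffsetSketch log = new StaleStartOffsetSketch();
        log.segmentsDeleted = true;                           // deletion has removed segments ...
        System.out.println(log.handleOffsetOutOfRange(200L)); // ... fetch at "offset2" hits the stale check
        log.localLogStartOffset = 300L;                       // the delayed update closes the window
        System.out.println(log.handleOffsetOutOfRange(200L)); // now routed to tiered storage
    }
}
```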



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16073) Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion

2024-01-03 Thread hzh0425 (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17802095#comment-17802095
 ] 

hzh0425 edited comment on KAFKA-16073 at 1/3/24 9:54 AM:
-

Yes, this is another good solution, localLogStartOffset will always >= 
baseSegment.startOffset().


was (Author: JIRAUSER298236):
Yes, this is another good solution, localLogStartOffset will always >= 
baseSegment.baseOffset().

> Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed 
> localLogStartOffset Update During Segment Deletion
> 
>
> Key: KAFKA-16073
> URL: https://issues.apache.org/jira/browse/KAFKA-16073
> Project: Kafka
>  Issue Type: Bug
>  Components: core, Tiered-Storage
>Affects Versions: 3.6.1
>Reporter: hzh0425
>Assignee: hzh0425
>Priority: Major
>  Labels: KIP-405, kip-405, tiered-storage
> Fix For: 3.6.1
>
>
> The identified bug in Apache Kafka's tiered storage feature involves a 
> delayed update of {{localLogStartOffset}} in the 
> {{UnifiedLog.deleteSegments}} method, impacting consumer fetch operations. 
> When segments are deleted from the log's memory state, the 
> {{localLogStartOffset}} isn't promptly updated. Concurrently, 
> {{ReplicaManager.handleOffsetOutOfRangeError}} checks if a consumer's fetch 
> offset is less than the {{{}localLogStartOffset{}}}. If it's greater, Kafka 
> erroneously sends an {{OffsetOutOfRangeException}} to the consumer.
> In a specific concurrent scenario, imagine sequential offsets: {{{}offset1 < 
> offset2 < offset3{}}}. A client requests data at {{{}offset2{}}}. While a 
> background deletion process removes segments from memory, it hasn't yet 
> updated the {{LocalLogStartOffset}} from {{offset1}} to {{{}offset3{}}}. 
> Consequently, when the fetch offset ({{{}offset2{}}}) is evaluated against 
> the stale {{offset1}} in {{{}ReplicaManager.handleOffsetOutOfRangeError{}}}, 
> it incorrectly triggers an {{{}OffsetOutOfRangeException{}}}. This issue 
> arises from the out-of-sync update of {{{}localLogStartOffset{}}}, leading to 
> incorrect handling of consumer fetch requests and potential data access 
> errors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16073) Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion

2024-01-03 Thread hzh0425 (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17802095#comment-17802095
 ] 

hzh0425 commented on KAFKA-16073:
-

Yes, this is another good solution, localLogStartOffset will always >= 
baseSegment.baseOffset().

> Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed 
> localLogStartOffset Update During Segment Deletion
> 
>
> Key: KAFKA-16073
> URL: https://issues.apache.org/jira/browse/KAFKA-16073
> Project: Kafka
>  Issue Type: Bug
>  Components: core, Tiered-Storage
>Affects Versions: 3.6.1
>Reporter: hzh0425
>Assignee: hzh0425
>Priority: Major
>  Labels: KIP-405, kip-405, tiered-storage
> Fix For: 3.6.1
>
>
> The identified bug in Apache Kafka's tiered storage feature involves a 
> delayed update of {{localLogStartOffset}} in the 
> {{UnifiedLog.deleteSegments}} method, impacting consumer fetch operations. 
> When segments are deleted from the log's memory state, the 
> {{localLogStartOffset}} isn't promptly updated. Concurrently, 
> {{ReplicaManager.handleOffsetOutOfRangeError}} checks if a consumer's fetch 
> offset is less than the {{{}localLogStartOffset{}}}. If it's greater, Kafka 
> erroneously sends an {{OffsetOutOfRangeException}} to the consumer.
> In a specific concurrent scenario, imagine sequential offsets: {{{}offset1 < 
> offset2 < offset3{}}}. A client requests data at {{{}offset2{}}}. While a 
> background deletion process removes segments from memory, it hasn't yet 
> updated the {{LocalLogStartOffset}} from {{offset1}} to {{{}offset3{}}}. 
> Consequently, when the fetch offset ({{{}offset2{}}}) is evaluated against 
> the stale {{offset1}} in {{{}ReplicaManager.handleOffsetOutOfRangeError{}}}, 
> it incorrectly triggers an {{{}OffsetOutOfRangeException{}}}. This issue 
> arises from the out-of-sync update of {{{}localLogStartOffset{}}}, leading to 
> incorrect handling of consumer fetch requests and potential data access 
> errors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16046) Stream Stream Joins fail after restoration with deserialization exceptions

2024-01-03 Thread Stanislav Kozlovski (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17802093#comment-17802093
 ] 

Stanislav Kozlovski commented on KAFKA-16046:
-

resolving since it was merged! Thanks for the quick work!

 

> Stream Stream Joins fail after restoration with deserialization exceptions
> --
>
> Key: KAFKA-16046
> URL: https://issues.apache.org/jira/browse/KAFKA-16046
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Almog Gavra
>Assignee: Almog Gavra
>Priority: Blocker
>  Labels: streams
> Fix For: 3.7.0
>
>
> Before KIP-954, the `KStreamImplJoin` class would always create 
> non-timestamped persistent windowed stores. After that KIP, the default was 
> changed to create timestamped stores. This wasn't compatible because, during 
> restoration, timestamped stores have their changelog values transformed to 
> prepend the timestamp to the value. This caused serialization errors when 
> trying to read from the store because the deserializers did not expect the 
> timestamp to be prepended.
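
   For context, a small hedged sketch of the format mismatch (assuming the usual timestamp-prefixed value layout; not the Streams code itself):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class TimestampedChangelogValueSketch {

    // Timestamped stores keep the record timestamp in front of the plain value
    // (an 8-byte prefix is assumed here for illustration).
    static byte[] prependTimestamp(final long timestamp, final byte[] plainValue) {
        return ByteBuffer.allocate(Long.BYTES + plainValue.length)
                .putLong(timestamp)
                .put(plainValue)
                .array();
    }

    static byte[] stripTimestamp(final byte[] timestampedValue) {
        final ByteBuffer buffer = ByteBuffer.wrap(timestampedValue);
        buffer.getLong(); // skip the timestamp prefix
        final byte[] plain = new byte[buffer.remaining()];
        buffer.get(plain);
        return plain;
    }

    public static void main(final String[] args) {
        final byte[] original = "join-value".getBytes(StandardCharsets.UTF_8);
        final byte[] restored = prependTimestamp(1_704_240_000_000L, original);

        // A deserializer written for the plain format sees 8 unexpected leading bytes,
        // which is the class of restoration error the issue describes.
        System.out.println(new String(restored, StandardCharsets.UTF_8));
        System.out.println(new String(stripTimestamp(restored), StandardCharsets.UTF_8));
    }
}
```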



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16046) Stream Stream Joins fail after restoration with deserialization exceptions

2024-01-03 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski resolved KAFKA-16046.
-
Resolution: Fixed

> Stream Stream Joins fail after restoration with deserialization exceptions
> --
>
> Key: KAFKA-16046
> URL: https://issues.apache.org/jira/browse/KAFKA-16046
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Almog Gavra
>Assignee: Almog Gavra
>Priority: Blocker
>  Labels: streams
> Fix For: 3.7.0
>
>
> Before KIP-954, the `KStreamImplJoin` class would always create 
> non-timestamped persistent windowed stores. After that KIP, the default was 
> changed to create timestamped stores. This wasn't compatible because, during 
> restoration, timestamped stores have their changelog values transformed to 
> prepend the timestamp to the value. This caused serialization errors when 
> trying to read from the store because the deserializers did not expect the 
> timestamp to be prepended.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Bump year to 2024 in NOTICE file [kafka]

2024-01-03 Thread via GitHub


stanislavkozlovski commented on code in PR #15111:
URL: https://github.com/apache/kafka/pull/15111#discussion_r1440235165


##
NOTICE:
##
@@ -20,4 +20,4 @@ 
clients/src/main/java/org/apache/kafka/common/utils/PureJavaCrc32C.java
 Some portions of this file Copyright (c) 2004-2006 Intel Corporation and 
licensed under the BSD license.
 
 This project contains the following code copied from Apache Hive:
-streams/src/main/java/org/apache/kafka/streams/state/internals/Murmur3.java

Review Comment:
   this was added automatically by `vi`. I figure we leave the new line?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Bump year to 2024 in NOTICE file [kafka]

2024-01-03 Thread via GitHub


stanislavkozlovski opened a new pull request, #15111:
URL: https://github.com/apache/kafka/pull/15111

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14412: Decouple RocksDB access from CF [kafka]

2024-01-03 Thread via GitHub


lucasbru commented on PR #15105:
URL: https://github.com/apache/kafka/pull/15105#issuecomment-1875033002

   @nicktelford I'll rerun it. That being said, the last jobs on trunk have all 
finished within 3-5 hours, so this must be caused by either infrastructure or 
the code in this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15344: Commit leader epoch where possible [kafka]

2024-01-03 Thread via GitHub


lucasbru commented on PR #14454:
URL: https://github.com/apache/kafka/pull/14454#issuecomment-1875029211

   @mjsax Could you please take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]

2024-01-03 Thread via GitHub


lucasbru commented on PR #15000:
URL: https://github.com/apache/kafka/pull/15000#issuecomment-1875025923

   @philipnee could you take a look please?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14412: Decouple RocksDB access from CF [kafka]

2024-01-03 Thread via GitHub


nicktelford commented on PR #15105:
URL: https://github.com/apache/kafka/pull/15105#issuecomment-1875025617

   @lucasbru I think this is just the CI causing trouble again. The build 
passes locally. Is there a way to rerun the build without pushing more commits?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16072: JUnit 5 extension to detect thread leak [kafka]

2024-01-03 Thread via GitHub


wernerdv commented on PR #15101:
URL: https://github.com/apache/kafka/pull/15101#issuecomment-1875025332

   @gharris1727 @ashwinpankaj
   I've updated the utility to check for leaked threads relative to threads 
created before the test.
   This looks more correct.
   
   It might be worth adding more names to the list of expected thread names.
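
   For reference, a minimal sketch of that "diff against pre-existing threads" idea as a JUnit 5 extension (the names and the expected-thread filter below are illustrative, not the PR's implementation):

```java
import java.util.HashSet;
import java.util.Set;

import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;

public class ThreadLeakSketchExtension implements BeforeEachCallback, AfterEachCallback {

    private static final ExtensionContext.Namespace NAMESPACE =
            ExtensionContext.Namespace.create(ThreadLeakSketchExtension.class);

    @Override
    public void beforeEach(final ExtensionContext context) {
        // Remember the threads that already existed before the test body ran.
        context.getStore(NAMESPACE).put("before", new HashSet<>(Thread.getAllStackTraces().keySet()));
    }

    @Override
    @SuppressWarnings("unchecked")
    public void afterEach(final ExtensionContext context) {
        final Set<Thread> before = (Set<Thread>) context.getStore(NAMESPACE).get("before", Set.class);
        final Set<Thread> leaked = new HashSet<>(Thread.getAllStackTraces().keySet());
        leaked.removeAll(before);
        leaked.removeIf(thread -> !thread.isAlive() || isExpected(thread.getName()));
        if (!leaked.isEmpty()) {
            throw new AssertionError("Potentially leaked threads: " + leaked);
        }
    }

    private static boolean isExpected(final String name) {
        // Threads owned by the JVM or the test infrastructure rather than the code under test.
        return name.startsWith("junit-") || name.startsWith("ForkJoinPool") || name.startsWith("Attach Listener");
    }
}
```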


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14412: Decouple RocksDB access from CF [kafka]

2024-01-03 Thread via GitHub


lucasbru commented on PR #15105:
URL: https://github.com/apache/kafka/pull/15105#issuecomment-1875023389

   @nicktelford Seems like all build jobs timed out. Could you take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Migrate activeStateManager and standbyStateManager mocks in StoreChangelogReaderTest to Mockito [kafka]

2024-01-03 Thread via GitHub


lucasbru merged PR #15106:
URL: https://github.com/apache/kafka/pull/15106


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


