Re: [PR] MINOR: Fix toString method of IsolationLevel [kafka]
ashwinpankaj commented on code in PR #14782:
URL: https://github.com/apache/kafka/pull/14782#discussion_r1441389074

## connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java: ##
@@ -234,7 +233,7 @@ public void start() {
 throw new ConnectException(
 "Must provide a TopicAdmin to KafkaBasedLog when consumer is configured with "
 + ConsumerConfig.ISOLATION_LEVEL_CONFIG + " set to "
-+ IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)
++ IsolationLevel.READ_COMMITTED

Review Comment:
   ```suggestion
   + IsolationLevel.READ_COMMITTED.toString()
   ```

## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java: ##
@@ -248,7 +247,7 @@ public void start() {
 + "support for source connectors, or upgrade to a newer Kafka broker version.";
 } else {
 message = "When " + ConsumerConfig.ISOLATION_LEVEL_CONFIG + "is set to "
-+ IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)
++ IsolationLevel.READ_COMMITTED

Review Comment:
   ```suggestion
   + IsolationLevel.READ_COMMITTED.toString()
   ```

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
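A minimal sketch of why the suggestion is behavior-neutral, assuming the PR makes `IsolationLevel.toString()` return the lower-case config value (e.g. `read_committed`). The enum below is a hypothetical stand-in, not Kafka's `org.apache.kafka.common.IsolationLevel`. String concatenation already calls `toString()` implicitly, so the explicit `.toString()` only makes the conversion visible to readers.

```java
import java.util.Locale;

public class IsolationLevelToStringSketch {
    // Hypothetical stand-in for org.apache.kafka.common.IsolationLevel, assuming
    // toString() is overridden to return the lower-case config value.
    enum IsolationLevel {
        READ_UNCOMMITTED,
        READ_COMMITTED;

        @Override
        public String toString() {
            return name().toLowerCase(Locale.ROOT);
        }
    }

    public static void main(String[] args) {
        // Implicit conversion via string concatenation vs. an explicit toString() call.
        String implicit = "isolation.level set to " + IsolationLevel.READ_COMMITTED;
        String explicit = "isolation.level set to " + IsolationLevel.READ_COMMITTED.toString();
        System.out.println(implicit.equals(explicit)); // true
        System.out.println(implicit);                  // isolation.level set to read_committed
    }
}
```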
[jira] [Commented] (KAFKA-16055) Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders
[ https://issues.apache.org/jira/browse/KAFKA-16055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17802401#comment-17802401 ]

Kohei Nozaki commented on KAFKA-16055:
--

Pull request: [https://github.com/apache/kafka/pull/15121]

> Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders
>
> Key: KAFKA-16055
> URL: https://issues.apache.org/jira/browse/KAFKA-16055
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.6.1
> Reporter: Kohei Nozaki
> Priority: Minor
> Labels: newbie, newbie++
>
> This was originally raised in [a kafka-users post|https://lists.apache.org/thread/gpct1275bfqovlckptn3lvf683qpoxol].
> There is a HashMap stored in QueryableStoreProvider#storeProviders ([code link|https://github.com/apache/kafka/blob/3.6.1/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L39])
> which can be mutated by a KafkaStreams#removeStreamThread() call. This can
> be problematic when KafkaStreams#store is called from a separate thread.
> We need to make this part of the code thread-safe, either by replacing the map
> with a ConcurrentHashMap and/or by using an existing locking mechanism.

-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)
[PR] KAFKA-16055: Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders [kafka]
nuzayats opened a new pull request, #15121:
URL: https://github.com/apache/kafka/pull/15121

   This PR replaces a HashMap with a ConcurrentHashMap so that local state store queries can be made from multiple threads. It is based on a discussion in the kafka-users mailing list. See this thread for additional context: https://lists.apache.org/thread/gpct1275bfqovlckptn3lvf683qpoxol

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
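A single-threaded sketch of the hazard the PR addresses, under stated assumptions: the map below is a hypothetical stand-in for `storeProviders` (thread name mapped to a placeholder string), and removing an entry in the middle of an iteration approximates `removeStreamThread()` racing with a store query. A `HashMap` iterator fails fast with `ConcurrentModificationException` in this case, while a `ConcurrentHashMap` iterator is weakly consistent and does not throw. In a real cross-thread race a `HashMap` may not throw at all and can instead expose stale or inconsistent state, which is why the thread-safe map matters.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class StoreProviderMapSketch {
    // Hypothetical stand-in for storeProviders: stream-thread name -> placeholder provider.
    static Map<String, String> populate(Map<String, String> providers) {
        providers.put("StreamThread-1", "provider-1");
        providers.put("StreamThread-2", "provider-2");
        providers.put("StreamThread-3", "provider-3");
        return providers;
    }

    // Iterates the map (as a store query would) and removes an entry mid-iteration
    // (as removeStreamThread() might). Returns true if the iterator failed fast.
    static boolean iterationFailsFast(Map<String, String> providers) {
        try {
            for (String threadName : providers.keySet()) {
                providers.remove("StreamThread-2");
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("HashMap fails fast: "
                + iterationFailsFast(populate(new HashMap<>())));           // true
        System.out.println("ConcurrentHashMap fails fast: "
                + iterationFailsFast(populate(new ConcurrentHashMap<>()))); // false
    }
}
```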
Re: [PR] KAFKA-16072: JUnit 5 extension to detect thread leak [kafka]
ashwinpankaj commented on code in PR #15101:
URL: https://github.com/apache/kafka/pull/15101#discussion_r1441364275

## core/src/test/java/kafka/test/junit/LeakTestingExtension.java: ##
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.junit;
+
+import kafka.utils.TestUtils;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ExtensionContext.Namespace;
+import org.junit.jupiter.api.extension.ExtensionContext.Store;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import scala.Tuple2;
+
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class LeakTestingExtension implements BeforeEachCallback, AfterEachCallback {
+    private static final Set<String> EXPECTED_THREAD_NAMES = new HashSet<>(
+        Arrays.asList("junit-", "JMX", "feature-zk-node-event-process-thread", "ForkJoinPool", "executor-",
+            "metrics-meter-tick-thread", "scala-", "pool-")
+    );
+    private static final String THREADS_KEY = "threads";
+
+    @Override
+    public void beforeEach(ExtensionContext context) {
+        getStore(context).put(THREADS_KEY, Thread.getAllStackTraces().keySet());
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void afterEach(ExtensionContext context) {
+        Set<Thread> initialThreads = getStore(context).remove(THREADS_KEY, Set.class);
+        Tuple2<Set<Thread>, Object> unexpectedThreads = TestUtils.computeUntilTrue(
+            () -> unexpectedThreads(initialThreads),
+            DEFAULT_MAX_WAIT_MS,
+            100L,
+            Set::isEmpty
+        );
+
+        assertTrue(unexpectedThreads._1.isEmpty(), "Found unexpected threads after executing test: " +
+            unexpectedThreads._1.stream().map(Objects::toString).collect(Collectors.joining(", ")));
+    }
+
+    private Set<Thread> unexpectedThreads(Set<Thread> initialThreads) {
+        Set<Thread> finalThreads = Thread.getAllStackTraces().keySet();
+
+        if (initialThreads.size() != finalThreads.size()) {

Review Comment:
   initialThreads and finalThreads could have the same size but contain different threads.
   I think we should always match the names.
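A minimal sketch of the reviewer's point, assuming thread names are compared as plain strings (the class, method and `EXPECTED_PREFIXES` below are hypothetical; the prefixes are an illustrative subset of `EXPECTED_THREAD_NAMES` above). Computing the set difference catches a leak even when one thread has exited and a different one has started, leaving the count unchanged. Thread names are not guaranteed to be unique, so comparing `Thread` objects directly is a stricter alternative.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class ThreadLeakCheckSketch {
    // Illustrative subset of the expected-name prefixes; not the extension's full list.
    static final List<String> EXPECTED_PREFIXES = Arrays.asList("junit-", "ForkJoinPool", "pool-");

    // Names present after the test but not before it, excluding expected prefixes.
    // Membership is compared rather than set sizes.
    static Set<String> unexpectedThreadNames(Set<String> before, Set<String> after) {
        return after.stream()
                .filter(name -> !before.contains(name))
                .filter(name -> EXPECTED_PREFIXES.stream().noneMatch(name::startsWith))
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        Set<String> before = new HashSet<>(Arrays.asList("main", "worker-a"));
        Set<String> after = new HashSet<>(Arrays.asList("main", "worker-b"));
        // Same size, different threads: a size-only check would miss "worker-b".
        System.out.println(unexpectedThreadNames(before, after)); // [worker-b]
    }
}
```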
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
iit2009060 commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1876338999

   > Thanks @iit2009060 for the PR.
   >
   > Let us say there are two segments in remote storage and subsequent segments in local storage: remote-seg-10[10, 20], remote-seg-21[21, 30] (offsets 25 to 30 are compacted), local-seg-31[31, 40].
   >
   > When a fetch request comes for offsets within [25, 30], it should move to the local segment, as those offsets might have been compacted earlier. Did you also cover this scenario in this PR?

   @satishd I have not tested this case explicitly. In this case, RemoteLogManager would return firstBatch as null, and the controller (the class that invokes the RemoteLogManager read) should take care of reading the next segment locally. Let me reproduce this issue locally and update the behaviour.
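A toy model of the scenario in the quoted comment, assuming each segment is represented only by the batches that survived compaction. `Batch`, `Segment` and `segmentToRead` are hypothetical names, not `RemoteLogManager`'s API. A segment with no batch at or after the fetch offset corresponds to the "firstBatch is null" case, and the search simply moves on to the next segment, which here is local.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class CompactedFetchSketch {
    // Hypothetical model of a record batch that survived compaction.
    static final class Batch {
        final long baseOffset;
        final long lastOffset;
        Batch(long baseOffset, long lastOffset) { this.baseOffset = baseOffset; this.lastOffset = lastOffset; }
    }

    // Hypothetical model of a segment (remote or local) and its surviving batches.
    static final class Segment {
        final String name;
        final List<Batch> batches;
        Segment(String name, List<Batch> batches) { this.name = name; this.batches = batches; }
    }

    // Returns the first segment holding a batch with lastOffset >= fetchOffset.
    // A segment whose matching offsets were all compacted away yields no batch,
    // so the search continues with the next segment.
    static Optional<String> segmentToRead(List<Segment> segments, long fetchOffset) {
        for (Segment segment : segments) {
            for (Batch batch : segment.batches) {
                if (batch.lastOffset >= fetchOffset) {
                    return Optional.of(segment.name);
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Segment> log = Arrays.asList(
                new Segment("remote-seg-10", Arrays.asList(new Batch(10, 20))),
                new Segment("remote-seg-21", Arrays.asList(new Batch(21, 24))), // 25..30 compacted away
                new Segment("local-seg-31", Arrays.asList(new Batch(31, 40))));
        System.out.println(segmentToRead(log, 22)); // Optional[remote-seg-21]
        System.out.println(segmentToRead(log, 27)); // Optional[local-seg-31]
    }
}
```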
[jira] [Comment Edited] (KAFKA-16073) Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion
[ https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17802024#comment-17802024 ]

Satish Duggana edited comment on KAFKA-16073 at 1/4/24 3:50 AM:


That was a good catch [~hzh0425@apache]!
I think it is better to avoid holding a lock for local-log-start-offset updates or fetches, as that can introduce other side effects.
One possible solution we discussed is to update local-log-start-offset before the segments are removed from memory and scheduled for deletion, but we need to think through the end-to-end scenarios.

cc [~Kamal C]

was (Author: satish.duggana):
One possible solution we discussed is to update local-log-start-offset before the segments are removed from memory and scheduled for deletion, but we need to think through the end-to-end scenarios.

cc [~Kamal C]

> Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion
>
> Key: KAFKA-16073
> URL: https://issues.apache.org/jira/browse/KAFKA-16073
> Project: Kafka
> Issue Type: Bug
> Components: core, Tiered-Storage
> Affects Versions: 3.6.1
> Reporter: hzh0425
> Assignee: hzh0425
> Priority: Major
> Labels: KIP-405, kip-405, tiered-storage
> Fix For: 3.6.1
>
> The identified bug in Apache Kafka's tiered storage feature involves a
> delayed update of {{localLogStartOffset}} in the
> {{UnifiedLog.deleteSegments}} method, impacting consumer fetch operations.
> When segments are deleted from the log's memory state, the
> {{localLogStartOffset}} isn't promptly updated. Concurrently,
> {{ReplicaManager.handleOffsetOutOfRangeError}} checks if a consumer's fetch
> offset is less than the {{localLogStartOffset}}. If it's greater, Kafka
> erroneously sends an {{OffsetOutOfRangeException}} to the consumer.
> In a specific concurrent scenario, imagine sequential offsets:
> {{offset1 < offset2 < offset3}}. A client requests data at {{offset2}}. While a
> background deletion process removes segments from memory, it hasn't yet
> updated the {{localLogStartOffset}} from {{offset1}} to {{offset3}}.
> Consequently, when the fetch offset ({{offset2}}) is evaluated against
> the stale {{offset1}} in {{ReplicaManager.handleOffsetOutOfRangeError}},
> it incorrectly triggers an {{OffsetOutOfRangeException}}. This issue
> arises from the out-of-sync update of {{localLogStartOffset}}, leading to
> incorrect handling of consumer fetch requests and potential data access
> errors.
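A toy model of the race and of the ordering proposed in the comment above. It is not Kafka's code: the assumption is that offsets at or after the first in-memory segment are read locally, offsets below `localLogStartOffset` are served from remote storage, and anything in between is reported as out of range. The concrete offsets are hypothetical values satisfying `offset1 < offset2 < offset3`. With the current order the fetch for `offset2` falls into the gap; with the proposed order every intermediate state resolves to a readable location.

```java
public class LocalLogStartOffsetSketch {
    enum Outcome { LOCAL, REMOTE, OFFSET_OUT_OF_RANGE }

    // Toy classification of a fetch offset; not ReplicaManager's real logic.
    static Outcome classify(long fetchOffset, long localLogStartOffset, long firstInMemoryOffset) {
        if (fetchOffset >= firstInMemoryOffset) return Outcome.LOCAL;
        if (fetchOffset < localLogStartOffset) return Outcome.REMOTE;
        return Outcome.OFFSET_OUT_OF_RANGE;
    }

    public static void main(String[] args) {
        long offset1 = 100, offset2 = 150, offset3 = 200; // hypothetical values

        // Current order: segments leave memory before localLogStartOffset moves.
        System.out.println(classify(offset2, offset1, offset3)); // OFFSET_OUT_OF_RANGE

        // Proposed order: localLogStartOffset moves to offset3 first (segments still in memory)...
        System.out.println(classify(offset2, offset3, offset1)); // LOCAL
        // ...and only then are the segments removed from memory.
        System.out.println(classify(offset2, offset3, offset3)); // REMOTE
    }
}
```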
Re: [PR] MINOR: Fixed typos in docker readme documentation [kafka]
showuon merged PR #15120:
URL: https://github.com/apache/kafka/pull/15120
Re: [PR] KAFKA-16074: close leaking threads in replica manager tests [kafka]
showuon commented on PR #15077:
URL: https://github.com/apache/kafka/pull/15077#issuecomment-1876239029

   @satishd, please take another look when available. Thanks.
Re: [PR] KAFKA-15373: fix exception thrown in Admin#describeTopics for unknown ID [kafka]
jolshan merged PR #14599:
URL: https://github.com/apache/kafka/pull/14599
Re: [PR] KAFKA-15556: Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect [kafka]
Phuc-Hong-Tran commented on PR #15020:
URL: https://github.com/apache/kafka/pull/15020#issuecomment-1876148310

   I understand, will get another PR out ASAP.
Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]
tinaselenge commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1441077572

## clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java: ##
@@ -98,8 +117,91 @@ public void testServiceLoaderDiscovery() {
     public static class TestFileConfigProvider extends FileConfigProvider {
         @Override
-        protected Reader reader(String path) throws IOException {
+        protected Reader reader(Path path) throws IOException {
             return new StringReader("testKey=testResult\ntestKey2=testResult2");
         }
     }
+
+    @Test
+    public void testAllowedDirPath() {
+        Map<String, String> configs = new HashMap<>();
+        configs.put(ALLOWED_PATHS_CONFIG, dir);
+        configProvider.configure(configs);
+
+        ConfigData configData = configProvider.get(dirFile);
+        Map<String, String> result = new HashMap<>();
+        result.put("testKey", "testResult");
+        result.put("testKey2", "testResult2");
+        assertEquals(result, configData.data());
+        assertNull(configData.ttl());
+    }
+
+    @Test
+    public void testAllowedFilePath() {
+        Map<String, String> configs = new HashMap<>();
+        configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+        configProvider.configure(configs);
+
+        ConfigData configData = configProvider.get(dirFile);
+        Map<String, String> result = new HashMap<>();
+        result.put("testKey", "testResult");
+        result.put("testKey2", "testResult2");
+        assertEquals(result, configData.data());
+        assertNull(configData.ttl());
+    }
+
+    @Test
+    public void testMultipleAllowedPaths() {
+        Map<String, String> configs = new HashMap<>();
+        configs.put(ALLOWED_PATHS_CONFIG, dir + "," + siblingDir);
+        configProvider.configure(configs);
+
+        Map<String, String> result = new HashMap<>();
+        result.put("testKey", "testResult");
+        result.put("testKey2", "testResult2");
+
+        ConfigData configData = configProvider.get(dirFile);
+        assertEquals(result, configData.data());
+        assertNull(configData.ttl());
+
+        configData = configProvider.get(siblingDirFile);
+        assertEquals(result, configData.data());
+        assertNull(configData.ttl());
+    }
+
+    @Test
+    public void testNotAllowedDirPath() {
+        Map<String, String> configs = new HashMap<>();
+        configs.put(ALLOWED_PATHS_CONFIG, dir);
+        configProvider.configure(configs);
+
+        ConfigData configData = configProvider.get(siblingDirFile);
+        assertTrue(configData.data().isEmpty());
+        assertNull(configData.ttl());
+    }
+
+    @Test
+    public void testNotAllowedFilePath() throws IOException {
+        Map<String, String> configs = new HashMap<>();
+        configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+        configProvider.configure(configs);
+
+        //another file under the same directory
+        Path dirFile2 = Files.createFile(Paths.get(dir, "dirFile2"));
+        ConfigData configData = configProvider.get(dirFile2.toString());
+        assertTrue(configData.data().isEmpty());
+        assertNull(configData.ttl());
+    }
+
+    @Test
+    public void testNoTraversal() {
+        Map<String, String> configs = new HashMap<>();
+        configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+        configProvider.configure(configs);
+
+        // Check we can't escape outside the path directory
+        ConfigData configData = configProvider.get(dirFile + Paths.get("/../siblingdir/siblingdirFile"));

Review Comment:
   I have also added tests for ConfigProviderUtils, so I guess we could remove the duplicated traversal tests, but I wonder whether we should still keep the allowed-paths tests in these classes, as they test the full flow. What do you think?
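A minimal sketch of the kind of allowed-path check these tests exercise, assuming paths are normalized before comparison. `AllowedPathsSketch.isAllowed` is a hypothetical helper, not the PR's `ConfigProviderUtils` API. `Path.normalize()` collapses `..` segments, and `Path.startsWith(Path)` compares whole name elements, so `/tmp/dir` does not match `/tmp/dirFile2` or `/tmp/siblingdir`. `normalize()` does not resolve symbolic links; `toRealPath()` would, but it requires the file to exist.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

public class AllowedPathsSketch {
    // Returns true if `requested`, after normalization, lies under (or equals) one of `allowed`.
    static boolean isAllowed(String requested, List<String> allowed) {
        Path path = Paths.get(requested).toAbsolutePath().normalize();
        for (String entry : allowed) {
            Path base = Paths.get(entry).toAbsolutePath().normalize();
            if (path.startsWith(base)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> allowedDir = Arrays.asList("/tmp/dir");
        System.out.println(isAllowed("/tmp/dir/dirFile", allowedDir));                // true
        System.out.println(isAllowed("/tmp/dir/../siblingdir/siblingdirFile", allowedDir)); // false
    }
}
```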
[jira] [Updated] (KAFKA-15467) Kafka broker returns offset out of range for topic/partitions on restart from unclean shutdown
[ https://issues.apache.org/jira/browse/KAFKA-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Steve Jacobs updated KAFKA-15467:
-
Component/s: log

> Kafka broker returns offset out of range for topic/partitions on restart from unclean shutdown
> --
>
> Key: KAFKA-15467
> URL: https://issues.apache.org/jira/browse/KAFKA-15467
> Project: Kafka
> Issue Type: Bug
> Components: core, log
> Affects Versions: 3.5.1
> Environment: Apache Kafka 3.5.1 with Strimzi on kubernetes.
> Reporter: Steve Jacobs
> Priority: Major
>
> So this started with me thinking this was a mirrormaker2 issue because here
> are the symptoms I am seeing:
> I'm encountering an odd issue with mirrormaker2 with our remote replication
> setup to high latency remote sites (satellite).
> Every few days we get several topics completely re-replicated; this appears
> to happen after a network connectivity outage. It doesn't matter if it's a
> long outage (hours) or a short one (minutes). And it only seems to affect a
> few topics.
> I was finally able to track down some logs showing the issue. This was after
> an hour-ish long outage where connectivity went down. There were lots of logs
> about connection timeouts, etc. Here is the relevant part when the connection
> came back up:
> {code:java}
> 2023-09-08 16:52:45,380 INFO [scbi->gcp.MirrorSourceConnector|worker] [AdminClient clientId=mm2-admin-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector|replication-source-admin] Disconnecting from node 0 due to socket connection setup timeout. The timeout value is 63245 ms. (org.apache.kafka.clients.NetworkClient) [kafka-admin-client-thread | mm2-admin-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector|replication-source-admin]
> 2023-09-08 16:52:45,380 INFO [scbi->gcp.MirrorSourceConnector|worker] [AdminClient clientId=mm2-admin-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector|replication-source-admin] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager) [kafka-admin-client-thread | mm2-admin-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector|replication-source-admin]
> 2023-09-08 16:52:47,029 INFO [scbi->gcp.MirrorSourceConnector|task-1] [Consumer clientId=mm2-consumer-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector-1|replication-consumer, groupId=null] Disconnecting from node 0 due to socket connection setup timeout. The timeout value is 52624 ms. (org.apache.kafka.clients.NetworkClient) [task-thread-scbi->gcp.MirrorSourceConnector-1]
> 2023-09-08 16:52:47,029 INFO [scbi->gcp.MirrorSourceConnector|task-1] [Consumer clientId=mm2-consumer-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector-1|replication-consumer, groupId=null] Error sending fetch request (sessionId=460667411, epoch=INITIAL) to node 0: (org.apache.kafka.clients.FetchSessionHandler) [task-thread-scbi->gcp.MirrorSourceConnector-1]
> 2023-09-08 16:52:47,336 INFO [scbi->gcp.MirrorSourceConnector|worker] refreshing topics took 67359 ms (org.apache.kafka.connect.mirror.Scheduler) [Scheduler for MirrorSourceConnector: scbi->gcp|scbi->gcp.MirrorSourceConnector-refreshing topics]
> 2023-09-08 16:52:48,413 INFO [scbi->gcp.MirrorSourceConnector|task-1] [Consumer clientId=mm2-consumer-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector-1|replication-consumer, groupId=null] Fetch position FetchPosition{offset=4918131, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[kafka.scbi.eng.neoninternal.org:9094 (id: 0 rack: null)], epoch=0}} is out of range for partition reading.sensor.hfp01sc-0, resetting offset (org.apache.kafka.clients.consumer.internals.AbstractFetch) [task-thread-scbi->gcp.MirrorSourceConnector-1]
> (Repeats for 11 more topics)
> 2023-09-08 16:52:48,479 INFO [scbi->gcp.MirrorSourceConnector|task-1] [Consumer clientId=mm2-consumer-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector-1|replication-consumer, groupId=null] Resetting offset for partition reading.sensor.hfp01sc-0 to position FetchPosition{offset=3444977, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka.scbi.eng.neoninternal.org:9094 (id: 0 rack: null)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-scbi->gcp.MirrorSourceConnector-1]
> (Repeats for 11 more topics)
> {code}
> The consumer reports that offset 4918131 is out of range for this
> topic/partition, but that offset still exists on the remote cluster. I can go
> pull it up with a consumer right now. The earliest offset in that topic that
> still exists is 3444977 as of yesterday. We have 30 day retention configured
> so pulling in 30 days of duplicate data is very not good. It
Re: [PR] KAFKA-16067 Refactoring ConsumerGroupListing + add test [kafka]
hachikuji commented on code in PR #15092:
URL: https://github.com/apache/kafka/pull/15092#discussion_r1441070583

## clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java: ##
@@ -104,8 +104,8 @@ public boolean equals(Object obj) {
 return false;
 if (isSimpleConsumerGroup != other.isSimpleConsumerGroup)
 return false;
-if (state == null) {
-if (other.state != null)
+if (!state.isPresent()) {

Review Comment:
   I think the expectation in the previous code was that `Optional.equals` would handle this case. So can we just get rid of the null check? Note that this depends on the `requireNonNull` in the constructor. Perhaps we should have a unit test to protect it?
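A minimal sketch of the reviewer's suggestion, using a hypothetical, simplified stand-in for `ConsumerGroupListing` (the state is a plain `String` here rather than `ConsumerGroupState`, to keep the example self-contained). Because the constructor rejects a null `Optional` with `requireNonNull`, `equals` can delegate to `Optional.equals`, which already treats two empty `Optional`s as equal; the test below also pins the constructor's null check, as the review suggests.

```java
import java.util.Objects;
import java.util.Optional;

public class GroupListingSketch {
    // Hypothetical, simplified stand-in for ConsumerGroupListing.
    private final String groupId;
    private final boolean isSimpleConsumerGroup;
    private final Optional<String> state;

    GroupListingSketch(String groupId, boolean isSimpleConsumerGroup, Optional<String> state) {
        this.groupId = groupId;
        this.isSimpleConsumerGroup = isSimpleConsumerGroup;
        // Guarantees `state` is never null, so equals() needs no null branch.
        this.state = Objects.requireNonNull(state);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (!(obj instanceof GroupListingSketch)) return false;
        GroupListingSketch other = (GroupListingSketch) obj;
        // Optional.equals already handles the empty/empty and empty/present cases.
        return isSimpleConsumerGroup == other.isSimpleConsumerGroup
                && Objects.equals(groupId, other.groupId)
                && state.equals(other.state);
    }

    @Override
    public int hashCode() {
        return Objects.hash(groupId, isSimpleConsumerGroup, state);
    }

    public static void main(String[] args) {
        System.out.println(new GroupListingSketch("g", true, Optional.empty())
                .equals(new GroupListingSketch("g", true, Optional.empty()))); // true
    }
}
```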
[jira] [Updated] (KAFKA-15467) Kafka broker returns offset out of range for topic/partitions on restart from unclean shutdown
[ https://issues.apache.org/jira/browse/KAFKA-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Steve Jacobs updated KAFKA-15467:
-
Component/s: core
Re: [PR] KAFKA-16067 Refactoring ConsumerGroupListing + add test [kafka]
hachikuji commented on code in PR #15092:
URL: https://github.com/apache/kafka/pull/15092#discussion_r1441070183

## clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListingTest.java: ##
@@ -0,0 +1,27 @@
+package org.apache.kafka.clients.admin;

Review Comment:
   We are missing the license. Also, we need to move this into `clients/src/test/java`.

## clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java: ##
@@ -104,8 +104,8 @@ public boolean equals(Object obj) {
 return false;
 if (isSimpleConsumerGroup != other.isSimpleConsumerGroup)
 return false;
-if (state == null) {
-if (other.state != null)
+if (!state.isPresent()) {

Review Comment:
   I think the expectation in the previous code was that `Optional.equals` would handle this case. So can we just get rid of the null check? This depends on the `requireNonNull` in the constructor. Perhaps we should have a unit test to protect it?
[jira] [Closed] (KAFKA-10133) Cannot compress messages in destination cluster with MM2
[ https://issues.apache.org/jira/browse/KAFKA-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Steve Jacobs closed KAFKA-10133.


> Cannot compress messages in destination cluster with MM2
>
> Key: KAFKA-10133
> URL: https://issues.apache.org/jira/browse/KAFKA-10133
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Affects Versions: 2.4.0, 2.5.0, 2.4.1
> Environment: kafka 2.5.0 deployed via the strimzi operator 0.18
> Reporter: Steve Jacobs
> Assignee: Ning Zhang
> Priority: Minor
> Fix For: 2.7.0
>
> When configuring mirrormaker2 using kafka connect, it is not possible to
> configure things such that messages are compressed in the destination
> cluster. Dump Log shows that batching is occurring, but no compression. If
> this is possible, then this is a documentation bug, because I can find no
> documentation on how to do this.
> baseOffset: 4208 lastOffset: 4492 count: 285 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: false isControl: false position: 239371 CreateTime: 1591745894859 size: 16362 magic: 2 compresscodec: NONE crc: 1811507259 isvalid: true
Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]
jolshan commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1441068240

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -749,9 +749,11 @@ class ReplicaManager(val config: KafkaConfig,
 * @param responseCallback callback for sending the response
 * @param delayedProduceLock lock for the delayed actions
 * @param recordValidationStatsCallback callback for updating stats on record conversions
- * @param requestLocal container for the stateful instances scoped to this request
- * @param transactionalId transactional ID if the request is from a producer and the producer is transactional
+ * @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the
+ *                     thread calling this method
 * @param actionQueue the action queue to use. ReplicaManager#defaultActionQueue is used by default.
+ * @param verificationGuards the mapping from topic partition to verification guards if transaction verification is used
+ * @param preAppendErrors the mapping from topic partition to LogAppendResult for errors that occurred before appending

Review Comment:
   Oh, I see what you are saying -- basically, the appendCallback should be defined so that the verification errors are joined whenever the callback is defined (ideally after we get the verification errors). So the callback for verification would include the definition of the append callback. I think this could work. Time to change all the mock code.
Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]
jolshan commented on code in PR #15087: URL: https://github.com/apache/kafka/pull/15087#discussion_r1441066328 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -749,9 +749,11 @@ class ReplicaManager(val config: KafkaConfig, * @param responseCallback callback for sending the response * @param delayedProduceLock lock for the delayed actions * @param recordValidationStatsCallback callback for updating stats on record conversions - * @param requestLocal container for the stateful instances scoped to this request - * @param transactionalId transactional ID if the request is from a producer and the producer is transactional + * @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the + * thread calling this method * @param actionQueue the action queue to use. ReplicaManager#defaultActionQueue is used by default. + * @param verificationGuards the mapping from topic partition to verification guards if transaction verification is used + * @param preAppendErrors the mapping from topic partition to LogAppendResult for errors that occurred before appending Review Comment: I was considering a write where some partitions were verified and some were not. We would allow the ones that succeeded to still write, but not the ones that failed verification. Not sure if we support multi-partition writes in the produce API, but it appears we do.
Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]
hachikuji commented on code in PR #15087: URL: https://github.com/apache/kafka/pull/15087#discussion_r1441051073 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -749,9 +749,11 @@ class ReplicaManager(val config: KafkaConfig, * @param responseCallback callback for sending the response * @param delayedProduceLock lock for the delayed actions * @param recordValidationStatsCallback callback for updating stats on record conversions - * @param requestLocal container for the stateful instances scoped to this request - * @param transactionalId transactional ID if the request is from a producer and the producer is transactional + * @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the + * thread calling this method * @param actionQueue the action queue to use. ReplicaManager#defaultActionQueue is used by default. + * @param verificationGuards the mapping from topic partition to verification guards if transaction verification is used + * @param preAppendErrors the mapping from topic partition to LogAppendResult for errors that occurred before appending Review Comment: This seems a little strange. If the partitions have already failed, then why do we need to pass them through `appendRecords`? I would expect instead that the caller would just join the pre-append failures with the append failures.
Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]
tinaselenge commented on PR #14995: URL: https://github.com/apache/kafka/pull/14995#issuecomment-1876030053 > Thanks for the PR. I made a second quick pass and left a few more comments. @mimaison Thank you very much for reviewing the PR again. I think I addressed your comments except the one I had a question on. Please let me know if I missed anything. Thanks.
Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]
tinaselenge commented on PR #14995: URL: https://github.com/apache/kafka/pull/14995#issuecomment-1876028036 > I came up with another path traversal, but this one is less severe. > > If you try to get `/arbitrary/path/../..//dir/dirFile` you can perform "exists" and "isDirectory" checks for `/arbitrary/path` depending on the error returned. > > (edit): This attack isn't reproducible in the tests, because they mock out `reader(Path)` > > This would allow an attacker to map out the whole filesystem and permissions of the current process. This could inform another attack, such as: finding vulnerable software, finding user accounts, finding active processes, etc. > > This attack works because the normalized form is used in pathIsAllowed(), but the non-normalized form is used in the actual read. I think we need to make it so that the normalized path is used instead of the given path. > > I noticed this caveat in the documentation for Path.normalize: > > > This method does not access the file system; the path may not locate a file that exists. Eliminating ".." and a preceding name from a path may result in the path that locates a different file than the original path. This can arise when the preceding name is a symbolic link. > > I think this would cause a backwards-incompatible change if we were to apply it in the default case, so we should only use the normalized form when `allowed.paths` is specified. > > I tried and failed to come up with an arbitrary file-read attack using the symlink behavior. If the symlink is within the allowed-paths (and possibly lets the user escape the allowed-paths) it should probably work normally. If the symlink is outside of the allowed paths, we can exploit it with the same prefixing attack from above, but the allowed part of the path prevents actually reading arbitrary files. Regardless, I think that using the normalized form for file accesses prevents accessing external symlinks completely. > > Thanks so much for working on this feature! 
Thank you very much @gharris1727 for the detailed comment on this issue. These are really good points and I agree; I hadn't put enough thought into this. I took the suggestions and updated it to use the normalisedPath in the actual read when `allowed.paths` is specified. Please let me know what you think. I also tried addressing your other comments. Thank you for reviewing the PR.
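A minimal sketch of the fix discussed above, assuming a Unix-style file system: normalize both the configured roots and the requested path, check containment with `Path.startsWith`, and hand the normalized path to the read. `AllowedPathsSketch` and `resolve` are hypothetical names, not the PR's `FileConfigProvider`/`ConfigProviderUtils` code; `Path.normalize`, `toAbsolutePath`, and `startsWith` are standard `java.nio.file` methods.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical sketch; class and method names are illustrative and are not
// the FileConfigProvider/ConfigProviderUtils code from the PR.
class AllowedPathsSketch {

    private final List<Path> allowedPaths;

    AllowedPathsSketch(List<String> allowed) {
        // Normalize the configured roots once so comparisons are like-for-like.
        this.allowedPaths = allowed.stream()
                .map(p -> Paths.get(p).toAbsolutePath().normalize())
                .collect(Collectors.toList());
    }

    // Returns the normalized path that should be opened, or empty if the
    // request falls outside every allowed root. normalize() is purely
    // lexical and never touches the file system.
    Optional<Path> resolve(String requested) {
        Path normalized = Paths.get(requested).toAbsolutePath().normalize();
        for (Path allowed : allowedPaths) {
            // Path.startsWith compares whole name elements, so "/etc/app"
            // is not treated as a prefix of "/etc/application".
            if (normalized.startsWith(allowed)) {
                return Optional.of(normalized);
            }
        }
        return Optional.empty();
    }
}
```

Because the caller opens what `resolve` returns rather than the raw string, a request such as `/arbitrary/path/../../etc/app/file` is read as `/etc/app/file` and never makes the OS look up `/arbitrary/path`, which is the probing described in the quoted comment. Per the `Path.normalize` caveat, the normalized path can differ from what the OS would resolve through a symlink, so the check and the read only agree because both use the normalized form.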
Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]
tinaselenge commented on code in PR #14995: URL: https://github.com/apache/kafka/pull/14995#discussion_r1440996476 ## clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java: ## @@ -98,8 +117,91 @@ public void testServiceLoaderDiscovery() { public static class TestFileConfigProvider extends FileConfigProvider { @Override -protected Reader reader(String path) throws IOException { +protected Reader reader(Path path) throws IOException { return new StringReader("testKey=testResult\ntestKey2=testResult2"); } } + +@Test +public void testAllowedDirPath() { +Map configs = new HashMap<>(); +configs.put(ALLOWED_PATHS_CONFIG, dir); +configProvider.configure(configs); + +ConfigData configData = configProvider.get(dirFile); +Map result = new HashMap<>(); +result.put("testKey", "testResult"); +result.put("testKey2", "testResult2"); +assertEquals(result, configData.data()); +assertNull(configData.ttl()); +} + +@Test +public void testAllowedFilePath() { +Map configs = new HashMap<>(); +configs.put(ALLOWED_PATHS_CONFIG, dirFile); +configProvider.configure(configs); + +ConfigData configData = configProvider.get(dirFile); +Map result = new HashMap<>(); +result.put("testKey", "testResult"); +result.put("testKey2", "testResult2"); +assertEquals(result, configData.data()); +assertNull(configData.ttl()); +} + +@Test +public void testMultipleAllowedPaths() { +Map configs = new HashMap<>(); +configs.put(ALLOWED_PATHS_CONFIG, dir + "," + siblingDir); +configProvider.configure(configs); + +Map result = new HashMap<>(); +result.put("testKey", "testResult"); +result.put("testKey2", "testResult2"); + +ConfigData configData = configProvider.get(dirFile); +assertEquals(result, configData.data()); +assertNull(configData.ttl()); + +configData = configProvider.get(siblingDirFile); +assertEquals(result, configData.data()); +assertNull(configData.ttl()); +} + +@Test +public void testNotAllowedDirPath() { +Map configs = new HashMap<>(); 
+configs.put(ALLOWED_PATHS_CONFIG, dir); +configProvider.configure(configs); + +ConfigData configData = configProvider.get(siblingDirFile); +assertTrue(configData.data().isEmpty()); +assertNull(configData.ttl()); +} + +@Test +public void testNotAllowedFilePath() throws IOException { +Map configs = new HashMap<>(); +configs.put(ALLOWED_PATHS_CONFIG, dirFile); +configProvider.configure(configs); + +//another file under the same directory +Path dirFile2 = Files.createFile(Paths.get(dir, "dirFile2")); +ConfigData configData = configProvider.get(dirFile2.toString()); +assertTrue(configData.data().isEmpty()); +assertNull(configData.ttl()); +} + +@Test +public void testNoTraversal() { +Map configs = new HashMap<>(); +configs.put(ALLOWED_PATHS_CONFIG, dirFile); +configProvider.configure(configs); + +// Check we can't escape outside the path directory +ConfigData configData = configProvider.get(dirFile + Paths.get("/../siblingdir/siblingdirFile")); Review Comment: Good point! I fixed the path logic. However, I'm not sure about deduplicating the test classes, as they are set up slightly differently: one works with directories and the other with files. I did, however, deduplicate the implementation classes by adding a ConfigProviderUtils class with the common methods.
Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]
tinaselenge commented on code in PR #14995: URL: https://github.com/apache/kafka/pull/14995#discussion_r1440994447 ## clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java: ## @@ -40,7 +44,21 @@ public class FileConfigProvider implements ConfigProvider { private static final Logger log = LoggerFactory.getLogger(FileConfigProvider.class); +public static final String ALLOWED_PATHS_CONFIG = "allowed.paths"; +public static final String ALLOWED_PATHS_DOC = "Path that this config provider is allowed to access"; +private List allowedPaths; + public void configure(Map configs) { +if (configs.containsKey(ALLOWED_PATHS_CONFIG)) { +String configValue = (String) configs.get(ALLOWED_PATHS_CONFIG); + +if (configValue != null && !configValue.isEmpty()) { Review Comment: If the user sets the config to a null or empty value, should that be considered a bad value, or should it be treated the same as not setting it, and therefore allow access to all paths?
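One possible answer to the question, sketched under the assumption that the lenient reading is chosen: a missing, `null`, or blank `allowed.paths` behaves as if the config were not set, so all paths stay accessible. `AllowedPathsConfigSketch` and `parse` are hypothetical; the strict alternative would throw a configuration error in the branches that return `Optional.empty()` for a present-but-blank value.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the lenient interpretation: a missing, null, or
// blank "allowed.paths" value means "no restriction".
class AllowedPathsConfigSketch {

    static final String ALLOWED_PATHS_CONFIG = "allowed.paths";

    // Optional.empty() = unrestricted; otherwise the list of allowed roots.
    static Optional<List<Path>> parse(Map<String, ?> configs) {
        Object raw = configs.get(ALLOWED_PATHS_CONFIG);
        if (raw == null || raw.toString().trim().isEmpty()) {
            return Optional.empty(); // treated the same as not setting the config
        }
        List<Path> paths = new ArrayList<>();
        for (String entry : raw.toString().split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty()) {
                paths.add(Paths.get(trimmed).normalize());
            }
        }
        // A value such as " , " contains no usable entry; treat it like blank.
        return paths.isEmpty() ? Optional.empty() : Optional.of(paths);
    }
}
```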
[PR] MINOR: Fixed typos in docker readme documentation [kafka]
kumarlokesh opened a new pull request, #15120: URL: https://github.com/apache/kafka/pull/15120 Fixed minor typos in `docker/README` documentation. ### Committer Checklist (excluded from commit message) - [x] Verify design and implementation - [x] Verify test coverage and CI build status - [x] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16078 IBP defaults to latest production MetadataVersion [kafka]
mumrah commented on PR #15118: URL: https://github.com/apache/kafka/pull/15118#issuecomment-1875988796 All production usages now call `latestProduction` and test usages (including benchmark classes) call `latestTesting`. The one exception is `QuorumFeatures#defaultFeatureMap(boolean enableUnstable)`, which calls either one depending on its argument.
Re: [PR] KAFKA-16072: JUnit 5 extension to detect thread leak [kafka]
gharris1727 commented on code in PR #15101: URL: https://github.com/apache/kafka/pull/15101#discussion_r1440917179 ## core/src/test/java/kafka/test/junit/LeakTestingExtension.java: ## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.test.junit; + +import kafka.utils.TestUtils; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ExtensionContext.Namespace; +import org.junit.jupiter.api.extension.ExtensionContext.Store; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import scala.Tuple2; + +import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class LeakTestingExtension implements BeforeEachCallback, AfterEachCallback { +private static final Set EXPECTED_THREAD_NAMES = new HashSet<>( Review Comment: Oh, I didn't notice that this is a set of threads that are allowed to leak, whereas the original TestUtils.verifyNoUnexpectedThreads used a set of threads that weren't allowed to leak. I assumed the mechanism was copy-pasted. I don't know if this set needs to exist, especially now that the assertion is stateful. Perhaps we can empty this set completely to see what the effect is, and then add back what is necessary. I did see some flakiness with an early prototype of #14783, which sometimes detected the gradle runner opening sockets. Perhaps there are similar things that create background threads.
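The stateful check described in these comments (snapshot the live threads before a test, compare after it) might look roughly like this stdlib-only sketch. It assumes thread identity is enough to tell pre-existing threads from new ones, and it uses a name-prefix allowlist that can simply be left empty, as suggested above. `ThreadLeakCheckSketch` and its methods are hypothetical; in the PR the logic sits in a JUnit 5 `BeforeEachCallback`/`AfterEachCallback` extension.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stdlib-only sketch of a stateful leak check. In the PR the
// equivalent logic sits in JUnit 5 BeforeEachCallback/AfterEachCallback.
class ThreadLeakCheckSketch {

    private final Set<String> allowedNamePrefixes;
    private Set<Thread> threadsBefore = new HashSet<>();

    ThreadLeakCheckSketch(Set<String> allowedNamePrefixes) {
        this.allowedNamePrefixes = allowedNamePrefixes;
    }

    // Before each test: remember every thread that is already alive.
    void beforeEach() {
        threadsBefore = new HashSet<>(Thread.getAllStackTraces().keySet());
    }

    // After each test: names of threads started since beforeEach() that are
    // still alive once timeoutMs has elapsed, ignoring allowed prefixes.
    Set<String> leakedThreadNames(long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            Set<String> leaked = Thread.getAllStackTraces().keySet().stream()
                    .filter(t -> !threadsBefore.contains(t) && t.isAlive())
                    .map(Thread::getName)
                    .filter(name -> allowedNamePrefixes.stream().noneMatch(name::startsWith))
                    .collect(Collectors.toSet());
            if (leaked.isEmpty() || System.currentTimeMillis() >= deadline) {
                return leaked;
            }
            try {
                Thread.sleep(10); // give threads that are shutting down time to exit
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return leaked;
            }
        }
    }
}
```

Waiting up to a timeout before failing is a design choice to tolerate threads that are already shutting down when the test method returns; without it, a stateful check like this tends to be flaky.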
Re: [PR] KAFKA-16072: JUnit 5 extension to detect thread leak [kafka]
divijvaidya commented on code in PR #15101: URL: https://github.com/apache/kafka/pull/15101#discussion_r1440893773 ## core/src/test/java/kafka/test/junit/LeakTestingExtension.java: ## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.test.junit; + +import kafka.utils.TestUtils; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ExtensionContext.Namespace; +import org.junit.jupiter.api.extension.ExtensionContext.Store; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import scala.Tuple2; + +import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class LeakTestingExtension implements BeforeEachCallback, AfterEachCallback { +private static final Set EXPECTED_THREAD_NAMES = new HashSet<>( Review Comment: Hi @gharris1727 @wernerdv, can you please help me understand why kafka-scheduler, ExpirationReaper, or ReplicaFetcherThread threads are expected at the end or beginning of a test? Isn't the presence of these threads an indication of a thread leak? (For example, expiration reaper threads may mean that we don't close controllerAPIs correctly.)
Re: [PR] KAFKA-16078 IBP defaults to latest production MetadataVersion [kafka]
cmccabe commented on PR #15118: URL: https://github.com/apache/kafka/pull/15118#issuecomment-1875906638 Alternatively, if we wanted to make this literally impossible to invoke from a non-test context, we could move `MetadataVersion.latest` into `MetadataVersionTest.latest`. The downside is that this would require all the other test modules to depend on `:server-common:test` (although a lot of them already do?). That might extend the build time? Not sure.
Re: [PR] KAFKA-16078 IBP defaults to latest production MetadataVersion [kafka]
cmccabe commented on PR #15118: URL: https://github.com/apache/kafka/pull/15118#issuecomment-1875904378 I agree with @ijuma that just having a boolean isn't very clear. How about renaming `MetadataVersion.latest()` to `MetadataVersion.latestTesting()`, and then creating a separate function `MetadataVersion.latestProduction()`? That seems like the clearest we're going to get with this. Devs won't want to invoke something with "testing" in the name in a non-test context.
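A toy illustration of the naming proposal, with a made-up enum standing in for `MetadataVersion`: each constant carries a hypothetical `production` flag, `latestTesting()` returns the newest constant regardless of stability, and `latestProduction()` returns the newest stable one. The constant names and the flag are assumptions for illustration only; the real `MetadataVersion` has many more versions and fields.

```java
// Hypothetical toy enum; only the latestTesting()/latestProduction()
// naming pattern from the proposal is illustrated here.
enum ToyMetadataVersion {
    V1(true),
    V2(true),
    V3_UNSTABLE(false);

    private final boolean production;

    ToyMetadataVersion(boolean production) {
        this.production = production;
    }

    // Intended for tests only: the newest version, stable or not.
    static ToyMetadataVersion latestTesting() {
        ToyMetadataVersion[] all = values();
        return all[all.length - 1];
    }

    // Intended for production code: the newest version marked stable.
    static ToyMetadataVersion latestProduction() {
        ToyMetadataVersion[] all = values();
        for (int i = all.length - 1; i >= 0; i--) {
            if (all[i].production) {
                return all[i];
            }
        }
        throw new IllegalStateException("no production version defined");
    }
}
```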
Re: [PR] KAFKA-16078 IBP defaults to latest production MetadataVersion [kafka]
ijuma commented on code in PR #15118: URL: https://github.com/apache/kafka/pull/15118#discussion_r1440881725 ## core/src/main/java/kafka/server/builders/LogManagerBuilder.java: ## @@ -48,7 +48,7 @@ public class LogManagerBuilder { private int maxTransactionTimeoutMs = 15 * 60 * 1000; private ProducerStateManagerConfig producerStateManagerConfig = new ProducerStateManagerConfig(6, false); private int producerIdExpirationCheckIntervalMs = 60; -private MetadataVersion interBrokerProtocolVersion = MetadataVersion.latest(); +private MetadataVersion interBrokerProtocolVersion = MetadataVersion.latest(true); Review Comment: An enum instead of boolean would make this a lot more readable.
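To show the readability point, here is a hypothetical side-by-side of a boolean parameter and an enum parameter for the same lookup. `LatestVersionSketch`, `Stability`, and the version strings are invented for the example and do not correspond to real `MetadataVersion` values.

```java
// Hypothetical sketch: names and version strings are made up; only the
// call-site readability of boolean vs enum arguments is being compared.
class LatestVersionSketch {

    enum Stability { PRODUCTION_ONLY, INCLUDE_UNSTABLE }

    private static final String LATEST_PRODUCTION = "v1";
    private static final String LATEST_UNSTABLE = "v2-unstable";

    // Boolean flag: a call site such as latest(true) does not say what true means.
    static String latest(boolean includeUnstable) {
        return includeUnstable ? LATEST_UNSTABLE : LATEST_PRODUCTION;
    }

    // Enum argument: latest(Stability.PRODUCTION_ONLY) documents itself.
    static String latest(Stability stability) {
        switch (stability) {
            case INCLUDE_UNSTABLE:
                return LATEST_UNSTABLE;
            case PRODUCTION_ONLY:
            default:
                return LATEST_PRODUCTION;
        }
    }
}
```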
[PR] MINOR: Move Raft io thread implementation to Java [kafka]
hachikuji opened a new pull request, #15119: URL: https://github.com/apache/kafka/pull/15119 This patch moves the `RaftIOThread` implementation into Java. I changed the name to `KafkaRaftClientDriver` since the main thing it does is drive the calls to `poll()`. There shouldn't be any changes to the logic. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
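The role described for `KafkaRaftClientDriver` (a thread whose main job is to drive calls to `poll()`) can be pictured with this minimal sketch. It is not the code in the PR: `PollDriverSketch`, `Pollable`, `shutdown`, and `failure` are hypothetical names, and a real driver would also need to wake a blocked `poll()` during shutdown and handle errors more carefully.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a thread whose only job is to drive poll() in a
// loop until shutdown. It is not the code from the PR.
class PollDriverSketch extends Thread {

    interface Pollable {
        void poll() throws Exception;
    }

    private final Pollable client;
    private volatile boolean running = true;
    private volatile Exception failure;
    private final CountDownLatch stopped = new CountDownLatch(1);

    PollDriverSketch(String name, Pollable client) {
        super(name);
        this.client = client;
    }

    @Override
    public void run() {
        try {
            while (running) {
                client.poll(); // a real client would block here until work or a timeout
            }
        } catch (Exception e) {
            failure = e; // a real driver would log and decide how to react
        } finally {
            stopped.countDown();
        }
    }

    // Asks the loop to stop and waits up to timeoutMs; true if it stopped.
    boolean shutdown(long timeoutMs) {
        running = false;
        try {
            return stopped.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    Exception failure() {
        return failure;
    }
}
```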
[PR] KAFKA-16078 IBP defaults to latest production MetadataVersion [kafka]
mumrah opened a new pull request, #15118: URL: https://github.com/apache/kafka/pull/15118 This patch introduces a boolean flag to MetadataVersion#latest which controls whether or not "unstable" MetadataVersions can be returned. For testing purposes, we want to be able to automatically pick up the latest version regardless of stability. For production, we should only be using LATEST_PRODUCTION.
Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]
philipnee commented on PR #15000: URL: https://github.com/apache/kafka/pull/15000#issuecomment-1875876548 Hi @lucasbru, I think we might need to loosen the thread-access restriction for the interceptor, because we also need to trigger interceptors during auto-commit. There are two ways to achieve this: 1. Pass an event to the main thread to execute the interceptor on poll; the drawback is that the interceptor might never get triggered. 2. Loosen the thread-access restriction. I think we might need to do that to guarantee the interceptors can always be invoked. #2 is a bit annoying because it goes against the contract, as you mentioned in one of your comments. What do you think?
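Option 1 can be sketched as a hand-off queue, assuming a design where the background thread never invokes the interceptor itself and only enqueues the call, while the application thread drains the queue inside `poll()`. `InterceptorHandoffSketch` and its methods are hypothetical, not the `AsyncKafkaConsumer` API.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of option 1: the background thread only enqueues
// interceptor invocations; the application thread runs them in poll().
class InterceptorHandoffSketch {

    // Thread-safe: written by the background thread, drained by the app thread.
    private final Queue<Runnable> pendingInterceptorCalls = new ConcurrentLinkedQueue<>();

    // Called on the background thread, e.g. after an auto-commit completes.
    void onCommitCompletedInBackground(Runnable interceptorCall) {
        pendingInterceptorCalls.add(interceptorCall);
    }

    // Called on the application thread from poll(); returns how many
    // interceptor invocations were run.
    int poll() {
        int ran = 0;
        Runnable call;
        while ((call = pendingInterceptorCalls.poll()) != null) {
            call.run();
            ran++;
        }
        return ran;
    }
}
```

The sketch also makes the drawback of option 1 visible: if the application never calls `poll()` again, queued invocations never run unless something like `close()` drains the queue as well.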
Re: [PR] KAFKA-16025: Fix orphaned locks when rebalancing and store cleanup race on unassigned task directories [kafka]
sanepal commented on code in PR #15088: URL: https://github.com/apache/kafka/pull/15088#discussion_r1440860832 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -1229,10 +1229,21 @@ private void tryToLockAllNonEmptyTaskDirectories() { final String namedTopology = taskDir.namedTopology(); try { final TaskId id = parseTaskDirectoryName(dir.getName(), namedTopology); -if (stateDirectory.lock(id)) { -lockedTaskDirectories.add(id); -if (!allTasks.containsKey(id)) { -log.debug("Temporarily locked unassigned task {} for the upcoming rebalance", id); +boolean lockedEmptyDirectory = false; +try { +if (stateDirectory.lock(id)) { +if (stateDirectory.directoryForTaskIsEmpty(id)) { +lockedEmptyDirectory = true; Review Comment: Thank you for the feedback! I updated it to simplify the control flow and added a comment and a log message around the behavior. Please let me know how that looks.
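Reading only the diff context above (lock the directory, then check `directoryForTaskIsEmpty`), the pattern appears to be: take the lock, and if the directory turns out to be empty, release it again instead of holding it. The stdlib sketch below models that under this assumption; `TaskDirLockSketch`, its in-memory lock set, and `tryLockNonEmpty` are hypothetical and much simpler than the real `StateDirectory` locking.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Stream;

// Hypothetical sketch: lock a task directory, and release the lock again
// if the directory is empty (or vanished), so no lock is left orphaned.
class TaskDirLockSketch {

    private final Set<Path> locked = new HashSet<>();

    boolean lock(Path dir) { return locked.add(dir); }
    void unlock(Path dir) { locked.remove(dir); }
    boolean isLocked(Path dir) { return locked.contains(dir); }

    static boolean isEmpty(Path dir) throws IOException {
        try (Stream<Path> entries = Files.list(dir)) {
            return !entries.findAny().isPresent();
        }
    }

    // Returns true only if the directory is non-empty and stays locked.
    boolean tryLockNonEmpty(Path dir) {
        if (!lock(dir)) {
            return false;
        }
        boolean keep = false;
        try {
            keep = !isEmpty(dir);
            return keep;
        } catch (IOException e) {
            return false; // e.g. the directory was deleted by a concurrent cleanup
        } finally {
            if (!keep) {
                unlock(dir); // never hold a lock on an empty or missing directory
            }
        }
    }
}
```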
Re: [PR] KAFKA-15556: Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect [kafka]
philipnee commented on PR #15020: URL: https://github.com/apache/kafka/pull/15020#issuecomment-1875867893 Hi @Phuc-Hong-Tran - yes, the abstractFetch implementation is based on the LegacyKafkaConsumer and therefore requires connection probing. We don't need that in the AsyncKafkaConsumer, because there the probing is done right before sending out the requests.
[jira] [Commented] (KAFKA-16078) InterBrokerProtocolVersion defaults to non-production MetadataVersion
[ https://issues.apache.org/jira/browse/KAFKA-16078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17802308#comment-17802308 ] Justine Olshan commented on KAFKA-16078: We should really get to writing the KIP for the "non-production" vs "production" MVs. > InterBrokerProtocolVersion defaults to non-production MetadataVersion > - > > Key: KAFKA-16078 > URL: https://issues.apache.org/jira/browse/KAFKA-16078 > Project: Kafka > Issue Type: Bug >Reporter: David Arthur >Assignee: David Arthur >Priority: Major
[jira] [Created] (KAFKA-16078) InterBrokerProtocolVersion defaults to non-production MetadataVersion
David Arthur created KAFKA-16078: Summary: InterBrokerProtocolVersion defaults to non-production MetadataVersion Key: KAFKA-16078 URL: https://issues.apache.org/jira/browse/KAFKA-16078 Project: Kafka Issue Type: Bug Reporter: David Arthur Assignee: David Arthur
[PR] KAFKA-16077: Streams fails to close task after restoration [kafka]
lucasbru opened a new pull request, #15117: URL: https://github.com/apache/kafka/pull/15117 Streams fails to close task after restoration when input partitions are updated in a new assignment happening at the same time. There is a race condition in the state updater that can cause the following: 1. We have an active task in the state updater. 2. We get fenced. We recreate the producer, transactions now uninitialized. We ask the state updater to give back the task, add a pending action to close the task clean once it’s handed back. 3. We get a new assignment with updated input partitions. The task is still owned by the state updater, so we ask the state updater again to hand it back and add a pending action to update its input partitions. 4. The task is handed back by the state updater. We update its input partitions but forget to close it clean (pending action was overwritten). 5. Now the task is in an initialized state, but the underlying producer does not have transactions initialized. This can cause an IllegalStateException: `Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION` when running in EOSv2. To fix this, we introduce a new pending action CloseReviveAndUpdateInputPartitions that is added when we handle a new assignment with updated input partitions, but we still need to close the task before reopening it. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
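A minimal sketch of the overwrite problem and the combined-action idea from this description, assuming pending actions are kept in a map keyed by task ID. The enum names are hypothetical stand-ins; only the idea of a `CloseReviveAndUpdateInputPartitions` action comes from the PR text, and the merge rule here covers just this one case.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: pending actions keyed by task ID. Names are
// illustrative stand-ins for the real Streams internals.
class PendingActionSketch {

    enum Action { CLOSE_REVIVE, UPDATE_INPUT_PARTITIONS, CLOSE_REVIVE_AND_UPDATE_INPUT_PARTITIONS }

    final Map<String, Action> pending = new HashMap<>();

    // Buggy shape: a later request silently replaces an earlier one, so a
    // pending close/revive is lost when an input-partition update arrives.
    void addOverwriting(String taskId, Action action) {
        pending.put(taskId, action);
    }

    // Fixed shape: an input-partition update requested while a close/revive
    // is already pending becomes the combined action.
    void addCombining(String taskId, Action action) {
        pending.merge(taskId, action, (existing, incoming) ->
                existing == Action.CLOSE_REVIVE && incoming == Action.UPDATE_INPUT_PARTITIONS
                        ? Action.CLOSE_REVIVE_AND_UPDATE_INPUT_PARTITIONS
                        : incoming);
    }
}
```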
[jira] [Updated] (KAFKA-16077) Streams fails to close task after restoration when input partitions are updated
[ https://issues.apache.org/jira/browse/KAFKA-16077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-16077: --- Summary: Streams fails to close task after restoration when input partitions are updated (was: State updater fails to close task when input partitions are updated) > Streams fails to close task after restoration when input partitions are > updated > --- > > Key: KAFKA-16077 > URL: https://issues.apache.org/jira/browse/KAFKA-16077 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Lucas Brutschy >Priority: Critical > > There is a race condition in the state updater that can cause the following: > # We have an active task in the state updater > # We get fenced. We recreate the producer, transactions now uninitialized. > We ask the state updater to give back the task, add a pending action to close > the task clean once it’s handed back > # We get a new assignment with updated input partitions. The task is still > owned by the state updater, so we ask the state updater again to hand it back > and add a pending action to update its input partition > # The task is handed back by the state updater. We update its input > partitions but forget to close it clean (pending action was overwritten) > # Now the task is in an initialized state, but the underlying producer does > not have transactions initialized > This can lead to an exception like this: > {code:java} > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log-org.apache.kafka.streams.errors.StreamsException: > Exception caught in process. 
taskId=1_0, > processor=KSTREAM-SOURCE-05, topic=node-name-repartition, > partition=0, offset=618798, stacktrace=java.lang.IllegalStateException: > TransactionalId stream-soak-test-d647640a-12e5-4e74-a0af-e105d0d0cb67-2: > Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:999) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:985) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:311) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:660) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.StreamsProducer.maybeBeginTransaction(StreamsProducer.java:240) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:258) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:253) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.doJoin(KStreamKTableJoinProcessor.java:130) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:99) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291) >
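The failure in steps 2 to 4 comes from keeping one pending-action slot per task, so a second registration replaces the first. Below is a minimal sketch of that failure mode. It uses a hypothetical `PendingActions` registry keyed by task id; the class and enum names are illustrative and are not the actual Kafka Streams classes. It compares an overwriting map with one that accumulates actions, so that "close clean" survives a later "update input partitions" request.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class PendingActionsSketch {

    // Hypothetical action kinds, loosely modeled on steps 2 and 3 of the issue description.
    enum Action { CLOSE_CLEAN, UPDATE_INPUT_PARTITIONS }

    // Buggy variant: one slot per task, so a second add() silently replaces the first.
    static final class OverwritingPendingActions {
        private final Map<String, Action> actions = new HashMap<>();

        void add(String taskId, Action action) {
            actions.put(taskId, action);
        }

        Set<Action> drain(String taskId) {
            Action a = actions.remove(taskId);
            return a == null ? EnumSet.noneOf(Action.class) : EnumSet.of(a);
        }
    }

    // Accumulating variant: every registered action is kept until the task is handed back.
    static final class AccumulatingPendingActions {
        private final Map<String, Set<Action>> actions = new HashMap<>();

        void add(String taskId, Action action) {
            actions.computeIfAbsent(taskId, id -> EnumSet.noneOf(Action.class)).add(action);
        }

        Set<Action> drain(String taskId) {
            Set<Action> a = actions.remove(taskId);
            return a == null ? EnumSet.noneOf(Action.class) : a;
        }
    }

    public static void main(String[] args) {
        OverwritingPendingActions buggy = new OverwritingPendingActions();
        buggy.add("1_0", Action.CLOSE_CLEAN);             // step 2: fenced, close once handed back
        buggy.add("1_0", Action.UPDATE_INPUT_PARTITIONS); // step 3: new assignment arrives
        System.out.println("overwriting:  " + buggy.drain("1_0"));  // [UPDATE_INPUT_PARTITIONS]

        AccumulatingPendingActions fixed = new AccumulatingPendingActions();
        fixed.add("1_0", Action.CLOSE_CLEAN);
        fixed.add("1_0", Action.UPDATE_INPUT_PARTITIONS);
        System.out.println("accumulating: " + fixed.drain("1_0"));  // [CLOSE_CLEAN, UPDATE_INPUT_PARTITIONS]
    }
}
```

When both actions are retained, the hand-back path still sees `CLOSE_CLEAN` and can close the task even though an input-partition update was also requested.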
[jira] [Created] (KAFKA-16077) State updater fails to close task when input partitions are updated
Lucas Brutschy created KAFKA-16077: -- Summary: State updater fails to close task when input partitions are updated Key: KAFKA-16077 URL: https://issues.apache.org/jira/browse/KAFKA-16077 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 3.7.0 Reporter: Lucas Brutschy There is a race condition in the state updater that can cause the following: # We have an active task in the state updater # We get fenced. We recreate the producer, transactions now uninitialized. We ask the state updater to give back the task, add a pending action to close the task clean once it’s handed back # We get a new assignment with updated input partitions. The task is still owned by the state updater, so we ask the state updater again to hand it back and add a pending action to update its input partition # The task is handed back by the state updater. We update its input partitions but forget to close it clean (pending action was overwritten) # Now the task is in an initialized state, but the underlying producer does not have transactions initialized This can lead to an exception like this: streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log-org.apache.kafka.streams.errors.StreamsException: Exception caught in process. 
taskId=1_0, processor=KSTREAM-SOURCE-05, topic=node-name-repartition, partition=0, offset=618798, stacktrace=java.lang.IllegalStateException: TransactionalId stream-soak-test-d647640a-12e5-4e74-a0af-e105d0d0cb67-2: Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:999) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:985) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:311) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:660) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.processor.internals.StreamsProducer.maybeBeginTransaction(StreamsProducer.java:240) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:258) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:253) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291) 
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.doJoin(KStreamKTableJoinProcessor.java:130) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:99) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at
[jira] [Updated] (KAFKA-16077) State updater fails to close task when input partitions are updated
[ https://issues.apache.org/jira/browse/KAFKA-16077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-16077: --- Description: There is a race condition in the state updater that can cause the following: # We have an active task in the state updater # We get fenced. We recreate the producer, transactions now uninitialized. We ask the state updater to give back the task, add a pending action to close the task clean once it’s handed back # We get a new assignment with updated input partitions. The task is still owned by the state updater, so we ask the state updater again to hand it back and add a pending action to update its input partition # The task is handed back by the state updater. We update its input partitions but forget to close it clean (pending action was overwritten) # Now the task is in an initialized state, but the underlying producer does not have transactions initialized This can lead to an exception like this: {code:java} streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log-org.apache.kafka.streams.errors.StreamsException: Exception caught in process. 
taskId=1_0, processor=KSTREAM-SOURCE-05, topic=node-name-repartition, partition=0, offset=618798, stacktrace=java.lang.IllegalStateException: TransactionalId stream-soak-test-d647640a-12e5-4e74-a0af-e105d0d0cb67-2: Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:999) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:985) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:311) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:660) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.processor.internals.StreamsProducer.maybeBeginTransaction(StreamsProducer.java:240) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:258) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:253) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291) 
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.doJoin(KStreamKTableJoinProcessor.java:130) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:99) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:847) streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at
Re: [PR] KAFKA-16070: Extract the setReadOnly method into Headers [kafka]
hachikuji commented on PR #15113: URL: https://github.com/apache/kafka/pull/15113#issuecomment-1875758227 Closing as a duplicate of #15097 . Please reopen if mistaken. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16070: Extract the setReadOnly method into Headers [kafka]
hachikuji closed pull request #15113: KAFKA-16070: Extract the setReadOnly method into Headers URL: https://github.com/apache/kafka/pull/15113
Re: [PR] KAFKA-16070: move setReadOnly to Headers [kafka]
hachikuji commented on code in PR #15097: URL: https://github.com/apache/kafka/pull/15097#discussion_r1440744523 ## clients/src/main/java/org/apache/kafka/common/header/Headers.java: ## @@ -69,4 +69,10 @@ public interface Headers extends Iterable { */ Header[] toArray(); +/** + * Set Header to readonly. + */ +default void setReadOnly() { Review Comment: Hmm, I don't think `setReadOnly` was ever intended to be public. The class `RecordHeaders` is in `internals`. We just use this method as a way to prevent modification once the headers have been passed to the producer.
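Below is a self-contained sketch of the pattern described above. A mutable headers holder is frozen by the send path once the record is handed over, and any later `add` fails. The class names, the `send` stand-in and the exception message are hypothetical. The point is that the freezing hook lives on the implementation and not on a public interface.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class FreezableHeadersSketch {

    // Hypothetical stand-in for an internal headers implementation.
    static final class FreezableHeaders {
        private final List<Map.Entry<String, byte[]>> entries = new ArrayList<>();
        private boolean readOnly = false;

        FreezableHeaders add(String key, byte[] value) {
            checkWritable();
            entries.add(new AbstractMap.SimpleImmutableEntry<>(key, value));
            return this;
        }

        // Deliberately not declared on any public interface: only the send path is meant to call it.
        void setReadOnly() {
            readOnly = true;
        }

        List<Map.Entry<String, byte[]>> entries() {
            return Collections.unmodifiableList(entries);
        }

        private void checkWritable() {
            if (readOnly) {
                throw new IllegalStateException("headers are read-only after being handed to the producer");
            }
        }
    }

    // Hypothetical stand-in for the producer's send path, which freezes the headers it receives.
    static void send(FreezableHeaders headers) {
        headers.setReadOnly();
        // ... serialize and enqueue the record here ...
    }

    public static void main(String[] args) {
        FreezableHeaders headers = new FreezableHeaders()
                .add("trace-id", "abc".getBytes(StandardCharsets.UTF_8));
        send(headers);
        try {
            headers.add("added-too-late", new byte[0]);
            System.out.println("unexpected: add succeeded");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```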
Re: [PR] MINOR: Add reviewers GitHub action [kafka]
divijvaidya commented on code in PR #15115: URL: https://github.com/apache/kafka/pull/15115#discussion_r1440702318 ## .github/workflows/pr_reviews.yml: ## @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Adding Reviewers + +on: + pull_request_review: +types: [submitted] + +jobs: + add_reviewers: +runs-on: ubuntu-latest +steps: +- uses: actions/checkout@v3 +- name: Add Reviewers + run: | +user_json=$(gh api users/${{ github.event.review.user.login }}) Review Comment: we should probably explicitly specify the `accept` in the header to `-H "Accept: application/vnd.github+json" \` as per https://docs.github.com/en/rest/users/users?apiVersion=2022-11-28 Also maybe the version to `-H "X-GitHub-Api-Version: 2022-11-28" \` ## .github/workflows/pr_reviews.yml: ## @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Adding Reviewers + +on: + pull_request_review: +types: [submitted] + +jobs: + add_reviewers: +runs-on: ubuntu-latest +steps: +- uses: actions/checkout@v3 +- name: Add Reviewers + run: | +user_json=$(gh api users/${{ github.event.review.user.login }}) +user_name=$(echo "$user_json" | jq -r '.name') +user_email=$(echo "$user_json" | jq -r '.email') +if [[ "${{ github.event.pull_request.body }}" =~ ^.*Reviewers:\ .*${user_name}.*$ ]]; then Review Comment: nit You can extract out pull_request.body into a variable since it's used multiple times. The code becomes
```
run: |
  user_json=$(gh api users/${{ github.event.review.user.login }})
  user_name=$(echo "$user_json" | jq -r '.name')
  user_email=$(echo "$user_json" | jq -r '.email')
  pr_body="${{ github.event.pull_request.body }}"
  if [[ "$pr_body" =~ ^.*Reviewers:\ .*${user_name}.*$ ]]; then
    echo "Reviewer already added: ${user_name} <${user_email}>"
  else
    if [[ "$pr_body" =~ ^.*Reviewers:\ .*$ ]]; then
      pr_body+=", ${user_name} <${user_email}>"
    else
      pr_body+="Reviewers: ${user_name} <${user_email}>"
    fi
    gh pr edit ${{ github.event.pull_request.number }} --body "$pr_body"
    echo "Added reviewer: ${user_name} <${user_email}>"
  fi
```
## .github/workflows/pr_reviews.yml: ## @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Adding Reviewers + +on: + pull_request_review: +types: [submitted] + +jobs: + add_reviewers: +runs-on: ubuntu-latest +steps: +- uses: actions/checkout@v3 +- name: Add Reviewers + run: | +user_json=$(gh api users/${{ github.event.review.user.login }}) +
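The bash step in this workflow is essentially a string transformation on the PR body. Here is a Java sketch of the same decision logic, for illustration only; the workflow itself stays in bash, and `appendReviewer`, the names and the emails are hypothetical. Two choices differ from the bash. First, the sketch matches the reviewer's name literally through `Pattern.quote`. Second, it starts a new `Reviewers:` trailer on its own line instead of appending it directly to the last line of the body. The second choice is an assumption about the desired format.

```java
import java.util.regex.Pattern;

public class ReviewersTrailerSketch {

    // Mirrors the bash check `=~ ^.*Reviewers:\ .*${user_name}.*$`, but quotes the name
    // so characters such as '.' or '(' in a display name are matched literally.
    static boolean alreadyListed(String body, String name) {
        return Pattern.compile("Reviewers: .*" + Pattern.quote(name), Pattern.DOTALL)
                .matcher(body)
                .find();
    }

    static String appendReviewer(String body, String name, String email) {
        String entry = name + " <" + email + ">";
        if (alreadyListed(body, name)) {
            return body; // reviewer already present: leave the body unchanged
        }
        if (body.contains("Reviewers: ")) {
            // Like the bash version, assumes the Reviewers trailer is the last thing in the body.
            return body + ", " + entry;
        }
        // Assumption: start the trailer on its own line rather than gluing it to the last line.
        String separator = body.isEmpty() || body.endsWith("\n") ? "" : "\n";
        return body + separator + "Reviewers: " + entry;
    }

    public static void main(String[] args) {
        String body = "Fixes a typo.";
        body = appendReviewer(body, "Jane Doe", "jane@example.org");
        body = appendReviewer(body, "John Roe", "john@example.org");
        body = appendReviewer(body, "Jane Doe", "jane@example.org"); // no-op: already listed
        System.out.println(body);
        // Fixes a typo.
        // Reviewers: Jane Doe <jane@example.org>, John Roe <john@example.org>
    }
}
```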
Re: [PR] MINOR: Bump year to 2024 in NOTICE file [kafka]
stanislavkozlovski commented on PR #15111: URL: https://github.com/apache/kafka/pull/15111#issuecomment-1875709048 cherry-picked to 3.7
Re: [PR] MINOR: Bump year to 2024 in NOTICE file [kafka]
stanislavkozlovski merged PR #15111: URL: https://github.com/apache/kafka/pull/15111
Re: [PR] MINOR: Add reviewers GitHub action [kafka]
mumrah commented on PR #15115: URL: https://github.com/apache/kafka/pull/15115#issuecomment-1875698491 Thanks @viktorsomogyi, this is a nice little automation. Does this script edit the commit or just the PR body?
[jira] [Updated] (KAFKA-15742) KRaft support in GroupCoordinatorIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-15742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dmitry Werner updated KAFKA-15742: -- Fix Version/s: 3.8.0 > KRaft support in GroupCoordinatorIntegrationTest > > > Key: KAFKA-15742 > URL: https://issues.apache.org/jira/browse/KAFKA-15742 > Project: Kafka > Issue Type: Task > Components: core >Reporter: Sameer Tejani >Assignee: Dmitry Werner >Priority: Minor > Labels: kraft, kraft-test, newbie > Fix For: 3.8.0 > > > The following tests in GroupCoordinatorIntegrationTest in > core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala > need to be updated to support KRaft > 41 : def testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec(): Unit = > { > Scanned 63 lines. Found 0 KRaft tests out of 1 tests -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15742: KRaft support in GroupCoordinatorIntegrationTest [kafka]
jolshan merged PR #15086: URL: https://github.com/apache/kafka/pull/15086
Re: [PR] KAFKA-16072: JUnit 5 extension to detect thread leak [kafka]
gharris1727 commented on PR #15101: URL: https://github.com/apache/kafka/pull/15101#issuecomment-1875661177 > It might be worth adding more names to the set of expected thread names. I think increasing the scope of this assertion by adding patterns/removing the pattern check/applying to tests outside of core are good ideas. We can land this change first and then explore those in follow-ups.
Re: [PR] KAFKA-16072: JUnit 5 extension to detect thread leak [kafka]
gharris1727 commented on code in PR #15101: URL: https://github.com/apache/kafka/pull/15101#discussion_r1440661862 ## core/src/test/java/kafka/test/junit/LeakTestingExtension.java: ## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.test.junit; + +import kafka.utils.TestUtils; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import scala.Tuple2; + +import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class LeakTestingExtension implements BeforeEachCallback, AfterEachCallback { +private static final Set EXPECTED_THREAD_NAMES = new HashSet<>( +Arrays.asList("junit-", "JMX", "feature-zk-node-event-process-thread", "ForkJoinPool", "executor-", +"kafka-scheduler-", "metrics-meter-tick-thread", "ReplicaFetcherThread", "scala-", "pool-") +); +private Set initialThreads; Review Comment: I think using a state variable here is essentially a `static` variable, and is shared between tests running concurrently. We don't run concurrent tests in the same JVM by default, but it is easy to change that with configuration. Can you use the extensionContext to store the state to avoid this breaking in the future? https://junit.org/junit5/docs/current/user-guide/#extensions-keeping-state
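The extension's core idea is a before/after snapshot of live threads. Below is a self-contained sketch of that comparison. The rest of the file is not shown here, so the sketch assumes that a new thread counts as expected when its name starts with one of the prefixes in `EXPECTED_THREAD_NAMES`. Following the review comment, a real JUnit extension would keep the "before" snapshot in the `ExtensionContext` store rather than in an instance field. This sketch passes the snapshot around explicitly so it runs without JUnit, and `startSleeper` is a hypothetical helper for the demo.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class ThreadLeakSketch {

    // Prefixes copied from EXPECTED_THREAD_NAMES in the PR under review.
    static final List<String> EXPECTED_PREFIXES = List.of(
            "junit-", "JMX", "feature-zk-node-event-process-thread", "ForkJoinPool", "executor-",
            "kafka-scheduler-", "metrics-meter-tick-thread", "ReplicaFetcherThread", "scala-", "pool-");

    static Set<Thread> liveThreads() {
        return new HashSet<>(Thread.getAllStackTraces().keySet());
    }

    // Threads alive now that were not alive in `before` and match no expected prefix.
    static Set<Thread> leakedSince(Set<Thread> before) {
        return liveThreads().stream()
                .filter(t -> !before.contains(t))
                .filter(t -> EXPECTED_PREFIXES.stream().noneMatch(p -> t.getName().startsWith(p)))
                .collect(Collectors.toSet());
    }

    // Hypothetical helper: starts a daemon thread that sleeps until interrupted.
    static Thread startSleeper(String name) {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException ignored) {
                // interrupted by the caller: just exit
            }
        }, name);
        t.setDaemon(true);
        t.start();
        return t;
    }

    public static void main(String[] args) {
        Set<Thread> before = liveThreads();
        Thread leaky = startSleeper("leaky-worker");
        Thread benign = startSleeper("pool-1-thread-1");
        Set<String> leaked = leakedSince(before).stream().map(Thread::getName).collect(Collectors.toSet());
        System.out.println("leaked: " + leaked);
        leaky.interrupt();
        benign.interrupt();
    }
}
```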
Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]
gharris1727 commented on PR #15080: URL: https://github.com/apache/kafka/pull/15080#issuecomment-1875639893 > just invoking methods in order is not enough to trigger the deadlock. I believe it is possible to reliably reproduce the deadlock with two countdown latches, one countdown in the ConfigBackingStore#snapshot and another in ConfigBackingStore#putTaskConfigs. This requires a mock for the config backing store. If you have a better idea I am happy to analyze it. Yeah I understand. I think the cost of deterministically reproducing the deadlock is too high. I did it in #8259 because I didn't know what synchronization was missing and needed a repro case to debug. I would be satisfied with a test which non-deterministically reproduces the deadlock but is less brittle and includes less mocks. Currently we only have two connectors calling task reconfiguration (mirror checkpoint and source) and one test in the DistributedHerder. There is zero coverage in StandaloneHerder, which is part of why we never found this bug :) > Unrelated to this PR's issue, it may be that the wait operations in StandaloneHerderTest are by mistake 1000 seconds instead of milliseconds. Isn't it? Yeah that timeout seems a bit absurd, but if there aren't deadlocks in the test it should never incur that timeout. It looks like the test suite is very well behaved in practice, so I'm inclined to keep it as-is: https://ge.apache.org/scans/tests?search.names=Git%20branch=P90D=kafka=America%2FLos_Angeles=trunk=*StandaloneHerderTest=FLAKY
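The two-latch idea can be shown in a generic form. Each thread takes its first lock and counts down a latch. It then waits until the other thread also holds its first lock before reaching for its second, so the lock-order inversion happens on every run instead of by chance. This is a hypothetical sketch: two plain `ReentrantLock`s stand in for the herder and the config backing store, and it does not reproduce the StandaloneHerder's actual synchronization. It uses `tryLock` with a timeout so that the check reports the deadlock instead of hanging. A second latch keeps each first lock held until both threads have tried, which makes the outcome deterministic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

public class LatchDeadlockSketch {

    // Returns true if both threads failed to get their second lock, i.e. the inversion deadlocked.
    static boolean reproduceLockOrderInversion() {
        ReentrantLock herderLock = new ReentrantLock();      // hypothetical stand-in
        ReentrantLock configStoreLock = new ReentrantLock(); // hypothetical stand-in
        CountDownLatch bothHoldFirst = new CountDownLatch(2);
        CountDownLatch bothTried = new CountDownLatch(2);
        AtomicBoolean t1Blocked = new AtomicBoolean();
        AtomicBoolean t2Blocked = new AtomicBoolean();

        Thread t1 = new Thread(() -> lockInOrder(herderLock, configStoreLock, bothHoldFirst, bothTried, t1Blocked));
        Thread t2 = new Thread(() -> lockInOrder(configStoreLock, herderLock, bothHoldFirst, bothTried, t2Blocked));
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return t1Blocked.get() && t2Blocked.get();
    }

    static void lockInOrder(ReentrantLock first, ReentrantLock second,
                            CountDownLatch bothHoldFirst, CountDownLatch bothTried, AtomicBoolean blocked) {
        first.lock();
        try {
            bothHoldFirst.countDown();
            bothHoldFirst.await();          // latch 1: both threads now hold their first lock
            if (second.tryLock(500, TimeUnit.MILLISECONDS)) {
                second.unlock();
            } else {
                blocked.set(true);          // with lock() instead of tryLock() this would hang forever
            }
            bothTried.countDown();
            bothTried.await();              // latch 2: keep `first` held until the other thread has tried too
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            first.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("deadlock reproduced: " + reproduceLockOrderInversion());
    }
}
```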
[jira] [Commented] (KAFKA-10875) offsetsForTimes returns null for some partitions when it shouldn't?
[ https://issues.apache.org/jira/browse/KAFKA-10875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17802246#comment-17802246 ] Hugo Abreu commented on KAFKA-10875: Hello! After some digging around in the code, it seems that this behaviour is expected for topics/partitions with a low volume of incoming data. !image-2024-01-03-16-14-10-246.png|width=867,height=205! [~gongyifei], could this be what you are experiencing? That is, by the time of the second search, a new message matching the search had been written to the partition. > offsetsForTimes returns null for some partitions when it shouldn't? > --- > > Key: KAFKA-10875 > URL: https://issues.apache.org/jira/browse/KAFKA-10875 > Project: Kafka > Issue Type: Bug >Reporter: Yifei Gong >Priority: Minor > Attachments: image-2024-01-03-16-14-10-246.png > > > I use spring-boot 2.2.11, spring-kafka 2.4.11 and apache kafka-clients 2.4.1 > I have my consumer {{implements ConsumerAwareRebalanceListener}}, and I am > trying to seek to offsets after certain timestamp inside > {{onPartitionsAssigned}} method by calling {{offsetsForTimes}}.
> I found this strange behavior of method {{offsetsForTimes}}: > When I seek an earlier timestamp {{1607922415534L}} (GMT December 14, 2020 > 5:06:55.534 AM) like below: > {code:java} > @Override > public void onPartitionsAssigned(Consumer<?, ?> consumer, > Collection<TopicPartition> partitions) { > // calling assignment just to ensure my consumer is actually assigned the > partitions > Set<TopicPartition> tps = consumer.assignment(); > Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = new HashMap<>(); > offsetsForTimes.putAll(consumer.offsetsForTimes(partitions.stream() > .collect(Collectors.toMap(tp -> tp, epoch -> 1607922415534L)))); > } > {code} > By setting breakpoint, I can see I got below map: > {noformat} > {TopicPartition@5492} "My.Data.Topic-1" -> {OffsetAndTimestamp@5493} > "(timestamp=1607922521082, leaderEpoch=282, offset=22475886)" > {TopicPartition@5495} "My.Data.Topic-0" -> {OffsetAndTimestamp@5496} > "(timestamp=1607922523035, leaderEpoch=328, offset=25587551)" > {TopicPartition@5498} "My.Data.Topic-5" -> null > {TopicPartition@5500} "My.Data.Topic-4" -> {OffsetAndTimestamp@5501} > "(timestamp=1607924819752, leaderEpoch=323, offset=24578937)" > {TopicPartition@5503} "My.Data.Topic-3" -> {OffsetAndTimestamp@5504} > "(timestamp=1607922522143, leaderEpoch=299, offset=23439914)" > {TopicPartition@5506} "My.Data.Topic-2" -> {OffsetAndTimestamp@5507} > "(timestamp=1607938218461, leaderEpoch=318, offset=23415078)" > {TopicPartition@5509} "My.Data.Topic-9" -> {OffsetAndTimestamp@5510} > "(timestamp=1607922521019, leaderEpoch=298, offset=22002124)" > {TopicPartition@5512} "My.Data.Topic-8" -> {OffsetAndTimestamp@5513} > "(timestamp=1607922520780, leaderEpoch=332, offset=23406692)" > {TopicPartition@5515} "My.Data.Topic-7" -> {OffsetAndTimestamp@5516} > "(timestamp=1607922522800, leaderEpoch=285, offset=22215781)" > {TopicPartition@5518} "My.Data.Topic-6" -> null > {noformat} > As you can see, for some of the partitions (5 and 6) it returned null.
> However, if I seek a more recent timestamp like {{1607941818423L}} (GMT > December 14, 2020 10:30:18.423 AM), I got offsets for all partitions: > {noformat} > {TopicPartition@5492} "My.Data.Topic-1" -> {OffsetAndTimestamp@5493} > "(timestamp=1607942934371, leaderEpoch=282, offset=22568732)" > {TopicPartition@5495} "My.Data.Topic-0" -> {OffsetAndTimestamp@5496} > "(timestamp=1607941818435, leaderEpoch=328, offset=25685999)" > {TopicPartition@5498} "My.Data.Topic-5" -> {OffsetAndTimestamp@5499} > "(timestamp=1607941818424, leaderEpoch=309, offset=24333860)" > {TopicPartition@5501} "My.Data.Topic-4" -> {OffsetAndTimestamp@5502} > "(timestamp=1607941818424, leaderEpoch=323, offset=24666385)" > {TopicPartition@5504} "My.Data.Topic-3" -> {OffsetAndTimestamp@5505} > "(timestamp=1607941818433, leaderEpoch=299, offset=23529597)" > {TopicPartition@5507} "My.Data.Topic-2" -> {OffsetAndTimestamp@5508} > "(timestamp=1607941818423, leaderEpoch=318, offset=23431817)" > {TopicPartition@5510} "My.Data.Topic-9" -> {OffsetAndTimestamp@5511} > "(timestamp=1607941818517, leaderEpoch=298, offset=22082849)" > {TopicPartition@5513} "My.Data.Topic-8" -> {OffsetAndTimestamp@5514} > "(timestamp=1607941818423, leaderEpoch=332, offset=23491462)" > {TopicPartition@5516} "My.Data.Topic-7" -> {OffsetAndTimestamp@5517} > "(timestamp=1607942934371, leaderEpoch=285, offset=22306422)" > {TopicPartition@5519} "My.Data.Topic-6" -> {OffsetAndTimestamp@5520} > "(timestamp=1607941818424, leaderEpoch=317, offset=24677423)" > {noformat} > So I am confused why seeking to an older timestamp gave me nulls when there > are indeed messages with later timestamp as I tried the 2nd time? -- This message was sent by Atlassian Jira
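According to the `KafkaConsumer#offsetsForTimes` contract, each partition maps to the earliest offset whose timestamp is greater than or equal to the target, or to `null` when no such record exists. The plain-Java model below runs that lookup over a hypothetical in-memory partition log; `Rec` and the sample offsets are invented, and no Kafka client is involved. It shows how a partition with little traffic can return `null` and then return an offset once a newer record lands. It also shows one possible fallback that avoids passing `null` on to `seek`. Treating the log end offset as the fallback is an assumption; in a real consumer that position would come from `endOffsets`.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetsForTimesModel {

    // Hypothetical stand-in for one record in a partition log.
    static final class Rec {
        final long offset;
        final long timestamp;

        Rec(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    // Models the documented contract: earliest offset whose timestamp >= target, else null.
    static Long offsetForTime(List<Rec> log, long targetTs) {
        for (Rec r : log) {
            if (r.timestamp >= targetTs) {
                return r.offset;
            }
        }
        return null;
    }

    // One possible fallback when the lookup yields null: the log end offset,
    // i.e. only consume records written from now on.
    static long seekPosition(List<Rec> log, long targetTs) {
        Long found = offsetForTime(log, targetTs);
        long logEndOffset = log.isEmpty() ? 0L : log.get(log.size() - 1).offset + 1;
        return found != null ? found : logEndOffset;
    }

    public static void main(String[] args) {
        List<Rec> quietPartition = new ArrayList<>();
        quietPartition.add(new Rec(100, 1_000L));
        quietPartition.add(new Rec(101, 2_000L));
        System.out.println(offsetForTime(quietPartition, 5_000L)); // null: nothing at or after ts 5000 yet

        quietPartition.add(new Rec(102, 6_000L));                  // a newer record arrives later
        System.out.println(offsetForTime(quietPartition, 5_000L)); // 102
        System.out.println(seekPosition(quietPartition, 7_000L));  // 103: falls back to the log end
    }
}
```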
[jira] [Updated] (KAFKA-10875) offsetsForTimes returns null for some partitions when it shouldn't?
[ https://issues.apache.org/jira/browse/KAFKA-10875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hugo Abreu updated KAFKA-10875: --- Attachment: image-2024-01-03-16-14-10-246.png > offsetsForTimes returns null for some partitions when it shouldn't? > --- > > Key: KAFKA-10875 > URL: https://issues.apache.org/jira/browse/KAFKA-10875 > Project: Kafka > Issue Type: Bug >Reporter: Yifei Gong >Priority: Minor > Attachments: image-2024-01-03-16-14-10-246.png > > > I use spring-boot 2.2.11, spring-kafka 2.4.11 and apache kafka-clients 2.4.1 > I have my consumer {{implements ConsumerAwareRebalanceListener}}, and I am > trying to seek to offsets after certain timestamp inside > {{onPartitionsAssigned}} method by calling {{offsetsForTimes}}. > I found this strange behavior of method {{offsetsForTimes}}: > When I seek an earlier timestamp {{1607922415534L}} (GMT December 14, 2020 > 5:06:55.534 AM) like below: > {code:java} > @Override > public void onPartitionsAssigned(Consumer<?, ?> consumer, > Collection<TopicPartition> partitions) { > // calling assignment just to ensure my consumer is actually assigned the > partitions > Set<TopicPartition> tps = consumer.assignment(); > Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = new HashMap<>(); > offsetsForTimes.putAll(consumer.offsetsForTimes(partitions.stream() > .collect(Collectors.toMap(tp -> tp, epoch -> 1607922415534L)))); > } > {code} > By setting breakpoint, I can see I got below map: > {noformat} > {TopicPartition@5492} "My.Data.Topic-1" -> {OffsetAndTimestamp@5493} > "(timestamp=1607922521082, leaderEpoch=282, offset=22475886)" > {TopicPartition@5495} "My.Data.Topic-0" -> {OffsetAndTimestamp@5496} > "(timestamp=1607922523035, leaderEpoch=328, offset=25587551)" > {TopicPartition@5498} "My.Data.Topic-5" -> null > {TopicPartition@5500} "My.Data.Topic-4" -> {OffsetAndTimestamp@5501} > "(timestamp=1607924819752, leaderEpoch=323, offset=24578937)" > {TopicPartition@5503} "My.Data.Topic-3" -> {OffsetAndTimestamp@5504} > 
"(timestamp=1607922522143, leaderEpoch=299,
offset=23439914)" > {TopicPartition@5506} "My.Data.Topic-2" -> {OffsetAndTimestamp@5507} > "(timestamp=1607938218461, leaderEpoch=318, offset=23415078)" > {TopicPartition@5509} "My.Data.Topic-9" -> {OffsetAndTimestamp@5510} > "(timestamp=1607922521019, leaderEpoch=298, offset=22002124)" > {TopicPartition@5512} "My.Data.Topic-8" -> {OffsetAndTimestamp@5513} > "(timestamp=1607922520780, leaderEpoch=332, offset=23406692)" > {TopicPartition@5515} "My.Data.Topic-7" -> {OffsetAndTimestamp@5516} > "(timestamp=1607922522800, leaderEpoch=285, offset=22215781)" > {TopicPartition@5518} "My.Data.Topic-6" -> null > {noformat} > As you can see some of the partitions (5 and 6) it returned null. > However, if I seek a more recent timestamp like {{1607941818423L}} (GMT > December 14, 2020 10:30:18.423 AM), I got offsets for all partitions: > {noformat} > {TopicPartition@5492} "My.Data.Topic-1" -> {OffsetAndTimestamp@5493} > "(timestamp=1607942934371, leaderEpoch=282, offset=22568732)" > {TopicPartition@5495} "My.Data.Topic-0" -> {OffsetAndTimestamp@5496} > "(timestamp=1607941818435, leaderEpoch=328, offset=25685999)" > {TopicPartition@5498} "My.Data.Topic-5" -> {OffsetAndTimestamp@5499} > "(timestamp=1607941818424, leaderEpoch=309, offset=24333860)" > {TopicPartition@5501} "My.Data.Topic-4" -> {OffsetAndTimestamp@5502} > "(timestamp=1607941818424, leaderEpoch=323, offset=24666385)" > {TopicPartition@5504} "My.Data.Topic-3" -> {OffsetAndTimestamp@5505} > "(timestamp=1607941818433, leaderEpoch=299, offset=23529597)" > {TopicPartition@5507} "My.Data.Topic-2" -> {OffsetAndTimestamp@5508} > "(timestamp=1607941818423, leaderEpoch=318, offset=23431817)" > {TopicPartition@5510} "My.Data.Topic-9" -> {OffsetAndTimestamp@5511} > "(timestamp=1607941818517, leaderEpoch=298, offset=22082849)" > {TopicPartition@5513} "My.Data.Topic-8" -> {OffsetAndTimestamp@5514} > "(timestamp=1607941818423, leaderEpoch=332, offset=23491462)" > {TopicPartition@5516} "My.Data.Topic-7" -> {OffsetAndTimestamp@5517} 
> "(timestamp=1607942934371, leaderEpoch=285, offset=22306422)" > {TopicPartition@5519} "My.Data.Topic-6" -> {OffsetAndTimestamp@5520} > "(timestamp=1607941818424, leaderEpoch=317, offset=24677423)" > {noformat} > So I am confused why seeking to an older timestamp gave me nulls when there > are indeed messages with later timestamp as I tried the 2nd time? -- This message was sent by Atlassian Jira (v8.20.10#820010)
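The `KafkaConsumer#offsetsForTimes` javadoc documents that a partition maps to `null` when it has no message with a timestamp at or after the target, so code that seeks from `onPartitionsAssigned` needs a fallback for those entries whatever the underlying cause. A minimal, dependency-free sketch of that decision follows; `SeekPlanner` is a hypothetical helper, plain `String` partition names and `Long` offsets stand in for `TopicPartition` and `OffsetAndTimestamp`, and falling back to the partition's end offset is an assumption, not what the reporter's code does.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: String/Long stand in for TopicPartition and
// OffsetAndTimestamp so the sketch runs without kafka-clients on the classpath.
final class SeekPlanner {

    private SeekPlanner() {}

    /**
     * Chooses a seek position per partition.
     *
     * @param offsetsForTimes result of an offsetsForTimes-style lookup; a null value
     *                        means no record at or after the target timestamp was found
     * @param endOffsets      current end offset per partition (assumed fallback)
     */
    static Map<String, Long> planSeeks(Map<String, Long> offsetsForTimes,
                                       Map<String, Long> endOffsets) {
        Map<String, Long> plan = new HashMap<>();
        for (Map.Entry<String, Long> e : offsetsForTimes.entrySet()) {
            Long offset = e.getValue();
            if (offset != null) {
                plan.put(e.getKey(), offset);
            } else {
                // No matching record: fall back to the end of the partition
                // instead of passing a null value on to seek(...).
                plan.put(e.getKey(), endOffsets.get(e.getKey()));
            }
        }
        return plan;
    }
}
```

In a real listener, each planned offset would be passed to `consumer.seek(tp, offset)`; `consumer.seekToEnd(...)` is an alternative for the null entries when the end offsets are not already at hand.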
Re: [PR] MINOR: Add reviewers GitHub action [kafka]
viktorsomogyi commented on PR #15115: URL: https://github.com/apache/kafka/pull/15115#issuecomment-1875609641 One more thing: the current script places the "Reviewers:" section after the "Committer checklist", so it's not automatically included in the merge template. I'll see if that can be improved.
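One possible fix for the placement problem is to insert the `Reviewers:` line just above the `### Committer Checklist` heading, so it stays in the part of the description that is copied into the commit message. The sketch below is a hypothetical Java rendering of that text transformation only (`ReviewersPlacement` and `insertBeforeChecklist` are illustrative names); the heading string comes from the PR template quoted in this thread, and the actual action may be implemented quite differently.

```java
// Hypothetical sketch: place a "Reviewers:" line above the committer checklist
// so it stays in the part of the PR body that ends up in the commit message.
final class ReviewersPlacement {

    private static final String CHECKLIST_HEADING = "### Committer Checklist";

    private ReviewersPlacement() {}

    static String insertBeforeChecklist(String body, String reviewersLine) {
        int idx = body.indexOf(CHECKLIST_HEADING);
        if (idx < 0) {
            // No checklist in the description: simply append at the end.
            return body.endsWith("\n") ? body + reviewersLine + "\n"
                                       : body + "\n" + reviewersLine + "\n";
        }
        String before = body.substring(0, idx);
        String after = body.substring(idx);
        if (!before.isEmpty() && !before.endsWith("\n")) {
            before = before + "\n";
        }
        // Keep a blank line between the Reviewers line and the checklist heading.
        return before + reviewersLine + "\n\n" + after;
    }
}
```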
[jira] [Commented] (KAFKA-15147) Measure pending and outstanding Remote Segment operations
[ https://issues.apache.org/jira/browse/KAFKA-15147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17802238#comment-17802238 ] Christo Lolov commented on KAFKA-15147: --- Heya [~enether], after a very quick search, the metric documentation appears to live in the Kafka codebase itself, so I will open a pull request against the 3.7 branch tomorrow morning and tag you as a reviewer! > Measure pending and outstanding Remote Segment operations > - > > Key: KAFKA-15147 > URL: https://issues.apache.org/jira/browse/KAFKA-15147 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Jorge Esteban Quilcate Otoya >Assignee: Christo Lolov >Priority: Major > Labels: tiered-storage > Fix For: 3.7.0 > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-963%3A+Upload+and+delete+lag+metrics+in+Tiered+Storage > > KAFKA-15833: RemoteCopyLagBytes > KAFKA-16002: RemoteCopyLagSegments, RemoteDeleteLagBytes, > RemoteDeleteLagSegments > KAFKA-16013: ExpiresPerSec > KAFKA-16014: RemoteLogSizeComputationTime, RemoteLogSizeBytes, > RemoteLogMetadataCount > KAFKA-15158: RemoteDeleteRequestsPerSec, RemoteDeleteErrorsPerSec, > BuildRemoteLogAuxStateRequestsPerSec, BuildRemoteLogAuxStateErrorsPerSec > > Remote Log Segment operations (copy/delete) are executed by the Remote > Storage Manager, and recorded by the Remote Log Metadata Manager (e.g. the default > TopicBasedRLMM writes state changes of remote log segments to an internal Kafka > topic). > As executions run, fail, and retry, it will be important to know how many > operations are pending and outstanding over time to alert operators. > Pending operations alone are not enough to alert on, as values can oscillate close to > zero. An additional condition needs to apply (running time > threshold) to > consider an operation outstanding.
> Proposal: > RemoteLogManager could be extended with 2 concurrent maps > (pendingSegmentCopies, pendingSegmentDeletes) `Map[Uuid, Long]` that record, per > segmentId, the time the operation started, and based on this expose 2 metrics per > operation: > * pendingSegmentCopies: gauge of the pendingSegmentCopies map > * outstandingSegmentCopies: loop over pending ops, and if now - startedTime > > timeout, then outstanding++ (maybe on debug level?) > Is this a valuable metric to add to Tiered Storage, or is it better solved in a > custom RLMM implementation? > Also, does it require a KIP? > Thanks!
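The proposal above can be sketched in Java as follows. This is illustrative only: `PendingSegmentOps` and its methods are hypothetical names, `java.util.UUID` stands in for Kafka's `Uuid`, and the clock is passed in explicitly so the threshold logic is easy to check. It shows one concurrent map per operation type, a gauge that is simply the map size, and an outstanding count of entries older than the timeout.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical tracker for one operation type (e.g. segment copies);
// a RemoteLogManager-like component would hold one instance per type.
final class PendingSegmentOps {

    // segmentId -> start time in milliseconds
    private final Map<UUID, Long> pending = new ConcurrentHashMap<>();
    private final long outstandingThresholdMs;

    PendingSegmentOps(long outstandingThresholdMs) {
        this.outstandingThresholdMs = outstandingThresholdMs;
    }

    void started(UUID segmentId, long nowMs) {
        pending.put(segmentId, nowMs);
    }

    void finished(UUID segmentId) {
        // Remove the entry once the operation has completed.
        pending.remove(segmentId);
    }

    /** Gauge value: number of operations currently in flight. */
    int pendingCount() {
        return pending.size();
    }

    /** Number of in-flight operations running longer than the threshold. */
    long outstanding(long nowMs) {
        return pending.values().stream()
                .filter(startMs -> nowMs - startMs > outstandingThresholdMs)
                .count();
    }
}
```

Computing `outstanding` iterates over every in-flight entry on each read; iteration over a `ConcurrentHashMap` is weakly consistent, which is usually acceptable for a monitoring gauge.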
Re: [PR] KAFKA-14133: Migrate storeMetadata mock in StoreChangelogReaderTest to Mockito [kafka]
clolov commented on code in PR #15116: URL: https://github.com/apache/kafka/pull/15116#discussion_r1440615357 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java: ## @@ -453,17 +456,17 @@ private void shouldPollWithRightTimeout(final boolean stateUpdaterEnabled) { @Test public void shouldPollWithRightTimeoutWithStateUpdaterDefault() { setupStateManagerMock(); +setupStoreMetadata(); final Properties properties = new Properties(); shouldPollWithRightTimeout(properties); } private void shouldPollWithRightTimeout(final Properties properties) { final TaskId taskId = new TaskId(0, 0); - EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes(); -EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes(); Review Comment: Ditto ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java: ## @@ -257,15 +255,16 @@ public void shouldSupportUnregisterChangelogBeforeInitialization() { @Test public void shouldSupportUnregisterChangelogBeforeCompletion() { setupStateManagerMock(); +setupStoreMetadata(); final Map mockTasks = mock(Map.class); EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); -EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes(); -EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes(); +when(storeMetadata.offset()).thenReturn(9L); if (type == STANDBY) { +when(storeMetadata.endOffset()).thenReturn(10L); Review Comment: Only used on the STANDBY path ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java: ## @@ -821,11 +831,11 @@ public Map committed(final Set mockTasks = mock(Map.class); EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); -EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes(); Review Comment: Reported as unused by Mockito ## 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java: ## @@ -363,12 +363,12 @@ public void shouldSupportUnregisterChangelogAfterCompletion() { @Test public void shouldInitializeChangelogAndCheckForCompletion() { setupStateManagerMock(); +setupStoreMetadata(); final Map mockTasks = mock(Map.class); EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); -EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes(); -EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes(); Review Comment: Unused according to Mockito ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java: ## @@ -862,10 +873,9 @@ public void shouldRequestCommittedOffsetsAndHandleTimeoutException() { EasyMock.expectLastCall(); when(stateManager.changelogAsSource(tp)).thenReturn(true); -EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes(); -EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes(); Review Comment: Ditto ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java: ## @@ -924,13 +934,13 @@ public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(fina @Test public void shouldThrowIfCommittedOffsetsFail() { setupStateManagerMock(); +when(storeMetadata.changelogPartition()).thenReturn(tp); final TaskId taskId = new TaskId(0, 0); when(stateManager.taskId()).thenReturn(taskId); when(stateManager.changelogAsSource(tp)).thenReturn(true); -EasyMock.expect(storeMetadata.offset()).andReturn(10L).anyTimes(); Review Comment: Ditto
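For reference when reading the removed expectation `EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes()`: its Mockito counterpart would be consecutive stubbing, `when(storeMetadata.offset()).thenReturn(null, 9L)`, where values are returned in order and the last one keeps being returned for later calls. The dependency-free class below (`ConsecutiveAnswers`, a hypothetical name, not part of either library) only mimics that return sequence so the behaviour can be checked without Mockito on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in mimicking Mockito's consecutive stubbing,
// i.e. when(mock.offset()).thenReturn(null, 9L): values are returned in
// order and the last one repeats for every further call.
final class ConsecutiveAnswers<T> implements Supplier<T> {

    private final List<T> values;
    private int calls = 0;

    @SafeVarargs
    ConsecutiveAnswers(T... values) {
        if (values.length == 0) {
            throw new IllegalArgumentException("at least one value is required");
        }
        // ArrayList tolerates null elements (ArrayDeque, for example, would not).
        this.values = new ArrayList<>(Arrays.asList(values));
    }

    @Override
    public T get() {
        // Clamp to the last index so the final value repeats indefinitely.
        int idx = Math.min(calls, values.size() - 1);
        calls++;
        return values.get(idx);
    }
}
```

Under Mockito's strict stubbing, which the "reported as unused" comments suggest is in effect here, a stub such as `endOffset()` that only the STANDBY path reads has to be declared inside that branch.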
[PR] KAFKA-14133: Migrate storeMetadata mock in StoreChangelogReaderTest to Mockito [kafka]
clolov opened a new pull request, #15116: URL: https://github.com/apache/kafka/pull/15116 This pull request takes the same approach as the ongoing TaskManagerTest migration: mocks are migrated to Mockito one at a time, for easier reviews.
Re: [PR] MINOR: Add reviewers GitHub action [kafka]
viktorsomogyi commented on PR #15115: URL: https://github.com/apache/kafka/pull/15115#issuecomment-1875576165 One caveat with the "Name \" format is that \<\> is markup syntax meaning superscript. However, this doesn't work for emails, so GitHub just displays "Name name@email", i.e. without the \<\>.
[PR] MINOR: Add reviewers GitHub action [kafka]
viktorsomogyi opened a new pull request, #15115: URL: https://github.com/apache/kafka/pull/15115 This PR adds a GitHub action that is triggered when a GitHub review is submitted. The action does the following: - if there is no "Reviewers" line at the end of the description, it appends one containing the current reviewer's name and email as set in their GitHub account. - if there is a "Reviewers" line at the end, it appends the reviewer's name and email to it. It uses the "Name \" format. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
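The two rules described in this PR (create a `Reviewers:` line if none ends the description, otherwise append to it) can be sketched as a plain string transformation. This is a hypothetical Java rendering for illustration (`ReviewersLine.addReviewer` is not the action's actual code); it assumes the line sits at the very end of the description and that entries are comma-separated in `Name <email>` form.

```java
// Hypothetical sketch of the "append reviewer" rules described above.
final class ReviewersLine {

    private static final String PREFIX = "Reviewers: ";

    private ReviewersLine() {}

    static String addReviewer(String body, String name, String email) {
        String entry = name + " <" + email + ">";
        // Ignore trailing whitespace and newlines when looking for the last line.
        String trimmed = body.replaceAll("\\s+$", "");
        String lastLine = trimmed.substring(trimmed.lastIndexOf('\n') + 1);

        if (lastLine.startsWith(PREFIX)) {
            // A Reviewers line already ends the description: append to it.
            return trimmed + ", " + entry;
        }
        // Otherwise start a new Reviewers line, separated by a blank line.
        return trimmed + "\n\n" + PREFIX + entry;
    }
}
```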
Re: [PR] Test2 [kafka]
viktorsomogyi closed pull request #15114: Test2 URL: https://github.com/apache/kafka/pull/15114
Re: [PR] Test2 [kafka]
viktorsomogyi commented on PR #15114: URL: https://github.com/apache/kafka/pull/15114#issuecomment-1875553120 Meant to create this in my repo.
[PR] Test2 [kafka]
viktorsomogyi opened a new pull request, #15114: URL: https://github.com/apache/kafka/pull/15114 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-14133: Migrate consumer mock in TaskManagerTest to Mockito [kafka]
clolov commented on code in PR #15112: URL: https://github.com/apache/kafka/pull/15112#discussion_r1440569384 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -197,6 +196,8 @@ public class TaskManagerTest { @Mock(type = MockType.STRICT) private Consumer consumer; @org.mockito.Mock +private Consumer mockitoConsumer; Review Comment: I ran into quite a lot of problems when I tried migrating the whole mock, so I decided to do the migration test by test so that problems could be flushed out. By introducing this mock and using `setMainConsumer` on a test-by-test basis, things are easier to go through (at least in my opinion). ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -4809,11 +4768,14 @@ public void shouldNotFailForTimeoutExceptionOnCommitWithEosAlpha() { exception.corruptedTasks(), equalTo(Collections.singleton(taskId00)) ); + +Mockito.verify(mockitoConsumer, times(2)).groupMetadata(); Review Comment: Mockito reports that the method is called twice. This wasn't specified in the EasyMock world, but I decided to make it explicit now. ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -3010,13 +2993,9 @@ public void initializeIfNeeded() { } }; -consumer.commitSync(Collections.emptyMap()); -expectLastCall(); -expect(consumer.assignment()).andReturn(emptySet()); -consumer.resume(eq(emptySet())); -expectLastCall(); Review Comment: Same as above: Mockito reported these as unused.
I added an assertion in the Mockito world at the end (... I should probably add one to the first test as well, but I can do this in part 2 of this pull request). ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -1168,13 +1166,10 @@ public void shouldHandleMultipleRemovedTasksFromStateUpdater() { when(stateUpdater.drainRemovedTasks()) .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions)); when(stateUpdater.restoresActiveTasks()).thenReturn(true); -when(activeTaskCreator.createActiveTaskFromStandby(taskToRecycle1, taskId01Partitions, consumer)) +when(activeTaskCreator.createActiveTaskFromStandby(taskToRecycle1, taskId01Partitions, mockitoConsumer)) .thenReturn(convertedTask1); when(standbyTaskCreator.createStandbyTaskFromActive(taskToRecycle0, taskId00Partitions)) .thenReturn(convertedTask0); -expect(consumer.assignment()).andReturn(emptySet()).anyTimes(); -consumer.resume(anyObject()); -expectLastCall().anyTimes(); Review Comment: According to Mockito these were unused. I deleted them in the EasyMock world and the test still passed, hence I removed them. ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -3047,14 +3027,9 @@ public void completeRestoration(final java.util.function.Consumer> assignment = singletonMap(taskId00, taskId00Partitions); final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, false, stateManager); +taskManager.setMainConsumer(mockitoConsumer); + // `handleAssignment` when(standbyTaskCreator.createTasks(assignment)).thenReturn(singletonList(task00)); -// `tryToCompleteRestoration` -expect(consumer.assignment()).andReturn(emptySet()); -consumer.resume(eq(emptySet())); -expectLastCall(); - -// `shutdown` -consumer.commitSync(Collections.emptyMap()); -expectLastCall(); Review Comment: Ditto
[PR] KAFKA-16070: extract the setReadOnly method into Headers [kafka]
Joker-5 opened a new pull request, #15113: URL: https://github.com/apache/kafka/pull/15113 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* TODO unit tests. *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[PR] KAFKA-14133: Migrate consumer mock in TaskManagerTest to Mockito [kafka]
clolov opened a new pull request, #15112: URL: https://github.com/apache/kafka/pull/15112 This pull request migrates the consumer mock in TaskManagerTest test by test for easier reviews.
Re: [PR] MINOR: Prevent java.lang.UnsupportedOperationException in MockAdminClient [kafka]
jamespfaulkner commented on PR #14955: URL: https://github.com/apache/kafka/pull/14955#issuecomment-1875517539 > Looks good! Out of curiosity, do you happen to know tests which are failing with this UnsupportedOperationException? Ah, sorry, I missed this question the first time round. I have a small app that constantly polls `describeLogDirs`. When I saw that `MockAdminClient` had changed its implementation of `describeLogDirs` to no longer immediately throw an `UnsupportedOperationException`, I took the opportunity to improve my tests and spotted this.
Re: [PR] MINOR: Enable Gradle Remote Build Cache [kafka]
divijvaidya commented on PR #15109: URL: https://github.com/apache/kafka/pull/15109#issuecomment-1875475685 https://github.com/apache/beam/pull/27015 is how Apache Beam enabled this. It seems we need to request a build node from Apache Infra and change our Jenkins setup to pick up credentials the way Beam did (from environment variables deployed by Infra). I have created a ticket to talk to Infra at https://issues.apache.org/jira/browse/INFRA-25336
Re: [PR] KAFKA-15556: Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect [kafka]
Phuc-Hong-Tran commented on PR #15020: URL: https://github.com/apache/kafka/pull/15020#issuecomment-1875430964 @philipnee, when you said I should "rethink" the approach, did you mean I should change the approach, or that I can just fix the code where you commented?
Re: [PR] MINOR: Enable Gradle Remote Build Cache [kafka]
nicktelford commented on PR #15109: URL: https://github.com/apache/kafka/pull/15109#issuecomment-1875405401 Looks like Jenkins doesn't have permission (by default at least) to write entries to the ASF Gradle Enterprise Build Cache: ``` Could not store entry 744bd685bdd1d149ec93934b276b7698 in remote build cache: Storing entry at 'https://ge.apache.org/cache/744bd685bdd1d149ec93934b276b7698' response status 403: Forbidden ``` It might require some explicit authentication.
Re: [PR] MINOR: Enable Gradle Remote Build Cache [kafka]
nicktelford commented on PR #15109: URL: https://github.com/apache/kafka/pull/15109#issuecomment-1875356669 @ijuma I've temporarily reconfigured it to enable the cache for this branch (`gradle-remote-build-cache`) instead of `trunk`, so we can test that it works as expected using the CI builds on this PR. Pushing to the cache from these builds should be fine: since there are no code changes, the task outputs should be the same as in `trunk` anyway.
Re: [PR] KAFKA-14412: Decouple RocksDB access from CF [kafka]
nicktelford commented on PR #15105: URL: https://github.com/apache/kafka/pull/15105#issuecomment-1875335559 @lucasbru OK, my bad. It turns out that, _after_ running the test suite yesterday, I did a minor refactoring that seemed so insignificant I didn't think I needed to run the tests again... Turns out I was wrong :see_no_evil: I've fixed the bug, and the tests now pass locally for real.
[jira] [Commented] (KAFKA-15867) Should ConsumerNetworkThread wrap the exception and notify the polling thread?
[ https://issues.apache.org/jira/browse/KAFKA-15867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17802158#comment-17802158 ] Phuc Hong Tran commented on KAFKA-15867: [~pnee] I happened to come across this comment: [https://github.com/apache/kafka/blob/60c445bdd51c608d1212f5cab83b65533739bd61/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java#L61] [~kirktrue], could you give some insight into why we don't want to propagate the error to the caller? Thanks > Should ConsumerNetworkThread wrap the exception and notify the polling thread? > -- > > Key: KAFKA-15867 > URL: https://issues.apache.org/jira/browse/KAFKA-15867 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Phuc Hong Tran >Priority: Minor > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > The ConsumerNetworkThread runs a tight loop infinitely. However, when > encountering an unexpected exception, it logs an error and continues. > > I think this might not be ideal because users can run blind for a long time > before discovering there's something wrong with the code, so I believe we > should propagate the throwable back to the polling thread. > > cc [~lucasbru]
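The change under discussion, surfacing a failure from the background loop on the application thread instead of only logging it, can be sketched with a shared `AtomicReference` that the background thread fills and the polling thread checks. The names below (`BackgroundLoop`, `runOnce`, `maybeThrowBackgroundError`) are hypothetical and not part of the consumer's internals; the sketch only shows the hand-off pattern, with `IllegalStateException` standing in for whatever exception type the client would actually use.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: the background thread records an unexpected error
// and the application thread rethrows it on its next poll.
final class BackgroundLoop {

    private final AtomicReference<Throwable> firstError = new AtomicReference<>();

    /** One iteration of the background loop; the work is supplied by the caller. */
    void runOnce(Runnable work) {
        try {
            work.run();
        } catch (Throwable t) {
            // Keep only the first failure until the polling thread has consumed it.
            firstError.compareAndSet(null, t);
        }
    }

    /** Called from the polling (application) thread, e.g. at the start of poll(). */
    void maybeThrowBackgroundError() {
        Throwable t = firstError.getAndSet(null);
        if (t != null) {
            throw new IllegalStateException("Background thread failed", t);
        }
    }
}
```

Clearing the error with `getAndSet(null)` reports it once; keeping it set would instead make every subsequent poll fail, which may be preferable if the background thread cannot recover.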
Re: [PR] MINOR: Fix toString method of IsolationLevel [kafka]
PzaThief commented on PR #14782: URL: https://github.com/apache/kafka/pull/14782#issuecomment-1875332142 > Build failed again -- can you maybe rebase this PR on `trunk` to ensure it's on the latest version? Sorry I missed it. Can you please retrigger it?
Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]
developster commented on PR #15080: URL: https://github.com/apache/kafka/pull/15080#issuecomment-1875320392 @gharris1727, sure. I wrote the test, but just invoking the methods in order is not enough to trigger the deadlock: the requestTaskReconfiguration method is always executed before putConnectorConfig. I believe it is possible to reliably reproduce the deadlock with two countdown latches, one counted down in `ConfigBackingStore#snapshot` and another in `ConfigBackingStore#putTaskConfigs`. This requires a mock for the config backing store. If you have a better idea, I am happy to analyze it. Unrelated to this PR's issue: could it be that the wait operations in StandaloneHerderTest mistakenly use 1000 seconds instead of 1000 milliseconds?
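The latch-based approach described above can be illustrated with a stdlib-only sketch. Two threads stand in for the two concurrent herder calls; the class name, latch names and recorded events are hypothetical, and a real test would count the latches down from a mocked `ConfigBackingStore` (in `snapshot` and `putTaskConfigs`) rather than from plain lambdas. The point is only that latches make the interleaving deterministic instead of depending on thread scheduling; the waits also spell out their `TimeUnit`, which avoids the seconds-versus-milliseconds mix-up mentioned in the comment.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: two latches force thread B's step to run between
// thread A's two steps, regardless of how the scheduler orders the threads.
final class ForcedInterleaving {

    private ForcedInterleaving() {}

    static List<String> run() throws InterruptedException {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch aReachedMiddle = new CountDownLatch(1);
        CountDownLatch bDone = new CountDownLatch(1);

        Thread a = new Thread(() -> {
            events.add("A:first");
            aReachedMiddle.countDown();
            try {
                // Explicit TimeUnit keeps the timeout's unit unambiguous.
                if (!bDone.await(10, TimeUnit.SECONDS)) {
                    events.add("A:timeout");
                    return;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            events.add("A:second");
        });

        Thread b = new Thread(() -> {
            try {
                if (!aReachedMiddle.await(10, TimeUnit.SECONDS)) {
                    events.add("B:timeout");
                    return;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            events.add("B:step");
            bDone.countDown();
        });

        // Start B first on purpose: the latches, not start order, decide the outcome.
        b.start();
        a.start();
        a.join();
        b.join();
        return events;
    }
}
```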
Re: [PR] MINOR: Fix toString method of IsolationLevel [kafka]
mjsax commented on PR #14782: URL: https://github.com/apache/kafka/pull/14782#issuecomment-1875305566 Build failed again -- can you maybe rebase this PR on `trunk` to ensure it's on the latest version?
Re: [PR] MINOR: Prevent java.lang.UnsupportedOperationException in MockAdminClient [kafka]
vamossagar12 commented on PR #14955: URL: https://github.com/apache/kafka/pull/14955#issuecomment-1875226073 > Looks good! Out of curiosity, do you happen to know tests which are failing with this UnsupportedOperationException? Yeah, I was also interested to know this.
Re: [PR] KAFKA-16016: Add docker wrapper in core and remove docker utility script [kafka]
viktorsomogyi commented on code in PR #15048: URL: https://github.com/apache/kafka/pull/15048#discussion_r1440302423 ## core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.docker + +import kafka.tools.StorageTool +import kafka.utils.Exit + +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Paths, StandardCopyOption, StandardOpenOption} + +object KafkaDockerWrapper { + def main(args: Array[String]): Unit = { +if (args.length == 0) { + throw new RuntimeException(s"Error: No operation input provided. " + +s"Please provide a valid operation: 'setup'.") +} +val operation = args.head +val arguments = args.tail + +operation match { + case "setup" => +if (arguments.length != 3) { + val errMsg = "not enough arguments passed. Usage: " + +"setup , " + System.err.println(errMsg) + Exit.exit(1, Some(errMsg)) +} +val defaultConfigsDir = arguments(0) +val mountedConfigsDir = arguments(1) +val finalConfigsDir = arguments(2) Review Comment: Parsing these 3 arguments would also be a good opportunity to validate and convert these into `Path` objects. 
Using the API of `Path` also makes your code more robust down the line when you append the config file names in `prepareConfigs`. ## core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.docker + +import kafka.tools.StorageTool +import kafka.utils.Exit + +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Paths, StandardCopyOption, StandardOpenOption} + +object KafkaDockerWrapper { + def main(args: Array[String]): Unit = { +if (args.length == 0) { + throw new RuntimeException(s"Error: No operation input provided. " + +s"Please provide a valid operation: 'setup'.") +} +val operation = args.head +val arguments = args.tail + +operation match { + case "setup" => +if (arguments.length != 3) { + val errMsg = "not enough arguments passed. 
Usage: " + +"setup , " + System.err.println(errMsg) + Exit.exit(1, Some(errMsg)) +} +val defaultConfigsDir = arguments(0) +val mountedConfigsDir = arguments(1) +val finalConfigsDir = arguments(2) +try { + prepareConfigs(defaultConfigsDir, mountedConfigsDir, finalConfigsDir) +} catch { + case e: Throwable => +val errMsg = s"error while preparing configs: ${e.getMessage}" +System.err.println(errMsg) +Exit.exit(1, Some(errMsg)) +} + +val formatCmd = formatStorageCmd(finalConfigsDir, envVars) +StorageTool.main(formatCmd) + case _ => +throw new RuntimeException(s"Unknown operation $operation. " + + s"Please provide a valid operation: 'setup'.") +} + } + + import Constants._ + + private def formatStorageCmd(configsDir: String, env: Map[String, String]): Array[String] = { Review Comment: While I agree that it works, it's not a nice solution. Would it be a big refactor to use a specific method from `StorageTool` instead of `main`? ## core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala: ## @@ -0,0 +1,218 @@
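The `Path` suggestion in the first review comment on `KafkaDockerWrapper.scala` above can be sketched as follows. The sketch is in Java rather than the wrapper's Scala, and `SetupDirs`, `parse` and `finalConfigFile` are hypothetical names. The diff does not say which directories must already exist, so the sketch only checks the argument count, rejects blank values, and normalizes each argument into a `Path` once.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

// Hypothetical holder for the three directories passed to the "setup" operation.
final class SetupDirs {
    final Path defaultConfigsDir;
    final Path mountedConfigsDir;
    final Path finalConfigsDir;

    private SetupDirs(Path defaultConfigsDir, Path mountedConfigsDir, Path finalConfigsDir) {
        this.defaultConfigsDir = defaultConfigsDir;
        this.mountedConfigsDir = mountedConfigsDir;
        this.finalConfigsDir = finalConfigsDir;
    }

    // Validates the arguments once and converts them to absolute, normalized paths,
    // so later steps work with Path objects instead of raw strings.
    static SetupDirs parse(List<String> arguments) {
        if (arguments.size() != 3) {
            throw new IllegalArgumentException(
                    "Expected 3 arguments (default, mounted and final configs dirs), got " + arguments.size());
        }
        Path[] dirs = new Path[3];
        for (int i = 0; i < dirs.length; i++) {
            String raw = arguments.get(i);
            if (raw == null || raw.isBlank()) {
                throw new IllegalArgumentException("Argument " + i + " must be a non-blank path");
            }
            dirs[i] = Paths.get(raw).toAbsolutePath().normalize();
        }
        return new SetupDirs(dirs[0], dirs[1], dirs[2]);
    }

    // Path.resolve inserts the separator, so callers never build "dir" + "/" + "file" by hand.
    Path finalConfigFile(String fileName) {
        return finalConfigsDir.resolve(fileName);
    }
}
```

For example, on a Unix file system `SetupDirs.parse(List.of("/opt/defaults", "/mnt/shared/../configs", "/opt/final"))` yields a mounted directory of `/mnt/configs`, and `finalConfigFile("server.properties")` resolves inside `/opt/final`.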
Re: [PR] MINOR: Improve code style about producer [kafka]
divijvaidya merged PR #15107: URL: https://github.com/apache/kafka/pull/15107 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15816: Fix leaked sockets in trogdor tests [kafka]
divijvaidya merged PR #14771: URL: https://github.com/apache/kafka/pull/14771
Re: [PR] KAFKA-15816: Fix leaked sockets in trogdor tests [kafka]
divijvaidya commented on PR #14771: URL: https://github.com/apache/kafka/pull/14771#issuecomment-1875169735 The test modified in this PR passes: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14771/4/testReport/org.apache.kafka.trogdor.agent/
Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]
satishd commented on code in PR #14034: URL: https://github.com/apache/kafka/pull/14034#discussion_r1440285636 ## storage/src/main/java/org/apache/kafka/storage/internals/log/SegmentDeletionReason.java: ## @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import java.util.List; + +public interface SegmentDeletionReason { Review Comment: `SegmentDeletionReason` has other implementations, such as `RetentionMsBreach`, `RetentionSizeBreach` and `StartOffsetBreach`, that use `UnifiedLog`. We can revisit this when we refactor `UnifiedLog`.
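One possible direction for that later refactoring, sketched under assumptions: the interface keeps a single callback that receives the segments being deleted (the `java.util.List` import in the diff suggests a `List` parameter), and each reason captures only the values it needs instead of a `UnifiedLog` reference. `SegmentInfo`, `DeletionReason` and `RetentionMsBreachSketch` are hypothetical, simplified names, not the storage module's classes.

```java
import java.util.List;

// Simplified stand-in for a log segment: just the fields a log message needs.
final class SegmentInfo {
    final long baseOffset;
    final long largestTimestamp;

    SegmentInfo(long baseOffset, long largestTimestamp) {
        this.baseOffset = baseOffset;
        this.largestTimestamp = largestTimestamp;
    }
}

// Assumed shape of the interface: one callback explaining why segments are deleted.
interface DeletionReason {
    void logReason(List<SegmentInfo> toDelete);
}

// Hypothetical retention-time reason that depends on the retention value only,
// not on the whole log object.
final class RetentionMsBreachSketch implements DeletionReason {
    private final long retentionMs;
    private final List<String> messages; // collects messages in place of a real logger

    RetentionMsBreachSketch(long retentionMs, List<String> messages) {
        this.retentionMs = retentionMs;
        this.messages = messages;
    }

    @Override
    public void logReason(List<SegmentInfo> toDelete) {
        for (SegmentInfo segment : toDelete) {
            messages.add("Deleting segment with base offset " + segment.baseOffset
                    + " (largest timestamp " + segment.largestTimestamp + ") due to retention time "
                    + retentionMs + "ms breach");
        }
    }
}
```

The design choice here is that a reason can be constructed without a full log object, which might make it easier to move such classes next to `LocalLog`; whether the actual refactoring goes this way is not stated in the thread.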
Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]
satishd commented on code in PR #14034: URL: https://github.com/apache/kafka/pull/14034#discussion_r1440207191 ## storage/src/main/java/org/apache/kafka/storage/internals/log/SegmentDeletionReason.java: ## @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import java.util.List; + +public interface SegmentDeletionReason { Review Comment: `SegmentDeletionReason` has other implementations, such as `RetentionMsBreach`, `RetentionSizeBreach` and `StartOffsetBreach`, that use `UnifiedLog`. We can revisit this when we refactor `UnifiedLog`. ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1441,9 +1442,9 @@ class UnifiedLog(@volatile var logStartOffset: Long, * (if there is one). It returns true iff the segment is deletable.
* @return the segments ready to be deleted */ - private[log] def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = { -def isSegmentEligibleForDeletion(nextSegmentOpt: Option[LogSegment], upperBoundOffset: Long): Boolean = { - val allowDeletionDueToLogStartOffsetIncremented = nextSegmentOpt.isDefined && logStartOffset >= nextSegmentOpt.get.baseOffset + private[log] def deletableSegments(predicate: (LogSegment, Optional[LogSegment]) => Boolean): Iterable[LogSegment] = { Review Comment: I thought these could be addressed when `UnifiedLog` is refactored and moved to the storage module. ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1511,7 +1512,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, } localLog.checkIfMemoryMappedBufferClosed() // remove the segments for lookups -localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason) +localLog.removeAndDeleteSegments(segmentsToDelete.toList.asJava, true, reason) Review Comment: I thought these could be addressed when `UnifiedLog` is refactored and moved to the storage module. ## checkstyle/import-control-core.xml: ## @@ -37,6 +37,8 @@ + + Review Comment: Right, it is not needed. I guess it was needed when that class was kept inside the core module. ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1245,7 +1246,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, } private[log] def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = { Review Comment: I thought these could be addressed when `UnifiedLog` is refactored and moved to the storage module.
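The `deletableSegments` signature change in the diff above swaps Scala's `Option[LogSegment]` for `java.util.Optional<LogSegment>`. A minimal Java sketch of the same predicate logic follows, assuming a hypothetical `Segment` stand-in that carries only a base offset. It deliberately leaves out the upper-bound offset check that the real method also applies, so it is not the `UnifiedLog` implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.BiPredicate;

final class DeletableSegmentsSketch {
    // Minimal stand-in for LogSegment: only the base offset matters here.
    static final class Segment {
        final long baseOffset;

        Segment(long baseOffset) {
            this.baseOffset = baseOffset;
        }
    }

    // Java form of the Scala expression
    //   nextSegmentOpt.isDefined && logStartOffset >= nextSegmentOpt.get.baseOffset
    static boolean allowDeletionDueToLogStartOffsetIncremented(Optional<Segment> nextSegment, long logStartOffset) {
        return nextSegment.map(next -> logStartOffset >= next.baseOffset).orElse(false);
    }

    // Walks segments in offset order, passing each segment and its successor (if any)
    // to the predicate, and stops at the first segment the predicate rejects.
    static List<Segment> deletableSegments(List<Segment> segments,
                                           BiPredicate<Segment, Optional<Segment>> predicate) {
        List<Segment> deletable = new ArrayList<>();
        for (int i = 0; i < segments.size(); i++) {
            Optional<Segment> next = i + 1 < segments.size()
                    ? Optional.of(segments.get(i + 1))
                    : Optional.empty();
            if (!predicate.test(segments.get(i), next)) {
                break;
            }
            deletable.add(segments.get(i));
        }
        return deletable;
    }
}
```

With base offsets 0, 100 and 200 and a log start offset of 150, only the first segment is returned: its successor starts at 100, which is at or below 150, while the second segment's successor starts at 200.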
## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -2265,11 +2268,12 @@ object UnifiedLog extends Logging { def deleteProducerSnapshots(): Unit = { LocalLog.maybeHandleIOException(logDirFailureChannel, parentDir, -s"Error while deleting producer state snapshots for $topicPartition in dir $parentDir") { +s"Error while deleting producer state snapshots for $topicPartition in dir $parentDir", { snapshotsToDelete.foreach { snapshot => snapshot.deleteIfExists() } - } + return; Review Comment: It expects a return for the inline declaration; without it, the compiler reports a type mismatch error. ## storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java: ## @@ -0,0 +1,1146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the
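A similar constraint shows up in plain Java whenever a helper takes a value-returning functional interface: a block body that only performs side effects must still return something. The following is a minimal sketch, assuming the Java-side helper is shaped like the hypothetical `IoAction<T>` / `maybeHandleIOException` pair below. It is not the actual `LocalLog` signature, and the exact Scala error in the PR may differ in detail.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.function.Supplier;

final class IoActionSketch {
    // Hypothetical value-returning action that may fail with an IOException.
    @FunctionalInterface
    interface IoAction<T> {
        T execute() throws IOException;
    }

    // Hypothetical wrapper: runs the action and rethrows an IOException with context.
    static <T> T maybeHandleIOException(Supplier<String> errorMsg, IoAction<T> action) {
        try {
            return action.execute();
        } catch (IOException e) {
            throw new UncheckedIOException(errorMsg.get(), e);
        }
    }

    // Side-effect-only work still has to yield a T; here T is Void and the body returns null.
    static void deleteAll(List<Runnable> deletions) {
        IoActionSketch.<Void>maybeHandleIOException(() -> "Error while deleting producer state snapshots", () -> {
            deletions.forEach(Runnable::run);
            return null; // without this, the lambda is not compatible with IoAction<Void>
        });
    }
}
```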
Re: [PR] MINOR: Increase parallelism for Jenkins [kafka]
divijvaidya commented on PR #15099: URL: https://github.com/apache/kafka/pull/15099#issuecomment-1875126583 > @divijvaidya did you accidentally include the KafkaApisTest change here? Hi @jolshan, no, it was intentional, since it hadn't been merged to trunk at that time. Note that this is a draft PR at this stage. Surprisingly, we have more test failures when we increase parallelism for tests. I am investigating that. I will tag you for review when it is ready.
[jira] [Comment Edited] (KAFKA-16073) Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion
[ https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17802095#comment-17802095 ] hzh0425 edited comment on KAFKA-16073 at 1/3/24 9:54 AM: - Yes, this is another good solution: localLogStartOffset will always be >= baseSegment.startOffset(). [~satish.duggana] was (Author: JIRAUSER298236): Yes, this is another good solution: localLogStartOffset will always be >= baseSegment.startOffset(). > Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed > localLogStartOffset Update During Segment Deletion > > > Key: KAFKA-16073 > URL: https://issues.apache.org/jira/browse/KAFKA-16073 > Project: Kafka > Issue Type: Bug > Components: core, Tiered-Storage >Affects Versions: 3.6.1 >Reporter: hzh0425 >Assignee: hzh0425 >Priority: Major > Labels: KIP-405, kip-405, tiered-storage > Fix For: 3.6.1 > > > The identified bug in Apache Kafka's tiered storage feature involves a > delayed update of {{localLogStartOffset}} in the > {{UnifiedLog.deleteSegments}} method, impacting consumer fetch operations. > When segments are deleted from the log's memory state, the > {{localLogStartOffset}} isn't promptly updated. Concurrently, > {{ReplicaManager.handleOffsetOutOfRangeError}} checks if a consumer's fetch > offset is less than the {{localLogStartOffset}}. If it is greater, Kafka > sends an {{OffsetOutOfRangeException}} to the consumer, which is wrong in the following scenario. > In a specific concurrent scenario, imagine sequential offsets: > {{offset1 < offset2 < offset3}}. A client requests data at {{offset2}}. While a > background deletion process removes segments from memory, it hasn't yet > updated the {{localLogStartOffset}} from {{offset1}} to {{offset3}}. > Consequently, when the fetch offset ({{offset2}}) is evaluated against > the stale {{offset1}} in {{ReplicaManager.handleOffsetOutOfRangeError}}, > it incorrectly triggers an {{OffsetOutOfRangeException}}.
This issue > arises from the out-of-sync update of {{localLogStartOffset}}, leading to > incorrect handling of consumer fetch requests and potential data access > errors. -- This message was sent by Atlassian Jira (v8.20.10#820010)
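The window described in the report can be reproduced deterministically with a simplified model. This is a sketch that assumes, as the description implies, that a fetch offset below `localLogStartOffset` is routed to tiered storage and anything else reaching this error path becomes an `OffsetOutOfRangeException`. `LocalLogStartOffsetRace`, `classify` and `deleteSegments` are hypothetical names, not `ReplicaManager` or `UnifiedLog` code, and the concurrent fetch is simulated by a callback that runs between the two deletion steps.

```java
import java.util.concurrent.atomic.AtomicLong;

final class LocalLogStartOffsetRace {
    enum Outcome { SERVE_FROM_REMOTE, OFFSET_OUT_OF_RANGE }

    // Simplified version of the check in handleOffsetOutOfRangeError, as described in the report.
    static Outcome classify(long fetchOffset, long localLogStartOffset) {
        return fetchOffset < localLogStartOffset ? Outcome.SERVE_FROM_REMOTE : Outcome.OFFSET_OUT_OF_RANGE;
    }

    // Two-step deletion in the order the report describes: segments leave the in-memory
    // state first, and the new local start offset is published afterwards.
    // `concurrentFetch` stands in for a fetch that lands between the two steps.
    static void deleteSegments(AtomicLong localLogStartOffset, long newLocalLogStartOffset, Runnable concurrentFetch) {
        // Step 1: segments below newLocalLogStartOffset are removed (not modelled here).
        concurrentFetch.run();
        // Step 2: only now does the new start offset become visible to readers.
        localLogStartOffset.set(newLocalLogStartOffset);
    }
}
```

With offset1 = 100, offset2 = 200 and offset3 = 300, a fetch at 200 that runs between the two steps compares against the stale 100 and is classified as out of range; the same fetch after step 2 compares against 300 and is routed to tiered storage.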
[jira] [Comment Edited] (KAFKA-16073) Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion
[ https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17802095#comment-17802095 ] hzh0425 edited comment on KAFKA-16073 at 1/3/24 9:54 AM: - Yes, this is another good solution: localLogStartOffset will always be >= baseSegment.startOffset(). was (Author: JIRAUSER298236): Yes, this is another good solution: localLogStartOffset will always be >= baseSegment.baseOffset().
[jira] [Commented] (KAFKA-16073) Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion
[ https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17802095#comment-17802095 ] hzh0425 commented on KAFKA-16073: - Yes, this is another good solution: localLogStartOffset will always be >= baseSegment.baseOffset().
[jira] [Commented] (KAFKA-16046) Stream Stream Joins fail after restoration with deserialization exceptions
[ https://issues.apache.org/jira/browse/KAFKA-16046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17802093#comment-17802093 ] Stanislav Kozlovski commented on KAFKA-16046: - Resolving since it was merged. Thanks for the quick work! > Stream Stream Joins fail after restoration with deserialization exceptions > -- > > Key: KAFKA-16046 > URL: https://issues.apache.org/jira/browse/KAFKA-16046 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Almog Gavra >Assignee: Almog Gavra >Priority: Blocker > Labels: streams > Fix For: 3.7.0 > > > Before KIP-954, the `KStreamImplJoin` class would always create > non-timestamped persistent windowed stores. After that KIP, the default was > changed to create timestamped stores. This wasn't compatible because, during > restoration, timestamped stores have their changelog values transformed to > prepend the timestamp to the value. This caused deserialization errors when > trying to read from the store because the deserializers did not expect the > timestamp to be prepended.
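The incompatibility can be illustrated with the value layout of timestamped stores, assuming the 8-byte timestamp prefix that Kafka Streams' `ValueAndTimestampSerializer` writes ahead of the serialized value. `TimestampedValueSketch` is a hypothetical helper, not a Streams class; it shows that a deserializer expecting the plain value would see eight extra leading bytes.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

final class TimestampedValueSketch {
    // Prepends an 8-byte timestamp to the raw value bytes (assumed timestamped-store layout).
    static byte[] prependTimestamp(long timestamp, byte[] rawValue) {
        return ByteBuffer.allocate(Long.BYTES + rawValue.length)
                .putLong(timestamp)
                .put(rawValue)
                .array();
    }

    // Reads the timestamp back from the first 8 bytes.
    static long timestamp(byte[] timestampedValue) {
        return ByteBuffer.wrap(timestampedValue).getLong();
    }

    // Strips the 8-byte prefix; a reader must do this before handing the bytes
    // to a deserializer that expects only the plain value.
    static byte[] rawValue(byte[] timestampedValue) {
        return Arrays.copyOfRange(timestampedValue, Long.BYTES, timestampedValue.length);
    }
}
```

Decoding the output of `prependTimestamp(42L, "hello".getBytes(UTF_8))` directly as a UTF-8 string yields eight extra characters from the timestamp before `hello`, which is the kind of mismatch the report describes.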
[jira] [Resolved] (KAFKA-16046) Stream Stream Joins fail after restoration with deserialization exceptions
[ https://issues.apache.org/jira/browse/KAFKA-16046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski resolved KAFKA-16046. - Resolution: Fixed
Re: [PR] MINOR: Bump year to 2024 in NOTICE file [kafka]
stanislavkozlovski commented on code in PR #15111: URL: https://github.com/apache/kafka/pull/15111#discussion_r1440235165 ## NOTICE: ## @@ -20,4 +20,4 @@ clients/src/main/java/org/apache/kafka/common/utils/PureJavaCrc32C.java Some portions of this file Copyright (c) 2004-2006 Intel Corporation and licensed under the BSD license. This project contains the following code copied from Apache Hive: -streams/src/main/java/org/apache/kafka/streams/state/internals/Murmur3.java Review Comment: This was added automatically by `vi`. I figure we should leave the newline?
[PR] MINOR: Bump year to 2024 in NOTICE file [kafka]
stanislavkozlovski opened a new pull request, #15111: URL: https://github.com/apache/kafka/pull/15111 (no comment)
Re: [PR] KAFKA-14412: Decouple RocksDB access from CF [kafka]
lucasbru commented on PR #15105: URL: https://github.com/apache/kafka/pull/15105#issuecomment-1875033002 @nicktelford I'll rerun it. That being said, the last jobs on trunk have all finished within 3-5 hours, so this must be caused by either infrastructure or the code in this PR.
Re: [PR] KAFKA-15344: Commit leader epoch where possible [kafka]
lucasbru commented on PR #14454: URL: https://github.com/apache/kafka/pull/14454#issuecomment-1875029211 @mjsax Could you please take a look?
Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]
lucasbru commented on PR #15000: URL: https://github.com/apache/kafka/pull/15000#issuecomment-1875025923 @philipnee could you take a look please?
Re: [PR] KAFKA-14412: Decouple RocksDB access from CF [kafka]
nicktelford commented on PR #15105: URL: https://github.com/apache/kafka/pull/15105#issuecomment-1875025617 @lucasbru I think this is just the CI causing trouble again. The build passes locally. Is there a way to rerun the build without pushing more commits?
Re: [PR] KAFKA-16072: JUnit 5 extension to detect thread leak [kafka]
wernerdv commented on PR #15101: URL: https://github.com/apache/kafka/pull/15101#issuecomment-1875025332 @gharris1727 @ashwinpankaj I've updated the utility to check for leaked threads relative to threads created before the test. This looks more correct. It might be worth adding more names to the list of expected thread names.
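The approach described in that comment (snapshot the live threads before a test, then report new survivors whose names are not on an allow-list) can be sketched in plain Java. `ThreadLeakCheck` and the name-prefix allow-list are assumptions for illustration, not the PR's extension; in a JUnit 5 extension the two steps would typically sit in `beforeEach`/`afterEach` callbacks.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class ThreadLeakCheck {
    // Threads alive when the check was created; these are never reported.
    private final Set<Thread> threadsBefore;
    // Name prefixes of threads that are allowed to outlive a test.
    private final List<String> expectedNamePrefixes;

    ThreadLeakCheck(List<String> expectedNamePrefixes) {
        this.threadsBefore = new HashSet<>(Thread.getAllStackTraces().keySet());
        this.expectedNamePrefixes = List.copyOf(expectedNamePrefixes);
    }

    // Threads started after construction that are still alive and not on the allow-list.
    Set<Thread> leakedThreads() {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(thread -> !threadsBefore.contains(thread))
                .filter(Thread::isAlive)
                .filter(thread -> expectedNamePrefixes.stream()
                        .noneMatch(prefix -> thread.getName().startsWith(prefix)))
                .collect(Collectors.toSet());
    }
}
```

Comparing against the snapshot rather than against an absolute list means threads left over from earlier tests or started by the JVM are not blamed on the current test.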
Re: [PR] KAFKA-14412: Decouple RocksDB access from CF [kafka]
lucasbru commented on PR #15105: URL: https://github.com/apache/kafka/pull/15105#issuecomment-1875023389 @nicktelford Seems like all build jobs timed out. Could you take a look?
Re: [PR] KAFKA-14133: Migrate activeStateManager and standbyStateManager mocks in StoreChangelogReaderTest to Mockito [kafka]
lucasbru merged PR #15106: URL: https://github.com/apache/kafka/pull/15106