Re: [PR] MINOR: cleanup warnings in Kafka Streams code base [kafka]
ableegoldman commented on code in PR #14549:
URL: https://github.com/apache/kafka/pull/14549#discussion_r1359737490

## streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java:

@@ -87,12 +87,12 @@ public StreamsBuilder() {
      */
     @SuppressWarnings("this-escape")
     public StreamsBuilder(final TopologyConfig topologyConfigs) {
-        topology = getNewTopology(topologyConfigs);
+        topology = newTopology(topologyConfigs);
         internalTopologyBuilder = topology.internalTopologyBuilder;
         internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
     }

-    protected Topology getNewTopology(final TopologyConfig topologyConfigs) {

Review Comment:
   I mean... there are definitely some people using named topologies, though I've made sure they are aware it is not considered a public API and not intended for public use without a KIP. Anyways, given that this specific API is not even a public method, I think it's fair game to remove.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14684 Replace EasyMock and PowerMock with Mockito in WorkerSinkTaskThreadedTest [kafka]
hgeraldino commented on code in PR #14505: URL: https://github.com/apache/kafka/pull/14505#discussion_r1359700014 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java: ## @@ -409,309 +430,266 @@ public void testAssignmentPauseResume() throws Exception { } sinkTaskContext.getValue().pause(TOPIC_PARTITION, TOPIC_PARTITION2); return null; -}); -consumer.pause(Arrays.asList(UNASSIGNED_TOPIC_PARTITION)); -PowerMock.expectLastCall().andThrow(new IllegalStateException("unassigned topic partition")); -consumer.pause(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)); -PowerMock.expectLastCall(); - -expectOnePoll().andAnswer(() -> { +}).doAnswer(invocation -> { try { sinkTaskContext.getValue().resume(UNASSIGNED_TOPIC_PARTITION); fail("Trying to resume unassigned partition should have thrown an Connect exception"); } catch (ConnectException e) { // expected } - sinkTaskContext.getValue().resume(TOPIC_PARTITION, TOPIC_PARTITION2); return null; -}); -consumer.resume(Arrays.asList(UNASSIGNED_TOPIC_PARTITION)); -PowerMock.expectLastCall().andThrow(new IllegalStateException("unassigned topic partition")); -consumer.resume(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)); -PowerMock.expectLastCall(); +}).when(sinkTask).put(any(Collection.class)); -expectStopTask(); +doThrow(new IllegalStateException("unassigned topic partition")).when(consumer).pause(singletonList(UNASSIGNED_TOPIC_PARTITION)); +doAnswer(invocation -> null).when(consumer).pause(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)); -PowerMock.replayAll(); +doThrow(new IllegalStateException("unassigned topic partition")).when(consumer).resume(singletonList(UNASSIGNED_TOPIC_PARTITION)); +doAnswer(invocation -> null).when(consumer).resume(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)); workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); +verifyInitializeTask(); + workerTask.iteration(); +verifyInitialAssignment(); + workerTask.iteration(); 
workerTask.iteration(); workerTask.iteration(); workerTask.stop(); workerTask.close(); +verifyStopTask(); +verifyTaskGetTopic(3); -PowerMock.verifyAll(); +verify(consumer, atLeastOnce()).pause(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)); +verify(consumer, atLeastOnce()).resume(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)); } @Test -public void testRewind() throws Exception { -expectInitializeTask(); -expectTaskGetTopic(true); -expectPollInitialAssignment(); +public void testRewind() { +expectTaskGetTopic(); +expectInitialAssignment(); +expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); final long startOffset = 40L; final Map offsets = new HashMap<>(); -expectOnePoll().andAnswer(() -> { +doAnswer(invocation -> { +return null; // initial assignment +}).doAnswer(invocation -> { offsets.put(TOPIC_PARTITION, startOffset); sinkTaskContext.getValue().offset(offsets); return null; -}); - -consumer.seek(TOPIC_PARTITION, startOffset); -EasyMock.expectLastCall(); - -expectOnePoll().andAnswer(() -> { +}).doAnswer(invocation -> { Map offsets1 = sinkTaskContext.getValue().offsets(); assertEquals(0, offsets1.size()); return null; -}); - -expectStopTask(); -PowerMock.replayAll(); +}).when(sinkTask).put(any(Collection.class)); workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); +verifyInitializeTask(); + workerTask.iteration(); +verifyInitialAssignment(); + workerTask.iteration(); workerTask.iteration(); +verify(consumer).seek(TOPIC_PARTITION, startOffset); + workerTask.stop(); workerTask.close(); - -PowerMock.verifyAll(); +verifyStopTask(); +verifyTaskGetTopic(2); } @Test -public void testRewindOnRebalanceDuringPoll() throws Exception { -expectInitializeTask(); -expectTaskGetTopic(true); -expectPollInitialAssignment(); -expectConsumerAssignment(INITIAL_ASSIGNMENT).times(2); +public void testRewindOnRebalanceDuringPoll() { +final long startOffset = 40L; + +expectTaskGetTopic(); +expectInitialAssignment(); +expectRebalanceDuringPoll(startOffset); 
-expectRebalanceDuringPoll().andAnswer(() -> { +doA
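The EasyMock/PowerMock-to-Mockito conversion above relies on Mockito's consecutive stubbing: a chain like `doAnswer(...).doAnswer(...).when(sinkTask).put(any(Collection.class))` hands each successive call to the stub the next answer in the chain (and repeats the last one once the chain is exhausted), replacing EasyMock's `expectLastCall().andAnswer(...)` record/replay sequence. A minimal hand-rolled sketch of those semantics in plain Java (no Mockito dependency; `AnswerQueue` is a hypothetical name used only for illustration):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Emulates Mockito-style consecutive stubbing: each invocation consumes the
// next queued answer; once the queue drains, the last answer is reused.
final class AnswerQueue<T> {
    private final Deque<Supplier<T>> answers = new ArrayDeque<>();
    private Supplier<T> last;

    AnswerQueue<T> doAnswer(Supplier<T> answer) {
        answers.addLast(answer);
        last = answer;
        return this; // enables doAnswer(...).doAnswer(...) chaining
    }

    T next() {
        Supplier<T> a = answers.pollFirst();
        return (a != null ? a : last).get();
    }
}
```

This is why the converted test can script "first poll pauses, second poll resumes" as one chained stub instead of an ordered record/replay script.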
Re: [PR] MINOR: Fix `Consumed` to return new object instead of `this` [kafka]
guozhangwang commented on code in PR #14550: URL: https://github.com/apache/kafka/pull/14550#discussion_r1359692144 ## streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java: ## @@ -90,33 +104,43 @@ public static Branched withConsumer(final Consumer> c /** * Create an instance of {@code Branched} with provided chain function and branch name suffix. * - * @param chain A function that will be applied to the branch. If the provided function returns - * {@code null}, its result is ignored, otherwise it is added to the {@code Map} returned - * by {@link BranchedKStream#defaultBranch()} or {@link BranchedKStream#noDefaultBranch()} (see - * {@link BranchedKStream} description for details). - * @param name the branch name suffix to be used. If {@code null}, a default branch name suffix will be generated - * (see {@link BranchedKStream} description for details) - * @paramkey type - * @paramvalue type + * @param chain + *A function that will be applied to the branch. If the provided function returns + *{@code null}, its result is ignored, otherwise it is added to the {@code Map} returned + *by {@link BranchedKStream#defaultBranch()} or {@link BranchedKStream#noDefaultBranch()} (see + *{@link BranchedKStream} description for details). + * @param name + *the branch name suffix to be used. If {@code null}, a default branch name suffix will be generated + *(see {@link BranchedKStream} description for details) + * + * @param key type + * @param value type + * * @return a new instance of {@code Branched} */ public static Branched withFunction( -final Function, ? extends KStream> chain, final String name) { +final Function, ? extends KStream> chain, Review Comment: I just personally feel a newline with just an extra param feels not very elegant. Again just my own thoughts, feel free to ignore. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] MINOR: cleanup warnings in Kafka Streams code base [kafka]
mjsax commented on code in PR #14549:
URL: https://github.com/apache/kafka/pull/14549#discussion_r1359669572

## streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java:

@@ -87,12 +87,12 @@ public StreamsBuilder() {
      */
     @SuppressWarnings("this-escape")
     public StreamsBuilder(final TopologyConfig topologyConfigs) {
-        topology = getNewTopology(topologyConfigs);
+        topology = newTopology(topologyConfigs);
         internalTopologyBuilder = topology.internalTopologyBuilder;
         internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
     }

-    protected Topology getNewTopology(final TopologyConfig topologyConfigs) {

Review Comment:
   I could not find anything. Maybe @ableegoldman can comment?

   --

   Also no big deal to leave this one. In the end, we plan to remove this experimental feature in the future anyway.
Re: [PR] MINOR: Fix `Consumed` to return new object instead of `this` [kafka]
mjsax commented on code in PR #14550: URL: https://github.com/apache/kafka/pull/14550#discussion_r1359669078 ## streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java: ## @@ -90,33 +104,43 @@ public static Branched withConsumer(final Consumer> c /** * Create an instance of {@code Branched} with provided chain function and branch name suffix. * - * @param chain A function that will be applied to the branch. If the provided function returns - * {@code null}, its result is ignored, otherwise it is added to the {@code Map} returned - * by {@link BranchedKStream#defaultBranch()} or {@link BranchedKStream#noDefaultBranch()} (see - * {@link BranchedKStream} description for details). - * @param name the branch name suffix to be used. If {@code null}, a default branch name suffix will be generated - * (see {@link BranchedKStream} description for details) - * @paramkey type - * @paramvalue type + * @param chain + *A function that will be applied to the branch. If the provided function returns + *{@code null}, its result is ignored, otherwise it is added to the {@code Map} returned + *by {@link BranchedKStream#defaultBranch()} or {@link BranchedKStream#noDefaultBranch()} (see + *{@link BranchedKStream} description for details). + * @param name + *the branch name suffix to be used. If {@code null}, a default branch name suffix will be generated + *(see {@link BranchedKStream} description for details) + * + * @param key type + * @param value type + * * @return a new instance of {@code Branched} */ public static Branched withFunction( -final Function, ? extends KStream> chain, final String name) { +final Function, ? extends KStream> chain, Review Comment: Not necessary. It's my personal preferred style. Happy to change it back. Please let me know. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] MINOR: Fix `Consumed` to return new object instead of `this` [kafka]
mjsax commented on code in PR #14550:
URL: https://github.com/apache/kafka/pull/14550#discussion_r1359668981

## streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java:

@@ -27,15 +27,18 @@
  *
  * @param <K> key type
  * @param <V> value type
+ *
  * @see KStream#foreach(ForeachAction)
  */
 public interface ForeachAction<K, V> {
     /**
      * Perform an action for each record of a stream.
      *
-     * @param key the key of the record
-     * @param value the value of the record
+     * @param key

Review Comment:
   It's not necessary, but it makes JavaDocs easier to read in general -- and I like to have consistent formatting across the board -- I would like to establish this new formatting if nobody objects. Especially when there are many parameters with names of different lengths, it's cumbersome to get nice formatting -- adding the description on the next line avoids all those whitespaces (and if we rename a parameter, it becomes a single-line change and no reformatting is necessary).

## streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java:

@@ -16,12 +16,12 @@
  */
 package org.apache.kafka.streams.kstream;

-
 /**
  * The {@code Initializer} interface for creating an initial value in aggregations.
  * {@code Initializer} is used in combination with {@link Aggregator}.
  *
- * @param aggregate value type
+ * @param aggregate value type

Review Comment:
   Ups.
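The formatting mjsax is proposing -- each `@param` description on its own continuation line instead of aligned on the tag line -- might look like this on a `ForeachAction`-style interface (a sketch of the style under discussion, not the final committed Javadoc):

```java
/**
 * An action to perform on each record of a {@code KStream}.
 *
 * @param <K>
 *     key type
 * @param <V>
 *     value type
 */
@FunctionalInterface
interface ForeachAction<K, V> {
    /**
     * Perform an action for each record of a stream.
     *
     * @param key
     *     the key of the record
     * @param value
     *     the value of the record
     */
    void apply(final K key, final V value);
}
```

With this layout, parameter names of different lengths need no alignment whitespace, and renaming a parameter touches a single line.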
Re: [PR] MINOR: Fix `Consumed` to return new object instead of `this` [kafka]
mjsax commented on code in PR #14550: URL: https://github.com/apache/kafka/pull/14550#discussion_r1359668737 ## streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java: ## @@ -125,21 +149,14 @@ public static Branched withConsumer(final Consumer(name, null, chain); } -/** - * Create an instance of {@code Branched} from an existing instance. - * - * @param branched the instance of {@code Branched} to copy - */ -protected Branched(final Branched branched) { Review Comment: Yes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with "{statestore.cache}/{input.buffer}.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775325#comment-17775325 ] Matthias J. Sax commented on KAFKA-13152: - [~guozhang] would you have interest to help getting this into 3.7? It keeps slipping... :( > Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with > "{statestore.cache}/{input.buffer}.max.bytes" > - > > Key: KAFKA-13152 > URL: https://issues.apache.org/jira/browse/KAFKA-13152 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Sagar Rao >Priority: Major > Labels: kip > Fix For: 3.7.0 > > > The current config "buffered.records.per.partition" controls how many records > in maximum to bookkeep, and hence it is exceed we would pause fetching from > this partition. However this config has two issues: > * It's a per-partition config, so the total memory consumed is dependent on > the dynamic number of partitions assigned. > * Record size could vary from case to case. > And hence it's hard to bound the memory usage for this buffering. We should > consider deprecating that config with a global, e.g. "input.buffer.max.bytes" > which controls how much bytes in total is allowed to be buffered. This is > doable since we buffer the raw records in . -- This message was sent by Atlassian Jira (v8.20.10#820010)
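The replacement semantics proposed in KAFKA-13152 -- bound the total buffered bytes globally instead of counting records per partition -- can be sketched roughly as follows; the class and method names here are illustrative, not Kafka's actual implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: pause fetching when total buffered bytes across ALL assigned
// partitions exceed one global bound (the proposed "input.buffer.max.bytes"),
// rather than when any single partition exceeds a per-partition record count.
final class InputBuffer {
    private final long maxBytes;
    private final Map<String, Long> bytesByPartition = new HashMap<>();
    private long totalBytes = 0;

    InputBuffer(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    void add(String partition, long recordBytes) {
        bytesByPartition.merge(partition, recordBytes, Long::sum);
        totalBytes += recordBytes;
    }

    // Memory use is bounded regardless of partition count or record size skew,
    // which is exactly what the per-partition record count cannot guarantee.
    boolean shouldPauseFetching() {
        return totalBytes > maxBytes;
    }
}
```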
[jira] [Commented] (KAFKA-15594) Add 3.6.0 to streams upgrade/compatibility tests
[ https://issues.apache.org/jira/browse/KAFKA-15594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775323#comment-17775323 ]

Matthias J. Sax commented on KAFKA-15594:
-----------------------------------------

This ticket is to include upgrade tests from 3.6 to 3.7/trunk – can only be done after 3.6 is released – it's WIP.

> Add 3.6.0 to streams upgrade/compatibility tests
> ------------------------------------------------
>
>                 Key: KAFKA-15594
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15594
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams, system tests
>            Reporter: Satish Duggana
>            Priority: Major
>
[jira] [Commented] (KAFKA-15593) Add 3.6.0 to broker/client upgrade/compatibility tests
[ https://issues.apache.org/jira/browse/KAFKA-15593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775322#comment-17775322 ]

Matthias J. Sax commented on KAFKA-15593:
-----------------------------------------

This ticket is to include upgrade tests from 3.6 to 3.7/trunk – can only be done after 3.6 is released – it's WIP.

> Add 3.6.0 to broker/client upgrade/compatibility tests
> ------------------------------------------------------
>
>                 Key: KAFKA-15593
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15593
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Satish Duggana
>            Assignee: Satish Duggana
>            Priority: Major
>             Fix For: 3.7.0
>
[jira] [Commented] (KAFKA-15406) Add the ForwardingManager metrics from KIP-938
[ https://issues.apache.org/jira/browse/KAFKA-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775316#comment-17775316 ] Colin McCabe commented on KAFKA-15406: -- This feature is planned for 3.7, but not with high confidence right now. It might slip. We have to get JBOD in. > Add the ForwardingManager metrics from KIP-938 > -- > > Key: KAFKA-15406 > URL: https://issues.apache.org/jira/browse/KAFKA-15406 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.7.0 >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > Labels: newbie, newbie++ > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: cleanup warnings in Kafka Streams code base [kafka]
guozhangwang commented on code in PR #14549: URL: https://github.com/apache/kafka/pull/14549#discussion_r1359654735 ## streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java: ## @@ -87,12 +87,12 @@ public StreamsBuilder() { */ @SuppressWarnings("this-escape") public StreamsBuilder(final TopologyConfig topologyConfigs) { -topology = getNewTopology(topologyConfigs); +topology = newTopology(topologyConfigs); internalTopologyBuilder = topology.internalTopologyBuilder; internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder); } -protected Topology getNewTopology(final TopologyConfig topologyConfigs) { Review Comment: Have we double check that there's no callers outside KS code (like in other repos) of this function? Just double checking. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Fix `Consumed` to return new object instead of `this` [kafka]
guozhangwang commented on code in PR #14550: URL: https://github.com/apache/kafka/pull/14550#discussion_r1359652648 ## streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java: ## @@ -27,15 +27,18 @@ * * @param key type * @param value type + * * @see KStream#foreach(ForeachAction) */ public interface ForeachAction { /** * Perform an action for each record of a stream. * - * @param key the key of the record - * @param value the value of the record + * @param key Review Comment: Is this newline necessary? ## streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java: ## @@ -125,21 +149,14 @@ public static Branched withConsumer(final Consumer(name, null, chain); } -/** - * Create an instance of {@code Branched} from an existing instance. - * - * @param branched the instance of {@code Branched} to copy - */ -protected Branched(final Branched branched) { Review Comment: I'm assuming this is just for moving the func around, and removing the javadoc for those protected functions? ## streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java: ## @@ -57,29 +64,36 @@ public static Branched as(final String name) { /** * Create an instance of {@code Branched} with provided chain function. * - * @param chain A function that will be applied to the branch. If the provided function returns - * {@code null}, its result is ignored, otherwise it is added to the {@code Map} returned - * by {@link BranchedKStream#defaultBranch()} or {@link BranchedKStream#noDefaultBranch()} (see - * {@link BranchedKStream} description for details). - * @paramkey type - * @paramvalue type + * @param chain + *A function that will be applied to the branch. If the provided function returns + *{@code null}, its result is ignored, otherwise it is added to the {@code Map} returned + *by {@link BranchedKStream#defaultBranch()} or {@link BranchedKStream#noDefaultBranch()} (see + *{@link BranchedKStream} description for details). 
+ * + * @param key type + * @param value type + * * @return a new instance of {@code Branched} */ public static Branched withFunction( -final Function, ? extends KStream> chain) { +final Function, ? extends KStream> chain Review Comment: Ditto here? ## streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java: ## @@ -16,12 +16,12 @@ */ package org.apache.kafka.streams.kstream; - /** * The {@code Initializer} interface for creating an initial value in aggregations. * {@code Initializer} is used in combination with {@link Aggregator}. * - * @param aggregate value type + * @param aggregate value type Review Comment: Agg? :) ## streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java: ## @@ -90,33 +104,43 @@ public static Branched withConsumer(final Consumer> c /** * Create an instance of {@code Branched} with provided chain function and branch name suffix. * - * @param chain A function that will be applied to the branch. If the provided function returns - * {@code null}, its result is ignored, otherwise it is added to the {@code Map} returned - * by {@link BranchedKStream#defaultBranch()} or {@link BranchedKStream#noDefaultBranch()} (see - * {@link BranchedKStream} description for details). - * @param name the branch name suffix to be used. If {@code null}, a default branch name suffix will be generated - * (see {@link BranchedKStream} description for details) - * @paramkey type - * @paramvalue type + * @param chain + *A function that will be applied to the branch. If the provided function returns + *{@code null}, its result is ignored, otherwise it is added to the {@code Map} returned + *by {@link BranchedKStream#defaultBranch()} or {@link BranchedKStream#noDefaultBranch()} (see + *{@link BranchedKStream} description for details). + * @param name + *the branch name suffix to be used. 
If {@code null}, a default branch name suffix will be generated + *(see {@link BranchedKStream} description for details) + * + * @param key type + * @param value type + * * @return a new instance of {@code Branched} */ public static Branched withFunction( -final Function, ? extends KStream> chain, final String name) { +final Function, ? extends KStream> chain, Review Comment: Ditto here, I feel these newlines are not very necessary? -- This is an automated message from the Apache Git Service. To respond to the message,
[jira] [Assigned] (KAFKA-15147) Measure pending and outstanding Remote Segment operations
[ https://issues.apache.org/jira/browse/KAFKA-15147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya reassigned KAFKA-15147: Assignee: Christo Lolov (was: Divij Vaidya) > Measure pending and outstanding Remote Segment operations > - > > Key: KAFKA-15147 > URL: https://issues.apache.org/jira/browse/KAFKA-15147 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Jorge Esteban Quilcate Otoya >Assignee: Christo Lolov >Priority: Major > Labels: tiered-storage > Fix For: 3.7.0 > > > Remote Log Segment operations (copy/delete) are executed by the Remote > Storage Manager, and recorded by Remote Log Metadata Manager (e.g. default > TopicBasedRLMM writes to the internal Kafka topic state changes on remote log > segments). > As executions run, fail, and retry; it will be important to know how many > operations are pending and outstanding over time to alert operators. > Pending operations are not enough to alert, as values can oscillate closer to > zero. An additional condition needs to apply (running time > threshold) to > consider an operation outstanding. > Proposal: > RemoteLogManager could be extended with 2 concurrent maps > (pendingSegmentCopies, pendingSegmentDeletes) `Map[Uuid, Long]` to measure > segmentId time when operation started, and based on this expose 2 metrics per > operation: > * pendingSegmentCopies: gauge of pendingSegmentCopies map > * outstandingSegmentCopies: loop over pending ops, and if now - startedTime > > timeout, then outstanding++ (maybe on debug level?) > Is this a valuable metric to add to Tiered Storage? or better to solve on a > custom RLMM implementation? > Also, does it require a KIP? > Thanks! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15249) Verify Connect test-plugins artifact is published to Maven Central
[ https://issues.apache.org/jira/browse/KAFKA-15249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton resolved KAFKA-15249. --- Resolution: Done > Verify Connect test-plugins artifact is published to Maven Central > -- > > Key: KAFKA-15249 > URL: https://issues.apache.org/jira/browse/KAFKA-15249 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Affects Versions: 3.6.0 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Blocker > > In KAFKA-14759 we created a separate {{connect/test-plugins}} module to store > all testing-only Connect plugins and removed those plugins from existing > Connect modules. > These testing-only plugins are intentionally excluded from the project's > release file (which can be generated with {{{}./gradlew releaseTarGz{}}}) > however, some users may still be relying on them for testing environments. > Although we should refrain from distributing these testing-only plugins with > our out-of-the-box distribution of Connect, we should still ensure that > they're available on an opt-in basis to users who would like to continue > using them. This can be accomplished by publishing them to [Maven > Central|https://search.maven.org/], like we do with our other modules. > This will probably happen automatically during the next release (3.6.0) with > no further action required. This ticket is just here as a reminder to verify > that the artifacts are present in the staging Maven repo when release > candidates are published for voting. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15147) Measure pending and outstanding Remote Segment operations
[ https://issues.apache.org/jira/browse/KAFKA-15147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775304#comment-17775304 ] Stanislav Kozlovski commented on KAFKA-15147: - thanks for the quick update [~divijvaidya] - adding it to the freshly-released release plan > Measure pending and outstanding Remote Segment operations > - > > Key: KAFKA-15147 > URL: https://issues.apache.org/jira/browse/KAFKA-15147 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Jorge Esteban Quilcate Otoya >Assignee: Divij Vaidya >Priority: Major > Labels: tiered-storage > Fix For: 3.7.0 > > > Remote Log Segment operations (copy/delete) are executed by the Remote > Storage Manager, and recorded by Remote Log Metadata Manager (e.g. default > TopicBasedRLMM writes to the internal Kafka topic state changes on remote log > segments). > As executions run, fail, and retry; it will be important to know how many > operations are pending and outstanding over time to alert operators. > Pending operations are not enough to alert, as values can oscillate closer to > zero. An additional condition needs to apply (running time > threshold) to > consider an operation outstanding. > Proposal: > RemoteLogManager could be extended with 2 concurrent maps > (pendingSegmentCopies, pendingSegmentDeletes) `Map[Uuid, Long]` to measure > segmentId time when operation started, and based on this expose 2 metrics per > operation: > * pendingSegmentCopies: gauge of pendingSegmentCopies map > * outstandingSegmentCopies: loop over pending ops, and if now - startedTime > > timeout, then outstanding++ (maybe on debug level?) > Is this a valuable metric to add to Tiered Storage? or better to solve on a > custom RLMM implementation? > Also, does it require a KIP? > Thanks! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15147) Measure pending and outstanding Remote Segment operations
[ https://issues.apache.org/jira/browse/KAFKA-15147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775303#comment-17775303 ] Divij Vaidya commented on KAFKA-15147: -- Hi [~enether] We have a KIP for this Jira which is under discussion. See: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-963%3A+Upload+and+delete+lag+metrics+in+Tiered+Storage] The KIP is targeted to land in 3.7 as part of TS productional readiness. > Measure pending and outstanding Remote Segment operations > - > > Key: KAFKA-15147 > URL: https://issues.apache.org/jira/browse/KAFKA-15147 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Jorge Esteban Quilcate Otoya >Assignee: Divij Vaidya >Priority: Major > Labels: tiered-storage > Fix For: 3.7.0 > > > Remote Log Segment operations (copy/delete) are executed by the Remote > Storage Manager, and recorded by Remote Log Metadata Manager (e.g. default > TopicBasedRLMM writes to the internal Kafka topic state changes on remote log > segments). > As executions run, fail, and retry; it will be important to know how many > operations are pending and outstanding over time to alert operators. > Pending operations are not enough to alert, as values can oscillate closer to > zero. An additional condition needs to apply (running time > threshold) to > consider an operation outstanding. > Proposal: > RemoteLogManager could be extended with 2 concurrent maps > (pendingSegmentCopies, pendingSegmentDeletes) `Map[Uuid, Long]` to measure > segmentId time when operation started, and based on this expose 2 metrics per > operation: > * pendingSegmentCopies: gauge of pendingSegmentCopies map > * outstandingSegmentCopies: loop over pending ops, and if now - startedTime > > timeout, then outstanding++ (maybe on debug level?) > Is this a valuable metric to add to Tiered Storage? or better to solve on a > custom RLMM implementation? > Also, does it require a KIP? > Thanks! 
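The proposal in the ticket -- two maps from segment id to operation start time, with a timeout threshold separating "pending" from "outstanding" -- reduces to a small amount of bookkeeping. A rough sketch for the copy side (names are illustrative, not from the actual RemoteLogManager):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the proposed metrics: pending = copy operations currently in
// flight; outstanding = pending operations running longer than a timeout.
final class SegmentOpTracker {
    private final Map<String, Long> pendingCopyStartMs = new ConcurrentHashMap<>();

    void copyStarted(String segmentId, long nowMs) {
        pendingCopyStartMs.put(segmentId, nowMs);
    }

    void copyFinished(String segmentId) {
        pendingCopyStartMs.remove(segmentId);
    }

    // Gauge over the pending map, as proposed in the ticket.
    int pendingSegmentCopies() {
        return pendingCopyStartMs.size();
    }

    // "Loop over pending ops, and if now - startedTime > timeout, outstanding++"
    long outstandingSegmentCopies(long nowMs, long timeoutMs) {
        return pendingCopyStartMs.values().stream()
                .filter(startedMs -> nowMs - startedMs > timeoutMs)
                .count();
    }
}
```

A symmetric `pendingSegmentDeletes` map would cover the delete path.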
[jira] [Resolved] (KAFKA-14175) KRaft Upgrades Part 2
[ https://issues.apache.org/jira/browse/KAFKA-14175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski resolved KAFKA-14175. - Resolution: Won't Fix > KRaft Upgrades Part 2 > - > > Key: KAFKA-14175 > URL: https://issues.apache.org/jira/browse/KAFKA-14175 > Project: Kafka > Issue Type: New Feature >Reporter: David Arthur >Assignee: David Arthur >Priority: Major > > This is the parent issue for KIP-778 tasks which were not completed for the > 3.3 release. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14175) KRaft Upgrades Part 2
[ https://issues.apache.org/jira/browse/KAFKA-14175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-14175: Fix Version/s: (was: 3.7.0) > KRaft Upgrades Part 2 > - > > Key: KAFKA-14175 > URL: https://issues.apache.org/jira/browse/KAFKA-14175 > Project: Kafka > Issue Type: New Feature >Reporter: David Arthur >Assignee: David Arthur >Priority: Major > > This is the parent issue for KIP-778 tasks which were not completed for the > 3.3 release. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14175) KRaft Upgrades Part 2
[ https://issues.apache.org/jira/browse/KAFKA-14175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775301#comment-17775301 ] Stanislav Kozlovski commented on KAFKA-14175: - [~mumrah] confirmed to me that this seems to be a stale issue and can be closed as won't fix - doing that now > KRaft Upgrades Part 2 > - > > Key: KAFKA-14175 > URL: https://issues.apache.org/jira/browse/KAFKA-14175 > Project: Kafka > Issue Type: New Feature >Reporter: David Arthur >Assignee: David Arthur >Priority: Major > Fix For: 3.7.0 > > > This is the parent issue for KIP-778 tasks which were not completed for the > 3.3 release. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14945) Add Serializer#serializeToByteBuffer() to reduce memory copying
[ https://issues.apache.org/jira/browse/KAFKA-14945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775300#comment-17775300 ] Stanislav Kozlovski commented on KAFKA-14945: - [~LSK] given the lack of activity since July 23, do we plan to target releasing this in 3.7, or is it unclear at this point? > Add Serializer#serializeToByteBuffer() to reduce memory copying > --- > > Key: KAFKA-14945 > URL: https://issues.apache.org/jira/browse/KAFKA-14945 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: LinShunkang >Assignee: LinShunkang >Priority: Major > Labels: kip > Fix For: 3.7.0 > > > JIRA for KIP-872: > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] MINOR: Fix `Consumed` to return new object instead of `this` [kafka]
mjsax opened a new pull request, #14550: URL: https://github.com/apache/kafka/pull/14550 We embrace immutability and thus should return a new object instead of `this`, similar to other config classes we use in the DSL. Side JavaDoc cleanup for a bunch of classes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
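The immutability pattern this PR moves `Consumed` to can be sketched with a hypothetical config class (this is illustrative only, not the actual `Consumed` implementation): each modifier method copies the state into a new instance instead of mutating fields and returning `this`.

```java
// Illustrative sketch of the "return a new object instead of `this`" pattern:
// the class is immutable, so a shared instance can never be modified by accident.
final class ExampleConsumed {
    private final String name;
    private final boolean loggingEnabled;

    private ExampleConsumed(final String name, final boolean loggingEnabled) {
        this.name = name;
        this.loggingEnabled = loggingEnabled;
    }

    static ExampleConsumed as(final String name) {
        return new ExampleConsumed(name, false);
    }

    // Returns a new object; the receiver is left unchanged.
    ExampleConsumed withLogging(final boolean loggingEnabled) {
        return new ExampleConsumed(name, loggingEnabled);
    }

    boolean loggingEnabled() {
        return loggingEnabled;
    }
}
```

With a mutable `return this` style, two call sites sharing one instance could silently overwrite each other's settings; the copy-on-modify style makes that impossible.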
[jira] [Commented] (KAFKA-15230) ApiVersions data between controllers is not reliable
[ https://issues.apache.org/jira/browse/KAFKA-15230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775299#comment-17775299 ] Stanislav Kozlovski commented on KAFKA-15230: - Did it release with 3.6.0? > ApiVersions data between controllers is not reliable > > > Key: KAFKA-15230 > URL: https://issues.apache.org/jira/browse/KAFKA-15230 > Project: Kafka > Issue Type: Bug >Reporter: David Arthur >Assignee: Colin McCabe >Priority: Critical > Fix For: 3.7.0 > > > While testing ZK migrations, I noticed a case where the controller was not > starting the migration due to the missing ApiVersions data from other > controllers. This was unexpected because the quorum was running and the > followers were replicating the metadata log as expected. After examining a > heap dump of the leader, it was in fact the case that the ApiVersions map of > NodeApiVersions was empty. > > After further investigation and offline discussion with [~jsancio], we > realized that after the initial leader election, the connection from the Raft > leader to the followers will become idle and eventually timeout and close. > This causes NetworkClient to purge the NodeApiVersions data for the closed > connections. > > There are two main side effects of this behavior: > 1) If migrations are not started within the idle timeout period (10 minutes, > by default), then they will not be able to be started. After this timeout > period, I was unable to restart the controllers in such a way that the leader > had active connections with all followers. > 2) Dynamically updating features, such as "metadata.version", is not > guaranteed to be safe > > There is a partial workaround for the migration issue. If we set " > connections.max.idle.ms" to -1, the Raft leader will never disconnect from > the followers. However, if a follower restarts, the leader will not > re-establish a connection. > > The feature update issue has no safe workarounds. 
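The partial workaround described in the report above corresponds to a broker configuration override along the lines of the following; the value comes straight from the issue text, and the report's own caveat applies (a restarted follower is still not re-contacted by the leader):

```properties
# Partial workaround from the issue above: never close idle connections, so the
# Raft leader does not purge ApiVersions data for its followers after the idle
# timeout (10 minutes by default).
connections.max.idle.ms=-1
```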
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-10892) Add Topology#connectSourceStoreAndTopic as a public method
[ https://issues.apache.org/jira/browse/KAFKA-10892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-10892: Fix Version/s: (was: 3.7.0) > Add Topology#connectSourceStoreAndTopic as a public method > -- > > Key: KAFKA-10892 > URL: https://issues.apache.org/jira/browse/KAFKA-10892 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Tomohiro Hashidate >Assignee: Daan Gerits >Priority: Major > Labels: kip > > I want Topology#connectSourceStoreAndTopic. > Because I want to use a topic as a source topic directly without a redundant > changelog topic for not only KeyValueStore but also WindowStore. > This issue is similar to [KAFKA-6840], but is a suggestion for a more general > approach > {code:java} > public synchronized Topology connectSourceStoreAndTopic(final String > sourceStoreName, > final String > topic) { > internalTopologyBuilder.connectSourceStoreAndTopic(sourceStoreName, > topic); > return this; > } > {code} > h3. Background > I want to use a topic as a source topic for WindowStore because using > WindowStore is suitable for the feature that I'm implementing. > The records stored in the topic are aggregated with a time window by another > application. The size of the topic is over 10TB. > I want to use the topic as a source topic for WindowStore directly. > But, I cannot do so on the current interface. > I need a redundant topic only for storing the records into WindowStore. > If this API is public, I can use topics incoming from other applications (not > only Kafka Streams applications) as source topics for any StateStore > implementation without redundant changelog topics. > Of course, I need to implement a processor for storing incoming records in > such a case. > But I think it's not difficult. > Please consider this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-10892) Add Topology#connectSourceStoreAndTopic as a public method
[ https://issues.apache.org/jira/browse/KAFKA-10892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775298#comment-17775298 ] Stanislav Kozlovski commented on KAFKA-10892: - Given the lack of activity on [https://github.com/apache/kafka/pull/12742,] I won't include this in the initial AK 3.7 release plan and I'm removing the tag for now. Please let me know if you will target / believe we can get this done for 3.7 [~calmera] and we can re-add. Until then, let's keep it without a target release until we have high confidence we will hit it. Otherwise we end up bumping it on every release. > Add Topology#connectSourceStoreAndTopic as a public method > -- > > Key: KAFKA-10892 > URL: https://issues.apache.org/jira/browse/KAFKA-10892 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Tomohiro Hashidate >Assignee: Daan Gerits >Priority: Major > Labels: kip > Fix For: 3.7.0 > > > I want Topology#connectSourceStoreAndTopic. > Because I want to use a topic as a source topic directly without a redundant > changelog topic for not only KeyValueStore but also WindowStore. > This issue is similar to [KAFKA-6840], but is a suggestion for a more general > approach > {code:java} > public synchronized Topology connectSourceStoreAndTopic(final String > sourceStoreName, > final String > topic) { > internalTopologyBuilder.connectSourceStoreAndTopic(sourceStoreName, > topic); > return this; > } > {code} > h3. Background > I want to use a topic as a source topic for WindowStore because using > WindowStore is suitable for the feature that I'm implementing. > The records stored in the topic are aggregated with a time window by another > application. The size of the topic is over 10TB. > I want to use the topic as a source topic for WindowStore directly. > But, I cannot do so on the current interface. > I need a redundant topic only for storing the records into WindowStore. 
> If this API is public, I can use topics incoming from other applications (not > only Kafka Streams applications) as source topics for any StateStore > implementation without redundant changelog topics. > Of course, I need to implement a processor for storing incoming records in > such a case. > But I think it's not difficult. > Please consider this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-12473) Make the "cooperative-sticky, range" as the default assignor
[ https://issues.apache.org/jira/browse/KAFKA-12473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775296#comment-17775296 ] Stanislav Kozlovski commented on KAFKA-12473: - [~dajac] [~showuon] shall we update the KIP in that case? > Make the "cooperative-sticky, range" as the default assignor > > > Key: KAFKA-12473 > URL: https://issues.apache.org/jira/browse/KAFKA-12473 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: A. Sophie Blee-Goldman >Priority: Critical > Labels: kip > > Now that 3.0 is coming up, we can change the default > ConsumerPartitionAssignor to something better than the RangeAssignor. The > original plan was to switch over to the StickyAssignor, but now that we have > incremental cooperative rebalancing we should consider using the new > CooperativeStickyAssignor instead: this will enable the consumer group to > follow the COOPERATIVE protocol, improving the rebalancing experience OOTB. > KIP: > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177048248 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15147) Measure pending and outstanding Remote Segment operations
[ https://issues.apache.org/jira/browse/KAFKA-15147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775295#comment-17775295 ] Stanislav Kozlovski commented on KAFKA-15147: - [~divijvaidya] given that there is no kip, is it realistic to still target AK 3.7? Perhaps we should remove the Fix Version until we have confidence in the release, otherwise it just ends up getting bumped each release (as we've seen in many other unrelated jiras) > Measure pending and outstanding Remote Segment operations > - > > Key: KAFKA-15147 > URL: https://issues.apache.org/jira/browse/KAFKA-15147 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Jorge Esteban Quilcate Otoya >Assignee: Divij Vaidya >Priority: Major > Labels: tiered-storage > Fix For: 3.7.0 > > > Remote Log Segment operations (copy/delete) are executed by the Remote > Storage Manager, and recorded by Remote Log Metadata Manager (e.g. default > TopicBasedRLMM writes to the internal Kafka topic state changes on remote log > segments). > As executions run, fail, and retry; it will be important to know how many > operations are pending and outstanding over time to alert operators. > Pending operations are not enough to alert, as values can oscillate closer to > zero. An additional condition needs to apply (running time > threshold) to > consider an operation outstanding. > Proposal: > RemoteLogManager could be extended with 2 concurrent maps > (pendingSegmentCopies, pendingSegmentDeletes) `Map[Uuid, Long]` to measure > segmentId time when operation started, and based on this expose 2 metrics per > operation: > * pendingSegmentCopies: gauge of pendingSegmentCopies map > * outstandingSegmentCopies: loop over pending ops, and if now - startedTime > > timeout, then outstanding++ (maybe on debug level?) > Is this a valuable metric to add to Tiered Storage? or better to solve on a > custom RLMM implementation? > Also, does it require a KIP? > Thanks! 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14175) KRaft Upgrades Part 2
[ https://issues.apache.org/jira/browse/KAFKA-14175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775294#comment-17775294 ] Stanislav Kozlovski commented on KAFKA-14175: - given the lack of activity in the last few releases, do we expect this to land in 3.7? It would be good to avoid having the Fix Version until we have confidence in it, as we just seem to bump it each release > KRaft Upgrades Part 2 > - > > Key: KAFKA-14175 > URL: https://issues.apache.org/jira/browse/KAFKA-14175 > Project: Kafka > Issue Type: New Feature >Reporter: David Arthur >Assignee: David Arthur >Priority: Major > Fix For: 3.7.0 > > > This is the parent issue for KIP-778 tasks which were not completed for the > 3.3 release. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15594) Add 3.6.0 to streams upgrade/compatibility tests
[ https://issues.apache.org/jira/browse/KAFKA-15594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775293#comment-17775293 ] Stanislav Kozlovski commented on KAFKA-15594: - [~satish.duggana] is anyone going to be assigned to this? Did we release 3.6 without it? > Add 3.6.0 to streams upgrade/compatibility tests > > > Key: KAFKA-15594 > URL: https://issues.apache.org/jira/browse/KAFKA-15594 > Project: Kafka > Issue Type: Sub-task > Components: streams, system tests >Reporter: Satish Duggana >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-12399) Deprecate Log4J Appender KIP-719
[ https://issues.apache.org/jira/browse/KAFKA-12399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-12399: Fix Version/s: (was: 3.7.0) > Deprecate Log4J Appender KIP-719 > > > Key: KAFKA-12399 > URL: https://issues.apache.org/jira/browse/KAFKA-12399 > Project: Kafka > Issue Type: Improvement > Components: logging >Reporter: Dongjin Lee >Assignee: Dongjin Lee >Priority: Major > Labels: needs-kip > > As a following job of KAFKA-9366, we have to entirely remove the log4j 1.2.7 > dependency from the classpath by removing dependencies on log4j-appender. > KIP-719: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-719%3A+Deprecate+Log4J+Appender -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-12399) Deprecate Log4J Appender KIP-719
[ https://issues.apache.org/jira/browse/KAFKA-12399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775292#comment-17775292 ] Stanislav Kozlovski commented on KAFKA-12399: - This was last reviewed on March 7 and there has been no progress since, as far as I can tell. I will remove the target release version to avoid having to bump it each release (like we did in the last 3 releases). Please feel free to put it back if we are actively targeting any specific release > Deprecate Log4J Appender KIP-719 > > > Key: KAFKA-12399 > URL: https://issues.apache.org/jira/browse/KAFKA-12399 > Project: Kafka > Issue Type: Improvement > Components: logging >Reporter: Dongjin Lee >Assignee: Dongjin Lee >Priority: Major > Labels: needs-kip > Fix For: 3.7.0 > > > As a following job of KAFKA-9366, we have to entirely remove the log4j 1.2.7 > dependency from the classpath by removing dependencies on log4j-appender. > KIP-719: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-719%3A+Deprecate+Log4J+Appender -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with "{statestore.cache}/{input.buffer}.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775291#comment-17775291 ] Stanislav Kozlovski commented on KAFKA-13152: - Given there hasn't been much movement in this PR since February 2023, are we targeting 3.7 as it's currently marked? [~sagarrao] as far as I understand, you're blocked on PR reviews - is that correct? > Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with > "{statestore.cache}/{input.buffer}.max.bytes" > - > > Key: KAFKA-13152 > URL: https://issues.apache.org/jira/browse/KAFKA-13152 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Sagar Rao >Priority: Major > Labels: kip > Fix For: 3.7.0 > > > The current config "buffered.records.per.partition" controls how many records > in maximum to bookkeep, and hence it is exceed we would pause fetching from > this partition. However this config has two issues: > * It's a per-partition config, so the total memory consumed is dependent on > the dynamic number of partitions assigned. > * Record size could vary from case to case. > And hence it's hard to bound the memory usage for this buffering. We should > consider deprecating that config with a global, e.g. "input.buffer.max.bytes" > which controls how much bytes in total is allowed to be buffered. This is > doable since we buffer the raw records in . -- This message was sent by Atlassian Jira (v8.20.10#820010)
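The core argument in the issue above — that a per-partition record count cannot bound memory, while a global byte budget can — can be illustrated with a minimal sketch (illustrative only, not the Kafka Streams implementation):

```java
// Hypothetical sketch of the "input.buffer.max.bytes" idea: bound buffering by
// total bytes rather than by records per partition, so memory use no longer
// depends on how many partitions are assigned or how large each record is.
class ByteBoundedInputBuffer {
    private final long maxBytes;
    private long bufferedBytes = 0L;

    ByteBoundedInputBuffer(final long maxBytes) {
        this.maxBytes = maxBytes;
    }

    void recordBuffered(final long sizeBytes) {
        bufferedBytes += sizeBytes;
    }

    void recordProcessed(final long sizeBytes) {
        bufferedBytes -= sizeBytes;
    }

    // Pause fetching once the global byte budget is exceeded, regardless of
    // which partitions contributed to it.
    boolean shouldPauseFetching() {
        return bufferedBytes > maxBytes;
    }
}
```

Under the old record-count scheme, doubling the partition count or the record size doubles worst-case memory; under a byte budget, the bound stays fixed.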
[jira] [Commented] (KAFKA-15593) Add 3.6.0 to broker/client upgrade/compatibility tests
[ https://issues.apache.org/jira/browse/KAFKA-15593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775286#comment-17775286 ] Stanislav Kozlovski commented on KAFKA-15593: - [~satish.duggana] is this good to close? seems weird to target 3.7 with 3.6 compatibility tests. AFAICT, the PRs are merged > Add 3.6.0 to broker/client upgrade/compatibility tests > -- > > Key: KAFKA-15593 > URL: https://issues.apache.org/jira/browse/KAFKA-15593 > Project: Kafka > Issue Type: Sub-task >Reporter: Satish Duggana >Assignee: Satish Duggana >Priority: Major > Fix For: 3.7.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14077) KRaft should support recovery from failed disk
[ https://issues.apache.org/jira/browse/KAFKA-14077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-14077: Fix Version/s: (was: 3.7.0) > KRaft should support recovery from failed disk > -- > > Key: KAFKA-14077 > URL: https://issues.apache.org/jira/browse/KAFKA-14077 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 2.8.0 >Reporter: Jason Gustafson >Assignee: Jose Armando Garcia Sancio >Priority: Blocker > > If one of the nodes in the metadata quorum has a disk failure, there is no > way currently to safely bring the node back into the quorum. When we lose > disk state, we are at risk of losing committed data even if the failure only > affects a minority of the cluster. > Here is an example. Suppose that a metadata quorum has 3 members: v1, v2, and > v3. Initially, v1 is the leader and writes a record at offset 1. After v2 > acknowledges replication of the record, it becomes committed. Suppose that v1 > fails before v3 has a chance to replicate this record. As long as v1 remains > down, the raft protocol guarantees that only v2 can become leader, so the > record cannot be lost. The raft protocol expects that when v1 returns, it > will still have that record, but what if there is a disk failure, the state > cannot be recovered and v1 participates in leader election? Then we would > have committed data on a minority of the voters. The main problem here > concerns how we recover from this impaired state without risking the loss of > this data. > Consider a naive solution which brings v1 back with an empty disk. Since the > node has lost its prior knowledge of the state of the quorum, it will vote for > any candidate that comes along. If v3 becomes a candidate, then it will vote > for itself and it just needs the vote from v1 to become leader. If that > happens, then the committed data on v2 will become lost. > This is just one scenario. 
In general, the invariants that the raft protocol > is designed to preserve go out the window when disk state is lost. For > example, it is also possible to contrive a scenario where the loss of disk > state leads to multiple leaders. There is a good reason why raft requires > that any vote cast by a voter is written to disk since otherwise the voter > may vote for different candidates in the same epoch. > Many systems solve this problem with a unique identifier which is generated > automatically and stored on disk. This identifier is then committed to the > raft log. If a disk changes, we would see a new identifier and we can prevent > the node from breaking raft invariants. Then recovery from a failed disk > requires a quorum reconfiguration. We need something like this in KRaft to > make disk recovery possible. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14077) KRaft should support recovery from failed disk
[ https://issues.apache.org/jira/browse/KAFKA-14077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775285#comment-17775285 ] Stanislav Kozlovski commented on KAFKA-14077: - I asked [~jagsancio] and [~hachikuji] about what we should target. As there hasn't been much movement on this KIP from what I can tell in the discussion thread since 2022, I will remove the Fix Version to avoid having to bump it every release > KRaft should support recovery from failed disk > -- > > Key: KAFKA-14077 > URL: https://issues.apache.org/jira/browse/KAFKA-14077 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 2.8.0 >Reporter: Jason Gustafson >Assignee: Jose Armando Garcia Sancio >Priority: Blocker > Fix For: 3.7.0 > > > If one of the nodes in the metadata quorum has a disk failure, there is no > way currently to safely bring the node back into the quorum. When we lose > disk state, we are at risk of losing committed data even if the failure only > affects a minority of the cluster. > Here is an example. Suppose that a metadata quorum has 3 members: v1, v2, and > v3. Initially, v1 is the leader and writes a record at offset 1. After v2 > acknowledges replication of the record, it becomes committed. Suppose that v1 > fails before v3 has a chance to replicate this record. As long as v1 remains > down, the raft protocol guarantees that only v2 can become leader, so the > record cannot be lost. The raft protocol expects that when v1 returns, it > will still have that record, but what if there is a disk failure, the state > cannot be recovered and v1 participates in leader election? Then we would > have committed data on a minority of the voters. The main problem here > concerns how we recover from this impaired state without risking the loss of > this data. > Consider a naive solution which brings v1 back with an empty disk. 
Since the > node has lost its prior knowledge of the state of the quorum, it will vote for > any candidate that comes along. If v3 becomes a candidate, then it will vote > for itself and it just needs the vote from v1 to become leader. If that > happens, then the committed data on v2 will become lost. > This is just one scenario. In general, the invariants that the raft protocol > is designed to preserve go out the window when disk state is lost. For > example, it is also possible to contrive a scenario where the loss of disk > state leads to multiple leaders. There is a good reason why raft requires > that any vote cast by a voter is written to disk since otherwise the voter > may vote for different candidates in the same epoch. > Many systems solve this problem with a unique identifier which is generated > automatically and stored on disk. This identifier is then committed to the > raft log. If a disk changes, we would see a new identifier and we can prevent > the node from breaking raft invariants. Then recovery from a failed disk > requires a quorum reconfiguration. We need something like this in KRaft to > make disk recovery possible. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-12886) Enable request forwarding by default
[ https://issues.apache.org/jira/browse/KAFKA-12886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775284#comment-17775284 ] Stanislav Kozlovski commented on KAFKA-12886: - [~rdielhenn] is this targeting 3.7? I wonder if we should target a release at all since it seems like there hasn't been work on this PR for 2 years. I will remove the target Fix Version given that. Please let me know if I'm wrong > Enable request forwarding by default > > > Key: KAFKA-12886 > URL: https://issues.apache.org/jira/browse/KAFKA-12886 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Blocker > Fix For: 3.7.0 > > > KIP-590 documents that request forwarding will be enabled in 3.0 by default: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller. > This makes it a requirement for users with custom principal implementations > to provide a `KafkaPrincipalSerde` implementation. We waited until 3.0 > because we saw this as a compatibility break. > The KIP documents that use of forwarding will be controlled by the IBP. So > once the IBP has been configured to 3.0 or above, then the brokers will begin > forwarding. > (Note that forwarding has always been a requirement for kraft.) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-12886) Enable request forwarding by default
[ https://issues.apache.org/jira/browse/KAFKA-12886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-12886: Fix Version/s: (was: 3.7.0) > Enable request forwarding by default > > > Key: KAFKA-12886 > URL: https://issues.apache.org/jira/browse/KAFKA-12886 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Blocker > > KIP-590 documents that request forwarding will be enabled in 3.0 by default: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller. > This makes it a requirement for users with custom principal implementations > to provide a `KafkaPrincipalSerde` implementation. We waited until 3.0 > because we saw this as a compatibility break. > The KIP documents that use of forwarding will be controlled by the IBP. So > once the IBP has been configured to 3.0 or above, then the brokers will begin > forwarding. > (Note that forwarding has always been a requirement for kraft.) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15406) Add the ForwardingManager metrics from KIP-938
[ https://issues.apache.org/jira/browse/KAFKA-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775275#comment-17775275 ] Stanislav Kozlovski commented on KAFKA-15406: - [~cmccabe] given that we are targeting this for 3.7 as per your comment in KAFKA-15183, should we set the Fix Version to 3.7? I ack we usually seem to set this when we merge, but the release plan doc has a filter that only looks at Fix Version - so it's a useful way to know what'll be part of the release > Add the ForwardingManager metrics from KIP-938 > -- > > Key: KAFKA-15406 > URL: https://issues.apache.org/jira/browse/KAFKA-15406 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.7.0 >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > Labels: newbie, newbie++ > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] MINOR: cleanup warnings in Kafka Streams code base [kafka]
mjsax opened a new pull request, #14549: URL: https://github.com/apache/kafka/pull/14549 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] POC - DO NO MERGE: refactor StreamsConfig [kafka]
mjsax commented on code in PR #14548: URL: https://github.com/apache/kafka/pull/14548#discussion_r1359477497 ## build.gradle: ## @@ -2151,7 +2151,7 @@ project(':streams') { task genStreamsConfigDocs(type: JavaExec) { classpath = sourceSets.main.runtimeClasspath -mainClass = 'org.apache.kafka.streams.StreamsConfig' +mainClass = 'org.apache.kafka.streams.internals.InternalStreamsConfig' Review Comment: We use `main` to generate the config HTML for the webpage, so update it to use the internal one going forward ## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ## @@ -843,7 +843,7 @@ public KafkaStreams(final Topology topology, public KafkaStreams(final Topology topology, final StreamsConfig applicationConfigs, final KafkaClientSupplier clientSupplier) { -this(new TopologyMetadata(topology.internalTopologyBuilder, applicationConfigs), applicationConfigs, clientSupplier); +this(topology, new InternalStreamsConfig(applicationConfigs), clientSupplier, Time.SYSTEM); Review Comment: Some side cleanup on constructor methods -- no public change -- just to make the flow simpler ## streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java: ## @@ -1089,34 +1091,30 @@ public void shouldTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsDebug() { @Test public void shouldGetClientSupplierFromConfigForConstructor() { Review Comment: This and the next test needed an update, because the refactoring broke the mocking -- cf. the change in `MockClientSupplier`, too. 
## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -1171,72 +1170,6 @@ public class StreamsConfig extends AbstractConfig { CONSUMER_EOS_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides); } -public static class InternalConfig { Review Comment: Even if this class is public, it's called `InternalConfig` and thus clearly internal -- I believe it's ok to move it to the new sub-class without deprecation (as a matter of fact, we won't need this helper class any longer, and I only moved the members of this class to `InternalStreamsConfig`). ## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ## @@ -790,7 +790,7 @@ public KafkaStreams(final Topology topology, public KafkaStreams(final Topology topology, final Properties props, final Time time) { -this(topology, new StreamsConfig(props), time); +this(topology, new InternalStreamsConfig(props), null, time); Review Comment: Some side cleanup on constructor methods -- no public change -- just to make the flow simpler ## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ## @@ -860,32 +860,37 @@ public KafkaStreams(final Topology topology, public KafkaStreams(final Topology topology, final StreamsConfig applicationConfigs, final Time time) { -this(new TopologyMetadata(topology.internalTopologyBuilder, applicationConfigs), applicationConfigs, applicationConfigs.getKafkaClientSupplier(), time); +this(topology, new InternalStreamsConfig(applicationConfigs), null, time); Review Comment: Some side cleanup on constructor methods -- no public change -- just to make the flow simpler ## streams/src/main/java/org/apache/kafka/streams/internals/InternalStreamsConfig.java: ## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.KafkaClientSupplier; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.DeserializationExceptionHandler; +import org.apache.kafka.streams.errors.ProductionExceptionHandler; +import org.apache.kafka.streams.proces
[PR] POC - DO NOT MERGE: refactor StreamsConfig [kafka]
mjsax opened a new pull request, #14548: URL: https://github.com/apache/kafka/pull/14548 This is a POC to refactor `StreamsConfig` to move "leaking" internal methods into a newly added subclass `InternalStreamsConfig`, following an already established pattern used for DSL config objects (like `Consumed`, `Produced`, `Materialized` etc). This change requires a KIP. We would deprecate the following methods on `StreamsConfig` to be able to remove them w/o replacement, as we should consider them internal: - `configDef` - `getMainConsumerConfig` - `getRestoreConsumerConfig` - `getGlobalConsumerConfig` - `getProducerConfig` - `getAdminConfig` - `getClientTag` - `getKafkaClientSupplier` - `defaultKeySerde` - `defaultValueSerde` - `defaultTimestampExtractor` - `defaultDeserializationExceptionHandler` - `defaultProductionExceptionHandler` - `main` In addition, this PR changes all internal usage from `StreamsConfig` to `InternalStreamsConfig`. If we believe this is a reasonable change, I can prepare a KIP for it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
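The pattern the PR describes -- keeping the public config class clean and moving internal accessors to a subclass -- can be sketched roughly as follows. The class and method names below are illustrative stand-ins, not the real Kafka Streams API:

```java
import java.util.Map;

// Sketch of the refactoring pattern: the public config class exposes only the
// intended public surface, while formerly "leaking" accessors move to an
// internal subclass that internal code constructs for itself.
class PublicConfig {
    protected final Map<String, Object> props;

    PublicConfig(final Map<String, Object> props) {
        this.props = props;
    }
    // ... only truly public accessors live here ...
}

class InternalConfigSketch extends PublicConfig {
    InternalConfigSketch(final Map<String, Object> props) {
        super(props);
    }

    // Formerly "leaking" accessor, now reachable only via the internal subclass.
    Object internalOnly(final String key) {
        return props.get(key);
    }
}
```

Internal call sites would then wrap the user-supplied config in the internal subclass (as the `new InternalStreamsConfig(applicationConfigs)` constructor changes above do), so the public class never needs to expose those methods.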
[jira] [Created] (KAFKA-15607) Possible NPE is thrown in MirrorCheckpointTask
hudeqi created KAFKA-15607: -- Summary: Possible NPE is thrown in MirrorCheckpointTask Key: KAFKA-15607 URL: https://issues.apache.org/jira/browse/KAFKA-15607 Project: Kafka Issue Type: Bug Components: mirrormaker Affects Versions: 2.8.1 Reporter: hudeqi Assignee: hudeqi In the `syncGroupOffset` method, if `targetConsumerOffset.get(topicPartition)` returns null, the calculation of `latestDownstreamOffset` will throw an NPE. This usually occurs in the following situation: a group previously consumed a topic in the target cluster; later, the group offsets of some partitions were reset to -1, so the `OffsetAndMetadata` of these partitions is null. -- This message was sent by Atlassian Jira (v8.20.10#820010)
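The shape of the fix is a null guard before the offset is used. A minimal sketch, with simplified stand-in types instead of the real `TopicPartition`/`OffsetAndMetadata` (the method name `latestDownstreamOffset` is used here only for illustration):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// When a group's offset was reset to -1, the stored metadata is null, and an
// unguarded Map.get(...) dereference throws a NullPointerException. Returning
// an empty OptionalLong lets the caller skip such partitions instead.
class CheckpointSketch {
    static OptionalLong latestDownstreamOffset(final Map<String, Long> targetConsumerOffsets,
                                               final String topicPartition) {
        final Long offset = targetConsumerOffsets.get(topicPartition); // may be null
        if (offset == null) {
            return OptionalLong.empty(); // skip the partition instead of throwing an NPE
        }
        return OptionalLong.of(offset);
    }
}
```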
Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]
hudeqi commented on code in PR #14511: URL: https://github.com/apache/kafka/pull/14511#discussion_r1359460675 ## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ## @@ -561,6 +561,108 @@ class RemoteIndexCacheTest { assertTrue(cache.internalCache().estimatedSize() == 0) } + @Test + def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = { + +def getIndexFileFromRemoteCacheDir(suffix: String) = { + try { +Files.walk(cache.cacheDir().toPath()) + .filter(Files.isRegularFile(_)) + .filter(path => path.getFileName.toString.endsWith(suffix)) + .findAny() + } catch { +case _: NoSuchFileException => Optional.empty() + } +} + +// The test process for resizing is: put 1 entry -> evict to empty -> put 3 entries with limited capacity of 2 entries -> evict to 1 entry +val estimateEntryBytesSize = estimateOneEntryBytesSize() +val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)) +val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId) + +assertCacheSize(0) +// getIndex for first time will call rsm#fetchIndex +val cacheEntry = cache.getIndexEntry(metadataList.head) +assertCacheSize(1) + assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent) + assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent) + assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent) + +// Reduce the cache size to 1 byte to ensure that all the entries are evicted from it. 
+cache.resizeCacheSize(1L) + +// wait until entry is marked for deletion +TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup, + "Failed to mark cache entry for cleanup after resizing cache.") +TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted, + "Failed to cleanup cache entry after resizing cache.") + +// verify no index files on remote cache dir +TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, + s"Offset index file should not be present on disk at ${cache.cacheDir()}") +TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, + s"Txn index file should not be present on disk at ${cache.cacheDir()}") +TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, + s"Time index file should not be present on disk at ${cache.cacheDir()}") +TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.DELETED_FILE_SUFFIX).isPresent, + s"Index file marked for deletion should not be present on disk at ${cache.cacheDir()}") + +assertTrue(cache.internalCache().estimatedSize() == 0) + +// Increase cache capacity to only store 2 entries +cache.resizeCacheSize(2 * estimateEntryBytesSize) +assertCacheSize(0) + +val entry0 = cache.getIndexEntry(metadataList(0)) +val entry1 = cache.getIndexEntry(metadataList(1)) +cache.getIndexEntry(metadataList(2)) +assertCacheSize(2) +val missingMetadata = metadataList(0) +val missingEntry = entry0 +// wait until evicted entry is marked for deletion +TestUtils.waitUntilTrue(() => missingEntry.isMarkedForCleanup, + "Failed to mark evicted cache entry for cleanup after resizing cache.") +TestUtils.waitUntilTrue(() => missingEntry.isCleanStarted, + "Failed to cleanup evicted cache entry after resizing cache.") +// verify no index files for `missingEntry` on remote cache dir +TestUtils.waitUntilTrue(() => 
!getIndexFileFromRemoteCacheDir(remoteOffsetIndexFileName(missingMetadata)).isPresent, + s"Offset index file for evicted entry should not be present on disk at ${cache.cacheDir()}") +TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteTimeIndexFileName(missingMetadata)).isPresent, + s"Time index file for evicted entry should not be present on disk at ${cache.cacheDir()}") +TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteTransactionIndexFileName(missingMetadata)).isPresent, + s"Txn index file for evicted entry should not be present on disk at ${cache.cacheDir()}") +TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteDeletedSuffixIndexFileName(missingMetadata)).isPresent, + s"Index file marked for deletion for evicted entry should not be present on disk at ${cache.cacheDir()}") + +// Reduce cache capacity to only store 1 entries +cache.resizeCacheSize(1 * estimateEntryBytesSize) +assertCacheSize(1) + +val nextMissingMetadata = metadataList(1) +val nextMissingEntry = entry1 +// wait until evicted entry is marked for deletion +TestUtils.waitUntilTrue(() => nextMissingEntry.isMarkedForCleanup, + "Failed to mark evict
Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]
hudeqi commented on code in PR #14511: URL: https://github.com/apache/kafka/pull/14511#discussion_r1359460606 ## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ## @@ -561,6 +561,108 @@ class RemoteIndexCacheTest { assertTrue(cache.internalCache().estimatedSize() == 0) } + @Test + def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = { + +def getIndexFileFromRemoteCacheDir(suffix: String) = { + try { +Files.walk(cache.cacheDir().toPath()) + .filter(Files.isRegularFile(_)) + .filter(path => path.getFileName.toString.endsWith(suffix)) + .findAny() + } catch { +case _: NoSuchFileException => Optional.empty() + } +} + +// The test process for resizing is: put 1 entry -> evict to empty -> put 3 entries with limited capacity of 2 entries -> evict to 1 entry +val estimateEntryBytesSize = estimateOneEntryBytesSize() +val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)) +val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId) + +assertCacheSize(0) +// getIndex for first time will call rsm#fetchIndex +val cacheEntry = cache.getIndexEntry(metadataList.head) +assertCacheSize(1) + assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent) + assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent) + assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent) + +// Reduce the cache size to 1 byte to ensure that all the entries are evicted from it. 
+cache.resizeCacheSize(1L) + +// wait until entry is marked for deletion +TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup, + "Failed to mark cache entry for cleanup after resizing cache.") +TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted, + "Failed to cleanup cache entry after resizing cache.") + +// verify no index files on remote cache dir +TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, + s"Offset index file should not be present on disk at ${cache.cacheDir()}") +TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, + s"Txn index file should not be present on disk at ${cache.cacheDir()}") +TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, + s"Time index file should not be present on disk at ${cache.cacheDir()}") +TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.DELETED_FILE_SUFFIX).isPresent, + s"Index file marked for deletion should not be present on disk at ${cache.cacheDir()}") + +assertTrue(cache.internalCache().estimatedSize() == 0) Review Comment: updated
Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]
hudeqi commented on code in PR #14511: URL: https://github.com/apache/kafka/pull/14511#discussion_r1359460521 ## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ## @@ -561,6 +561,108 @@ class RemoteIndexCacheTest { assertTrue(cache.internalCache().estimatedSize() == 0) } + @Test + def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = { + +def getIndexFileFromRemoteCacheDir(suffix: String) = { + try { +Files.walk(cache.cacheDir().toPath()) + .filter(Files.isRegularFile(_)) + .filter(path => path.getFileName.toString.endsWith(suffix)) + .findAny() + } catch { +case _: NoSuchFileException => Optional.empty() + } +} Review Comment: updated
Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]
hudeqi commented on code in PR #14511: URL: https://github.com/apache/kafka/pull/14511#discussion_r1359435180 ## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ## @@ -525,7 +525,7 @@ class RemoteIndexCacheTest { .filter(path => path.getFileName.toString.endsWith(suffix)) .findAny() } catch { -case _: FileNotFoundException => Optional.empty() +case _: NoSuchFileException => Optional.empty() Review Comment: @showuon Ah, I think the `FileNotFoundException` caught here may not be the exception that is actually thrown. This is what I discovered when merging the latest trunk: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14511/4/tests Error: java.io.UncheckedIOException: java.nio.file.NoSuchFileException: /tmp/kafka-RemoteIndexCacheTest256653951450751431/x1qGOc3yRE6shFMJVKArpw:foo-0/remote-log-index-cache/0_TGK1Vu0nTCCcb0TN379YFA.timeindex.deleted Stack trace: java.io.UncheckedIOException: java.nio.file.NoSuchFileException: /tmp/kafka-RemoteIndexCacheTest256653951450751431/x1qGOc3yRE6shFMJVKArpw:foo-0/remote-log-index-cache/0_TGK1Vu0nTCCcb0TN379YFA.timeindex.deleted at java.nio.file.FileTreeIterator.fetchNextIfNeeded(FileTreeIterator.java:88) at java.nio.file.FileTreeIterator.hasNext(FileTreeIterator.java:104) at java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811) at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126) at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:499)
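For context on the stack trace above: `Files.walk` is lazy, so a file that is deleted while the stream is being consumed surfaces later as an `UncheckedIOException` wrapping `NoSuchFileException` from the stream pipeline, never as a `FileNotFoundException`. A minimal sketch of a lookup that tolerates both failure modes (names are illustrative, not from the Kafka test):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

class WalkSketch {
    // Returns whether any regular file under dir has the given suffix.
    // Catches both the eager IOException (e.g. dir itself missing) and the
    // lazy UncheckedIOException thrown mid-traversal when a file vanishes.
    static boolean anyFileEndingWith(final Path dir, final String suffix) {
        try (Stream<Path> paths = Files.walk(dir)) {
            return paths.filter(Files::isRegularFile)
                        .anyMatch(p -> p.getFileName().toString().endsWith(suffix));
        } catch (IOException | UncheckedIOException e) {
            // Treat a concurrently deleted file/dir as "not present".
            return false;
        }
    }
}
```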
Re: [PR] KAFKA-15481: Fix concurrency bug in RemoteIndexCache [kafka]
iit2009060 commented on code in PR #14483: URL: https://github.com/apache/kafka/pull/14483#discussion_r1359185319 ## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ## @@ -592,16 +593,75 @@ class RemoteIndexCacheTest { verifyFetchIndexInvocation(count = 1) } + @Test + def testConcurrentRemoveReadForCache(): Unit = { +// Create a spy Cache Entry +val rlsMetadata = new RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), baseOffset, lastOffset, + time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L)) + +val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME))) +val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME))) +val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME))) + +val spyEntry = spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, txIndex)) +cache.internalCache.put(rlsMetadata.remoteLogSegmentId().id(), spyEntry) + +assertCacheSize(1) + +var entry: RemoteIndexCache.Entry = null + +val latchForCacheRead = new CountDownLatch(1) +val latchForCacheRemove = new CountDownLatch(1) +val latchForTestWait = new CountDownLatch(1) + +doAnswer((invocation: InvocationOnMock) => { + // Signal the CacheRead to unblock itself + latchForCacheRead.countDown() + // Wait for signal to start renaming the files + latchForCacheRemove.await() + // Calling the markForCleanup() actual method to start renaming the files + invocation.callRealMethod() Review Comment: Why are we calling the actual rename of the file via `callRealMethod()`? Correct me if my understanding is wrong here. Two threads are defined: 1. removalCache - executes the cache's remove function. 2. readCache - reads data from the cache while the spy's `markForCleanup()` is executing. Operations: 1. removeCache is triggered. 2. spyEntry.markForCleanup is executed (the files are already renamed to .delete). 3. readCache executes and finishes because no lock is pending on the remove operation. 4. It creates new files for the entry again (fetched from remote storage, rather than taken from the cache) (we should validate the number of calls to rsm here). 5. After latchForCacheRemove.await(), why are we explicitly calling markForCleanup again? ## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java: ## @@ -193,7 -192,16 @@ public File cacheDir() { public void remove(Uuid key) { lock.readLock().lock(); try { -internalCache.invalidate(key); +internalCache.asMap().computeIfPresent(key, (k, v) -> { +try { +v.markForCleanup(); +expiredIndexes.put(v); Review Comment: @showuon I was just thinking about it: if this fails, it will leave a pile of files with the delete suffix that never get deleted from the disk, right? Is that behaviour ok? ## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java: ## @@ -193,7 +192,16 @@ public File cacheDir() { public void remove(Uuid key) { lock.readLock().lock(); try { -internalCache.invalidate(key); +internalCache.asMap().computeIfPresent(key, (k, v) -> { +try { +v.markForCleanup(); +expiredIndexes.put(v); Review Comment: @showuon @kamalcph @jeel2420 I think we may be entering a deadlock state here. During remove we take the read lock, but in markForCleanup we try to take the write lock. Won't that result in a deadlock?
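To make the deadlock question above concrete: `ReentrantReadWriteLock` supports downgrading (write to read) but not upgrading. A thread that holds the read lock can never acquire the write lock of the same lock instance -- `lock()` would block forever and `tryLock()` returns false. So *if* `remove()` held a lock's read lock while `markForCleanup()` tried to take that same lock's write lock on the same thread, it would self-deadlock; whether both paths actually share one lock in `RemoteIndexCache` is the open question the reviewer raises, not something this sketch asserts:

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Demonstrates that read-to-write lock upgrade is unsupported: while the
// current thread holds the read lock, tryLock() on the write lock fails.
class LockUpgradeSketch {
    static boolean tryUpgrade() {
        final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        lock.readLock().lock();
        try {
            return lock.writeLock().tryLock(); // false: upgrade is unsupported
        } finally {
            lock.readLock().unlock();
        }
    }
}
```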
Re: [PR] KAFKA-15387: Deprecate Connect's redundant task configurations endpoint [kafka]
yashmayya commented on PR #14361: URL: https://github.com/apache/kafka/pull/14361#issuecomment-1762750265 Test failures are unrelated, merging to `trunk`
Re: [PR] KAFKA-15387: Deprecate Connect's redundant task configurations endpoint [kafka]
yashmayya merged PR #14361: URL: https://github.com/apache/kafka/pull/14361