Re: [PR] MINOR: cleanup warnings in Kafka Streams code base [kafka]

2023-10-14 Thread via GitHub


ableegoldman commented on code in PR #14549:
URL: https://github.com/apache/kafka/pull/14549#discussion_r1359737490


##
streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java:
##
@@ -87,12 +87,12 @@ public StreamsBuilder() {
  */
 @SuppressWarnings("this-escape")
 public StreamsBuilder(final TopologyConfig topologyConfigs) {
-topology = getNewTopology(topologyConfigs);
+topology = newTopology(topologyConfigs);
 internalTopologyBuilder = topology.internalTopologyBuilder;
 internalStreamsBuilder = new 
InternalStreamsBuilder(internalTopologyBuilder);
 }
 
-protected Topology getNewTopology(final TopologyConfig topologyConfigs) {

Review Comment:
   I mean...there are definitely some people using named topologies, though 
I've made sure they are aware it is not considered a public API and is not 
intended for public use without a KIP.
   
   Anyway, given that this specific API is not even a public method, I think 
it's fair game to remove.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14684 Replace EasyMock and PowerMock with Mockito in WorkerSinkTaskThreadedTest [kafka]

2023-10-14 Thread via GitHub


hgeraldino commented on code in PR #14505:
URL: https://github.com/apache/kafka/pull/14505#discussion_r1359700014


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java:
##
@@ -409,309 +430,266 @@ public void testAssignmentPauseResume() throws 
Exception {
 }
 sinkTaskContext.getValue().pause(TOPIC_PARTITION, 
TOPIC_PARTITION2);
 return null;
-});
-consumer.pause(Arrays.asList(UNASSIGNED_TOPIC_PARTITION));
-PowerMock.expectLastCall().andThrow(new 
IllegalStateException("unassigned topic partition"));
-consumer.pause(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
-PowerMock.expectLastCall();
-
-expectOnePoll().andAnswer(() -> {
+}).doAnswer(invocation -> {
 try {
 sinkTaskContext.getValue().resume(UNASSIGNED_TOPIC_PARTITION);
 fail("Trying to resume unassigned partition should have thrown 
an Connect exception");
 } catch (ConnectException e) {
 // expected
 }
-
 sinkTaskContext.getValue().resume(TOPIC_PARTITION, 
TOPIC_PARTITION2);
 return null;
-});
-consumer.resume(Arrays.asList(UNASSIGNED_TOPIC_PARTITION));
-PowerMock.expectLastCall().andThrow(new 
IllegalStateException("unassigned topic partition"));
-consumer.resume(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
-PowerMock.expectLastCall();
+}).when(sinkTask).put(any(Collection.class));
 
-expectStopTask();
+doThrow(new IllegalStateException("unassigned topic 
partition")).when(consumer).pause(singletonList(UNASSIGNED_TOPIC_PARTITION));
+doAnswer(invocation -> 
null).when(consumer).pause(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
 
-PowerMock.replayAll();
+doThrow(new IllegalStateException("unassigned topic 
partition")).when(consumer).resume(singletonList(UNASSIGNED_TOPIC_PARTITION));
+doAnswer(invocation -> 
null).when(consumer).resume(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
 
 workerTask.initialize(TASK_CONFIG);
 workerTask.initializeAndStart();
+verifyInitializeTask();
+
 workerTask.iteration();
+verifyInitialAssignment();
+
 workerTask.iteration();
 workerTask.iteration();
 workerTask.iteration();
 workerTask.stop();
 workerTask.close();
+verifyStopTask();
+verifyTaskGetTopic(3);
 
-PowerMock.verifyAll();
+verify(consumer, atLeastOnce()).pause(Arrays.asList(TOPIC_PARTITION, 
TOPIC_PARTITION2));
+verify(consumer, atLeastOnce()).resume(Arrays.asList(TOPIC_PARTITION, 
TOPIC_PARTITION2));
 }
 
 @Test
-public void testRewind() throws Exception {
-expectInitializeTask();
-expectTaskGetTopic(true);
-expectPollInitialAssignment();
+public void testRewind() {
+expectTaskGetTopic();
+expectInitialAssignment();
+expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
 
 final long startOffset = 40L;
 final Map<TopicPartition, Long> offsets = new HashMap<>();
 
-expectOnePoll().andAnswer(() -> {
+doAnswer(invocation -> {
+return null; // initial assignment
+}).doAnswer(invocation -> {
 offsets.put(TOPIC_PARTITION, startOffset);
 sinkTaskContext.getValue().offset(offsets);
 return null;
-});
-
-consumer.seek(TOPIC_PARTITION, startOffset);
-EasyMock.expectLastCall();
-
-expectOnePoll().andAnswer(() -> {
+}).doAnswer(invocation -> {
 Map<TopicPartition, Long> offsets1 = 
sinkTaskContext.getValue().offsets();
 assertEquals(0, offsets1.size());
 return null;
-});
-
-expectStopTask();
-PowerMock.replayAll();
+}).when(sinkTask).put(any(Collection.class));
 
 workerTask.initialize(TASK_CONFIG);
 workerTask.initializeAndStart();
+verifyInitializeTask();
+
 workerTask.iteration();
+verifyInitialAssignment();
+
 workerTask.iteration();
 workerTask.iteration();
+verify(consumer).seek(TOPIC_PARTITION, startOffset);
+
 workerTask.stop();
 workerTask.close();
-
-PowerMock.verifyAll();
+verifyStopTask();
+verifyTaskGetTopic(2);
 }
 
 @Test
-public void testRewindOnRebalanceDuringPoll() throws Exception {
-expectInitializeTask();
-expectTaskGetTopic(true);
-expectPollInitialAssignment();
-expectConsumerAssignment(INITIAL_ASSIGNMENT).times(2);
+public void testRewindOnRebalanceDuringPoll() {
+final long startOffset = 40L;
+
+expectTaskGetTopic();
+expectInitialAssignment();
+expectRebalanceDuringPoll(startOffset);
 
-expectRebalanceDuringPoll().andAnswer(() -> {
+doA
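
A minimal, self-contained sketch of the EasyMock/PowerMock-to-Mockito stubbing 
pattern this diff applies (the PartitionConsumer interface and all names below 
are illustrative stand-ins, not the actual Kafka test types):

    import static org.mockito.Mockito.atLeastOnce;
    import static org.mockito.Mockito.doAnswer;
    import static org.mockito.Mockito.doThrow;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.verify;

    import java.util.List;

    public class MockitoMigrationSketch {

        // Illustrative stand-in for the consumer mocked in the test above.
        interface PartitionConsumer {
            void pause(List<String> partitions);
        }

        public static void main(String[] args) {
            PartitionConsumer consumer = mock(PartitionConsumer.class);

            // EasyMock/PowerMock recorded expectations up front:
            //     consumer.pause(unassigned);
            //     PowerMock.expectLastCall().andThrow(new IllegalStateException(...));
            //     PowerMock.replayAll(); ... PowerMock.verifyAll();
            // Mockito instead stubs, exercises, then verifies explicitly:
            doThrow(new IllegalStateException("unassigned topic partition"))
                    .when(consumer).pause(List.of("unassigned-0"));
            doAnswer(invocation -> null)          // first call
                    .doAnswer(invocation -> null) // subsequent calls
                    .when(consumer).pause(List.of("t-0", "t-1"));

            try {
                consumer.pause(List.of("unassigned-0"));
            } catch (IllegalStateException expected) {
                // expected, mirroring the try/fail/catch blocks in the diff
            }
            consumer.pause(List.of("t-0", "t-1"));

            verify(consumer, atLeastOnce()).pause(List.of("t-0", "t-1"));
        }
    }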

Re: [PR] MINOR: Fix `Consumed` to return new object instead of `this` [kafka]

2023-10-14 Thread via GitHub


guozhangwang commented on code in PR #14550:
URL: https://github.com/apache/kafka/pull/14550#discussion_r1359692144


##
streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java:
##
@@ -90,33 +104,43 @@ public static <K, V> Branched<K, V> withConsumer(final 
Consumer<? super KStream<K, V>> c
 /**
  * Create an instance of {@code Branched} with provided chain function and 
branch name suffix.
  *
- * @param chain A function that will be applied to the branch. If the 
provided function returns
- *  {@code null}, its result is ignored, otherwise it is added 
to the {@code Map} returned
- *  by {@link BranchedKStream#defaultBranch()} or {@link 
BranchedKStream#noDefaultBranch()} (see
- *  {@link BranchedKStream} description for details).
- * @param name  the branch name suffix to be used. If {@code null}, a 
default branch name suffix will be generated
- *  (see {@link BranchedKStream} description for details)
- * @param <K> key type
- * @param <V> value type
+ * @param chain
+ *A function that will be applied to the branch. If the provided 
function returns
+ *{@code null}, its result is ignored, otherwise it is added to 
the {@code Map} returned
+ *by {@link BranchedKStream#defaultBranch()} or {@link 
BranchedKStream#noDefaultBranch()} (see
+ *{@link BranchedKStream} description for details).
+ * @param name
+ *the branch name suffix to be used. If {@code null}, a default 
branch name suffix will be generated
+ *(see {@link BranchedKStream} description for details)
+ *
+ * @param <K> key type
+ * @param <V> value type
+ *
  * @return a new instance of {@code Branched}
  */
 public static <K, V> Branched<K, V> withFunction(
-final Function<? super KStream<K, V>, ? extends KStream<K, V>> 
chain, final String name) {
+final Function<? super KStream<K, V>, ? extends KStream<K, V>> chain,

Review Comment:
   I just personally feel that a newline for a single extra param is not very 
elegant. Again, just my own thoughts; feel free to ignore.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: cleanup warnings in Kafka Streams code base [kafka]

2023-10-14 Thread via GitHub


mjsax commented on code in PR #14549:
URL: https://github.com/apache/kafka/pull/14549#discussion_r1359669572


##
streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java:
##
@@ -87,12 +87,12 @@ public StreamsBuilder() {
  */
 @SuppressWarnings("this-escape")
 public StreamsBuilder(final TopologyConfig topologyConfigs) {
-topology = getNewTopology(topologyConfigs);
+topology = newTopology(topologyConfigs);
 internalTopologyBuilder = topology.internalTopologyBuilder;
 internalStreamsBuilder = new 
InternalStreamsBuilder(internalTopologyBuilder);
 }
 
-protected Topology getNewTopology(final TopologyConfig topologyConfigs) {

Review Comment:
   I could not find anything. Maybe @ableegoldman can comment? -- Also no big 
deal to leave this one. In the end, we plan to remove this experimental feature 
in the future anyway.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix `Consumed` to return new object instead of `this` [kafka]

2023-10-14 Thread via GitHub


mjsax commented on code in PR #14550:
URL: https://github.com/apache/kafka/pull/14550#discussion_r1359669078


##
streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java:
##
@@ -90,33 +104,43 @@ public static <K, V> Branched<K, V> withConsumer(final 
Consumer<? super KStream<K, V>> c
 /**
  * Create an instance of {@code Branched} with provided chain function and 
branch name suffix.
  *
- * @param chain A function that will be applied to the branch. If the 
provided function returns
- *  {@code null}, its result is ignored, otherwise it is added 
to the {@code Map} returned
- *  by {@link BranchedKStream#defaultBranch()} or {@link 
BranchedKStream#noDefaultBranch()} (see
- *  {@link BranchedKStream} description for details).
- * @param name  the branch name suffix to be used. If {@code null}, a 
default branch name suffix will be generated
- *  (see {@link BranchedKStream} description for details)
- * @param <K> key type
- * @param <V> value type
+ * @param chain
+ *A function that will be applied to the branch. If the provided 
function returns
+ *{@code null}, its result is ignored, otherwise it is added to 
the {@code Map} returned
+ *by {@link BranchedKStream#defaultBranch()} or {@link 
BranchedKStream#noDefaultBranch()} (see
+ *{@link BranchedKStream} description for details).
+ * @param name
+ *the branch name suffix to be used. If {@code null}, a default 
branch name suffix will be generated
+ *(see {@link BranchedKStream} description for details)
+ *
+ * @param <K> key type
+ * @param <V> value type
+ *
  * @return a new instance of {@code Branched}
  */
 public static <K, V> Branched<K, V> withFunction(
-final Function<? super KStream<K, V>, ? extends KStream<K, V>> 
chain, final String name) {
+final Function<? super KStream<K, V>, ? extends KStream<K, V>> chain,

Review Comment:
   Not necessary. It's my personally preferred style. Happy to change it back. 
Please let me know.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix `Consumed` to return new object instead of `this` [kafka]

2023-10-14 Thread via GitHub


mjsax commented on code in PR #14550:
URL: https://github.com/apache/kafka/pull/14550#discussion_r1359668981


##
streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java:
##
@@ -27,15 +27,18 @@
  *
 * @param <K> key type
 * @param <V> value type
+ *
  * @see KStream#foreach(ForeachAction)
  */
 public interface ForeachAction<K, V> {
 
 /**
  * Perform an action for each record of a stream.
  *
- * @param key   the key of the record
- * @param value the value of the record
+ * @param key

Review Comment:
   It's not necessary, but it makes JavaDocs easier to read in general -- and I 
like to have consistent formatting across the board -- I would like to establish 
this new formatting if nobody objects.
   
   Especially when there are many parameters with names of different lengths, it 
is cumbersome to get nice formatting -- putting the description on the next line 
avoids all that whitespace (and if we rename a parameter it becomes a single 
line change and no reformatting is necessary)
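
To make the two styles concrete, a small illustrative interface (not from the 
PR):

    interface JavadocStyleExample {
        /**
         * Old style: description aligned after the parameter name; a rename
         * forces re-aligning every description.
         *
         * @param key   the key of the record
         * @param value the value of the record
         */
        void oldStyle(String key, String value);

        /**
         * New style from the comment above: description on its own line, so a
         * parameter rename is a single-line change and needs no re-alignment.
         *
         * @param key
         *        the key of the record
         * @param value
         *        the value of the record
         */
        void newStyle(String key, String value);
    }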



##
streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java:
##
@@ -16,12 +16,12 @@
  */
 package org.apache.kafka.streams.kstream;
 
-
 /**
  * The {@code Initializer} interface for creating an initial value in 
aggregations.
  * {@code Initializer} is used in combination with {@link Aggregator}.
  *
- * @param <VA> aggregate value type
+ * @param <VAgg> aggregate value type

Review Comment:
   Oops.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix `Consumed` to return new object instead of `this` [kafka]

2023-10-14 Thread via GitHub


mjsax commented on code in PR #14550:
URL: https://github.com/apache/kafka/pull/14550#discussion_r1359668737


##
streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java:
##
@@ -125,21 +149,14 @@ public static <K, V> Branched<K, V> withConsumer(final 
Consumer<? super KStream<K, V>> chain, final String name) {
 return new Branched<>(name, null, chain);
 }
 
-/**
- * Create an instance of {@code Branched} from an existing instance.
- *
- * @param branched the instance of {@code Branched} to copy
- */
-protected Branched(final Branched branched) {

Review Comment:
   Yes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with "{statestore.cache}/{input.buffer}.max.bytes"

2023-10-14 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775325#comment-17775325
 ] 

Matthias J. Sax commented on KAFKA-13152:
-

[~guozhang] would you be interested in helping get this into 3.7? It keeps 
slipping... :(

> Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with 
> "{statestore.cache}/{input.buffer}.max.bytes"
> -
>
> Key: KAFKA-13152
> URL: https://issues.apache.org/jira/browse/KAFKA-13152
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Major
>  Labels: kip
> Fix For: 3.7.0
>
>
> The current config "buffered.records.per.partition" controls the maximum number 
> of records to bookkeep per partition; once it is exceeded, we pause fetching 
> from that partition. However, this config has two issues:
> * It's a per-partition config, so the total memory consumed depends on the 
> dynamic number of partitions assigned.
> * Record size can vary from case to case.
> Hence it is hard to bound the memory usage of this buffering. We should 
> consider deprecating that config in favor of a global one, e.g. 
> "input.buffer.max.bytes", which controls how many bytes in total are allowed 
> to be buffered. This is doable since we buffer the raw records in .
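
As a sketch of what the proposed replacement would look like in application 
code (the config name is taken from the ticket; whether and when it lands 
depends on the KIP):

{code:java}
import java.util.Properties;

public class InputBufferConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Existing per-partition record-count bound the ticket wants to deprecate:
        props.put("buffered.records.per.partition", "1000");
        // Proposed global byte-based bound from the ticket (hypothetical until
        // the KIP is merged): one limit regardless of partition count.
        props.put("input.buffer.max.bytes", String.valueOf(512 * 1024 * 1024));
    }
}
{code}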



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15594) Add 3.6.0 to streams upgrade/compatibility tests

2023-10-14 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775323#comment-17775323
 ] 

Matthias J. Sax commented on KAFKA-15594:
-

This ticket is to include upgrade tests from 3.6 to 3.7/trunk – this can only be 
done after 3.6 is released – it's WIP.

> Add 3.6.0 to streams upgrade/compatibility tests
> 
>
> Key: KAFKA-15594
> URL: https://issues.apache.org/jira/browse/KAFKA-15594
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, system tests
>Reporter: Satish Duggana
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15594) Add 3.6.0 to streams upgrade/compatibility tests

2023-10-14 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775324#comment-17775324
 ] 

Matthias J. Sax commented on KAFKA-15594:
-

This ticket is to include upgrade tests from 3.6 to 3.7/trunk – this can only be 
done after 3.6 is released – it's WIP.

> Add 3.6.0 to streams upgrade/compatibility tests
> 
>
> Key: KAFKA-15594
> URL: https://issues.apache.org/jira/browse/KAFKA-15594
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, system tests
>Reporter: Satish Duggana
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15593) Add 3.6.0 to broker/client upgrade/compatibility tests

2023-10-14 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775322#comment-17775322
 ] 

Matthias J. Sax commented on KAFKA-15593:
-

This ticket is to include upgrade tests from 3.6 to 3.7/trunk – this can only be 
done after 3.6 is released – it's WIP.

> Add 3.6.0 to broker/client upgrade/compatibility tests
> --
>
> Key: KAFKA-15593
> URL: https://issues.apache.org/jira/browse/KAFKA-15593
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Major
> Fix For: 3.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15406) Add the ForwardingManager metrics from KIP-938

2023-10-14 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775316#comment-17775316
 ] 

Colin McCabe commented on KAFKA-15406:
--

This feature is planned for 3.7, but not with high confidence right now. It 
might slip. We have to get JBOD in.

> Add the ForwardingManager metrics from KIP-938
> --
>
> Key: KAFKA-15406
> URL: https://issues.apache.org/jira/browse/KAFKA-15406
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.7.0
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>  Labels: newbie, newbie++
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: cleanup warnings in Kafka Streams code base [kafka]

2023-10-14 Thread via GitHub


guozhangwang commented on code in PR #14549:
URL: https://github.com/apache/kafka/pull/14549#discussion_r1359654735


##
streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java:
##
@@ -87,12 +87,12 @@ public StreamsBuilder() {
  */
 @SuppressWarnings("this-escape")
 public StreamsBuilder(final TopologyConfig topologyConfigs) {
-topology = getNewTopology(topologyConfigs);
+topology = newTopology(topologyConfigs);
 internalTopologyBuilder = topology.internalTopologyBuilder;
 internalStreamsBuilder = new 
InternalStreamsBuilder(internalTopologyBuilder);
 }
 
-protected Topology getNewTopology(final TopologyConfig topologyConfigs) {

Review Comment:
   Have we double-checked that there are no callers of this function outside the 
KS code (like in other repos)? Just double checking.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix `Consumed` to return new object instead of `this` [kafka]

2023-10-14 Thread via GitHub


guozhangwang commented on code in PR #14550:
URL: https://github.com/apache/kafka/pull/14550#discussion_r1359652648


##
streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java:
##
@@ -27,15 +27,18 @@
  *
 * @param <K> key type
 * @param <V> value type
+ *
  * @see KStream#foreach(ForeachAction)
  */
 public interface ForeachAction<K, V> {
 
 /**
  * Perform an action for each record of a stream.
  *
- * @param key   the key of the record
- * @param value the value of the record
+ * @param key

Review Comment:
   Is this newline necessary?



##
streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java:
##
@@ -125,21 +149,14 @@ public static <K, V> Branched<K, V> withConsumer(final 
Consumer<? super KStream<K, V>> chain, final String name) {
 return new Branched<>(name, null, chain);
 }
 
-/**
- * Create an instance of {@code Branched} from an existing instance.
- *
- * @param branched the instance of {@code Branched} to copy
- */
-protected Branched(final Branched branched) {

Review Comment:
   I'm assuming this is just for moving the func around, and removing the 
javadoc for those protected functions?



##
streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java:
##
@@ -57,29 +64,36 @@ public static <K, V> Branched<K, V> as(final String name) {
 /**
  * Create an instance of {@code Branched} with provided chain function.
  *
- * @param chain A function that will be applied to the branch. If the 
provided function returns
- *  {@code null}, its result is ignored, otherwise it is added 
to the {@code Map} returned
- *  by {@link BranchedKStream#defaultBranch()} or {@link 
BranchedKStream#noDefaultBranch()} (see
- *  {@link BranchedKStream} description for details).
- * @param <K> key type
- * @param <V> value type
+ * @param chain
+ *A function that will be applied to the branch. If the provided 
function returns
+ *{@code null}, its result is ignored, otherwise it is added to 
the {@code Map} returned
+ *by {@link BranchedKStream#defaultBranch()} or {@link 
BranchedKStream#noDefaultBranch()} (see
+ *{@link BranchedKStream} description for details).
+ *
+ * @param <K> key type
+ * @param <V> value type
+ *
  * @return a new instance of {@code Branched}
  */
 public static <K, V> Branched<K, V> withFunction(
-final Function<? super KStream<K, V>, ? extends KStream<K, V>> 
chain) {
+final Function<? super KStream<K, V>, ? extends KStream<K, V>> 
chain

Review Comment:
   Ditto here?



##
streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java:
##
@@ -16,12 +16,12 @@
  */
 package org.apache.kafka.streams.kstream;
 
-
 /**
  * The {@code Initializer} interface for creating an initial value in 
aggregations.
  * {@code Initializer} is used in combination with {@link Aggregator}.
  *
- * @param <VA> aggregate value type
+ * @param <VAgg> aggregate value type

Review Comment:
   Agg? :)



##
streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java:
##
@@ -90,33 +104,43 @@ public static <K, V> Branched<K, V> withConsumer(final 
Consumer<? super KStream<K, V>> c
 /**
  * Create an instance of {@code Branched} with provided chain function and 
branch name suffix.
  *
- * @param chain A function that will be applied to the branch. If the 
provided function returns
- *  {@code null}, its result is ignored, otherwise it is added 
to the {@code Map} returned
- *  by {@link BranchedKStream#defaultBranch()} or {@link 
BranchedKStream#noDefaultBranch()} (see
- *  {@link BranchedKStream} description for details).
- * @param name  the branch name suffix to be used. If {@code null}, a 
default branch name suffix will be generated
- *  (see {@link BranchedKStream} description for details)
- * @param <K> key type
- * @param <V> value type
+ * @param chain
+ *A function that will be applied to the branch. If the provided 
function returns
+ *{@code null}, its result is ignored, otherwise it is added to 
the {@code Map} returned
+ *by {@link BranchedKStream#defaultBranch()} or {@link 
BranchedKStream#noDefaultBranch()} (see
+ *{@link BranchedKStream} description for details).
+ * @param name
+ *the branch name suffix to be used. If {@code null}, a default 
branch name suffix will be generated
+ *(see {@link BranchedKStream} description for details)
+ *
+ * @param <K> key type
+ * @param <V> value type
+ *
  * @return a new instance of {@code Branched}
  */
 public static <K, V> Branched<K, V> withFunction(
-final Function<? super KStream<K, V>, ? extends KStream<K, V>> 
chain, final String name) {
+final Function<? super KStream<K, V>, ? extends KStream<K, V>> chain,

Review Comment:
   Ditto here, I feel these newlines are not very necessary?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, 

[jira] [Assigned] (KAFKA-15147) Measure pending and outstanding Remote Segment operations

2023-10-14 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya reassigned KAFKA-15147:


Assignee: Christo Lolov  (was: Divij Vaidya)

> Measure pending and outstanding Remote Segment operations
> -
>
> Key: KAFKA-15147
> URL: https://issues.apache.org/jira/browse/KAFKA-15147
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Christo Lolov
>Priority: Major
>  Labels: tiered-storage
> Fix For: 3.7.0
>
>
> Remote Log Segment operations (copy/delete) are executed by the Remote 
> Storage Manager, and recorded by Remote Log Metadata Manager (e.g. default 
> TopicBasedRLMM writes to the internal Kafka topic state changes on remote log 
> segments).
> As executions run, fail, and retry, it will be important to know how many 
> operations are pending and outstanding over time to alert operators.
> Pending operations are not enough to alert, as values can oscillate closer to 
> zero. An additional condition needs to apply (running time > threshold) to 
> consider an operation outstanding.
> Proposal:
> RemoteLogManager could be extended with 2 concurrent maps 
> (pendingSegmentCopies, pendingSegmentDeletes) `Map[Uuid, Long]` to measure 
> segmentId time when operation started, and based on this expose 2 metrics per 
> operation:
>  * pendingSegmentCopies: gauge of pendingSegmentCopies map
>  * outstandingSegmentCopies: loop over pending ops, and if now - startedTime 
> > timeout, then outstanding++ (maybe on debug level?)
> Is this a valuable metric to add to Tiered Storage? or better to solve on a 
> custom RLMM implementation?
> Also, does it require a KIP?
> Thanks!
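
A minimal sketch of the two maps and derived gauges described above 
(java.util.UUID stands in for Kafka's Uuid; all names are illustrative):

{code:java}
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class RemoteSegmentOpsSketch {
    // segmentId -> timestamp (ms) at which the copy operation started
    private final ConcurrentMap<UUID, Long> pendingSegmentCopies = new ConcurrentHashMap<>();
    private final long outstandingThresholdMs = 60_000L;

    void copyStarted(UUID segmentId, long nowMs) {
        pendingSegmentCopies.put(segmentId, nowMs);
    }

    void copyFinished(UUID segmentId) {
        pendingSegmentCopies.remove(segmentId);
    }

    // Gauge #1: how many copy operations are currently pending.
    int pendingCopiesGauge() {
        return pendingSegmentCopies.size();
    }

    // Gauge #2: pending copies running longer than the threshold are "outstanding".
    long outstandingCopiesGauge(long nowMs) {
        return pendingSegmentCopies.values().stream()
                .filter(startedMs -> nowMs - startedMs > outstandingThresholdMs)
                .count();
    }
}
{code}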



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15249) Verify Connect test-plugins artifact is published to Maven Central

2023-10-14 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-15249.
---
Resolution: Done

> Verify Connect test-plugins artifact is published to Maven Central
> --
>
> Key: KAFKA-15249
> URL: https://issues.apache.org/jira/browse/KAFKA-15249
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Affects Versions: 3.6.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Blocker
>
> In KAFKA-14759 we created a separate {{connect/test-plugins}} module to store 
> all testing-only Connect plugins and removed those plugins from existing 
> Connect modules.
> These testing-only plugins are intentionally excluded from the project's 
> release file (which can be generated with {{./gradlew releaseTarGz}}); 
> however, some users may still be relying on them for testing environments.
> Although we should refrain from distributing these testing-only plugins with 
> our out-of-the-box distribution of Connect, we should still ensure that 
> they're available on an opt-in basis to users who would like to continue 
> using them. This can be accomplished by publishing them to [Maven 
> Central|https://search.maven.org/], like we do with our other modules.
> This will probably happen automatically during the next release (3.6.0) with 
> no further action required. This ticket is just here as a reminder to verify 
> that the artifacts are present in the staging Maven repo when release 
> candidates are published for voting.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15147) Measure pending and outstanding Remote Segment operations

2023-10-14 Thread Stanislav Kozlovski (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775304#comment-17775304
 ] 

Stanislav Kozlovski commented on KAFKA-15147:
-

thanks for the quick update [~divijvaidya] - adding it to the freshly published 
release plan

 

> Measure pending and outstanding Remote Segment operations
> -
>
> Key: KAFKA-15147
> URL: https://issues.apache.org/jira/browse/KAFKA-15147
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Divij Vaidya
>Priority: Major
>  Labels: tiered-storage
> Fix For: 3.7.0
>
>
> Remote Log Segment operations (copy/delete) are executed by the Remote 
> Storage Manager, and recorded by Remote Log Metadata Manager (e.g. default 
> TopicBasedRLMM writes to the internal Kafka topic state changes on remote log 
> segments).
> As executions run, fail, and retry, it will be important to know how many 
> operations are pending and outstanding over time to alert operators.
> Pending operations are not enough to alert, as values can oscillate closer to 
> zero. An additional condition needs to apply (running time > threshold) to 
> consider an operation outstanding.
> Proposal:
> RemoteLogManager could be extended with 2 concurrent maps 
> (pendingSegmentCopies, pendingSegmentDeletes) `Map[Uuid, Long]` to measure 
> segmentId time when operation started, and based on this expose 2 metrics per 
> operation:
>  * pendingSegmentCopies: gauge of pendingSegmentCopies map
>  * outstandingSegmentCopies: loop over pending ops, and if now - startedTime 
> > timeout, then outstanding++ (maybe on debug level?)
> Is this a valuable metric to add to Tiered Storage? or better to solve on a 
> custom RLMM implementation?
> Also, does it require a KIP?
> Thanks!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15147) Measure pending and outstanding Remote Segment operations

2023-10-14 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775303#comment-17775303
 ] 

Divij Vaidya commented on KAFKA-15147:
--

Hi [~enether] 
We have a KIP for this Jira which is under discussion. See: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-963%3A+Upload+and+delete+lag+metrics+in+Tiered+Storage]
 
The KIP is targeted to land in 3.7 as part of TS production readiness.

> Measure pending and outstanding Remote Segment operations
> -
>
> Key: KAFKA-15147
> URL: https://issues.apache.org/jira/browse/KAFKA-15147
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Divij Vaidya
>Priority: Major
>  Labels: tiered-storage
> Fix For: 3.7.0
>
>
> Remote Log Segment operations (copy/delete) are executed by the Remote 
> Storage Manager, and recorded by Remote Log Metadata Manager (e.g. default 
> TopicBasedRLMM writes to the internal Kafka topic state changes on remote log 
> segments).
> As executions run, fail, and retry, it will be important to know how many 
> operations are pending and outstanding over time to alert operators.
> Pending operations are not enough to alert, as values can oscillate closer to 
> zero. An additional condition needs to apply (running time > threshold) to 
> consider an operation outstanding.
> Proposal:
> RemoteLogManager could be extended with 2 concurrent maps 
> (pendingSegmentCopies, pendingSegmentDeletes) `Map[Uuid, Long]` to measure 
> segmentId time when operation started, and based on this expose 2 metrics per 
> operation:
>  * pendingSegmentCopies: gauge of pendingSegmentCopies map
>  * outstandingSegmentCopies: loop over pending ops, and if now - startedTime 
> > timeout, then outstanding++ (maybe on debug level?)
> Is this a valuable metric to add to Tiered Storage? or better to solve on a 
> custom RLMM implementation?
> Also, does it require a KIP?
> Thanks!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14175) KRaft Upgrades Part 2

2023-10-14 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski resolved KAFKA-14175.
-
Resolution: Won't Fix

> KRaft Upgrades Part 2
> -
>
> Key: KAFKA-14175
> URL: https://issues.apache.org/jira/browse/KAFKA-14175
> Project: Kafka
>  Issue Type: New Feature
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
>
> This is the parent issue for KIP-778 tasks which were not completed for the 
> 3.3 release.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14175) KRaft Upgrades Part 2

2023-10-14 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-14175:

Fix Version/s: (was: 3.7.0)

> KRaft Upgrades Part 2
> -
>
> Key: KAFKA-14175
> URL: https://issues.apache.org/jira/browse/KAFKA-14175
> Project: Kafka
>  Issue Type: New Feature
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
>
> This is the parent issue for KIP-778 tasks which were not completed for the 
> 3.3 release.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14175) KRaft Upgrades Part 2

2023-10-14 Thread Stanislav Kozlovski (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775301#comment-17775301
 ] 

Stanislav Kozlovski commented on KAFKA-14175:
-

[~mumrah] confirmed to me that this seems to be a stale issue and can be closed 
as won't fix - doing that now

 

> KRaft Upgrades Part 2
> -
>
> Key: KAFKA-14175
> URL: https://issues.apache.org/jira/browse/KAFKA-14175
> Project: Kafka
>  Issue Type: New Feature
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
> Fix For: 3.7.0
>
>
> This is the parent issue for KIP-778 tasks which were not completed for the 
> 3.3 release.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14945) Add Serializer#serializeToByteBuffer() to reduce memory copying

2023-10-14 Thread Stanislav Kozlovski (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775300#comment-17775300
 ] 

Stanislav Kozlovski commented on KAFKA-14945:
-

[~LSK] given the lack of activity from July 23, do we plan to target releasing 
this in 3.7, or is it unclear at this point?

 

> Add Serializer#serializeToByteBuffer() to reduce memory copying
> ---
>
> Key: KAFKA-14945
> URL: https://issues.apache.org/jira/browse/KAFKA-14945
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: LinShunkang
>Assignee: LinShunkang
>Priority: Major
>  Labels: kip
> Fix For: 3.7.0
>
>
> JIRA for KIP-872: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828]
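
For context, a toy sketch of the idea behind the KIP (this mirrors the intent, 
not the actual interface the KIP defines):

{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical interface mirroring the KIP-872 idea: let a serializer return a
// ByteBuffer directly so callers can avoid copying an intermediate byte[].
interface BufferReturningSerializer<T> {
    byte[] serialize(String topic, T data);

    default ByteBuffer serializeToByteBuffer(String topic, T data) {
        // Naive default just wraps the byte[]; a real implementation would
        // write into a buffer directly and skip the array allocation.
        return ByteBuffer.wrap(serialize(topic, data));
    }
}

class StringExampleSerializer implements BufferReturningSerializer<String> {
    @Override
    public byte[] serialize(String topic, String data) {
        return data.getBytes(StandardCharsets.UTF_8);
    }
}
{code}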



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] MINOR: Fix `Consumed` to return new object instead of `this` [kafka]

2023-10-14 Thread via GitHub


mjsax opened a new pull request, #14550:
URL: https://github.com/apache/kafka/pull/14550

   We embrace immutability and thus should return a new object instead of 
`this`, similar to other config classes we use in the DSL.
   
   Side JavaDoc cleanup for a bunch of classes.
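
A minimal sketch of the pattern being fixed (a simplified stand-in, not the 
real Consumed class):

    // Simplified stand-in for a DSL config class such as Consumed.
    final class ImmutableConfig {
        private final String name;
        private final String resetPolicy;

        private ImmutableConfig(final String name, final String resetPolicy) {
            this.name = name;
            this.resetPolicy = resetPolicy;
        }

        static ImmutableConfig as(final String name) {
            return new ImmutableConfig(name, null);
        }

        // Before the fix: mutate a field and `return this`, silently aliasing
        // the receiver. After the fix: return a fresh instance instead.
        ImmutableConfig withResetPolicy(final String resetPolicy) {
            return new ImmutableConfig(name, resetPolicy);
        }
    }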


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15230) ApiVersions data between controllers is not reliable

2023-10-14 Thread Stanislav Kozlovski (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775299#comment-17775299
 ] 

Stanislav Kozlovski commented on KAFKA-15230:
-

Was this released with 3.6.0?

 

> ApiVersions data between controllers is not reliable
> 
>
> Key: KAFKA-15230
> URL: https://issues.apache.org/jira/browse/KAFKA-15230
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Arthur
>Assignee: Colin McCabe
>Priority: Critical
> Fix For: 3.7.0
>
>
> While testing ZK migrations, I noticed a case where the controller was not 
> starting the migration due to the missing ApiVersions data from other 
> controllers. This was unexpected because the quorum was running and the 
> followers were replicating the metadata log as expected. After examining a 
> heap dump of the leader, it was in fact the case that the ApiVersions map of 
> NodeApiVersions was empty.
>  
> After further investigation and offline discussion with [~jsancio], we 
> realized that after the initial leader election, the connection from the Raft 
> leader to the followers will become idle and eventually timeout and close. 
> This causes NetworkClient to purge the NodeApiVersions data for the closed 
> connections.
>  
> There are two main side effects of this behavior: 
> 1) If migrations are not started within the idle timeout period (10 minutes, 
> by default), then they will not be able to be started. After this timeout 
> period, I was unable to restart the controllers in such a way that the leader 
> had active connections with all followers.
> 2) Dynamically updating features, such as "metadata.version", is not 
> guaranteed to be safe
>  
> There is a partial workaround for the migration issue. If we set "
> connections.max.idle.ms" to -1, the Raft leader will never disconnect from 
> the followers. However, if a follower restarts, the leader will not 
> re-establish a connection.
>  
> The feature update issue has no safe workarounds.
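
For reference, the partial workaround above amounts to the following 
(configuration sketch; only the property name and value come from the ticket):

{code:java}
import java.util.Properties;

public class IdleWorkaroundSketch {
    public static void main(String[] args) {
        Properties controllerProps = new Properties();
        // Partial workaround from the ticket: never reap idle connections, so the
        // Raft leader keeps its follower connections (and their ApiVersions data).
        // Note the caveat above: a restarted follower still won't be re-connected.
        controllerProps.put("connections.max.idle.ms", "-1");
    }
}
{code}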



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-10892) Add Topology#connectSourceStoreAndTopic as a public method

2023-10-14 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-10892:

Fix Version/s: (was: 3.7.0)

> Add Topology#connectSourceStoreAndTopic as a public method
> --
>
> Key: KAFKA-10892
> URL: https://issues.apache.org/jira/browse/KAFKA-10892
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Tomohiro Hashidate
>Assignee: Daan Gerits
>Priority: Major
>  Labels: kip
>
> I want Topology#connectSourceStoreAndTopic, because I want to use a topic as a 
> source topic directly, without a redundant changelog topic, for not only 
> KeyValueStore but also WindowStore.
> This issue is similar to [KAFKA-6840], but is a suggestion for a more general 
> approach
> {code:java}
> public synchronized Topology connectSourceStoreAndTopic(final String sourceStoreName,
>                                                         final String topic) {
>     internalTopologyBuilder.connectSourceStoreAndTopic(sourceStoreName, topic);
>     return this;
> }
> {code}
> h3. Background
> I want to use a topic as a source topic for WindowStore because using 
> WindowStore is suitable for the feature that I'm implementing.
>  The records stored in the topic are aggregated with a time window by another 
> application. The size of the topic is over 10TB.
>  I want to use the topic as a source topic for WindowStore directly.
>  But I cannot do so with the current interface. 
>  I need a redundant topic only for storing the records into WindowStore.
> If this API is public, I can use topics incoming from other applications (not 
> only Kafka Streams applications) as source topics for any StateStore 
> implementation without redundant changelog topics.
> Of course, I need to implement a processor for storing incoming records in 
> such a case.
>  But I think it's not difficult.
> Please consider this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-10892) Add Topology#connectSourceStoreAndTopic as a public method

2023-10-14 Thread Stanislav Kozlovski (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775298#comment-17775298
 ] 

Stanislav Kozlovski commented on KAFKA-10892:
-

Given the lack of activity on [https://github.com/apache/kafka/pull/12742], I 
won't include this in the initial AK 3.7 release plan and I'm removing the tag 
for now. Please let me know if you will target / believe we can get this done 
for 3.7 [~calmera] and we can re-add. Until then, let's keep it without a 
target release until we have high confidence we will hit it. Otherwise we end 
up bumping it on every release.

 

> Add Topology#connectSourceStoreAndTopic as a public method
> --
>
> Key: KAFKA-10892
> URL: https://issues.apache.org/jira/browse/KAFKA-10892
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Tomohiro Hashidate
>Assignee: Daan Gerits
>Priority: Major
>  Labels: kip
> Fix For: 3.7.0
>
>
> I want Topology#connectSourceStoreAndTopic, because I want to use a topic as a 
> source topic directly, without a redundant changelog topic, for not only 
> KeyValueStore but also WindowStore.
> This issue is similar to [KAFKA-6840], but is a suggestion for a more general 
> approach
> {code:java}
> public synchronized Topology connectSourceStoreAndTopic(final String sourceStoreName,
>                                                         final String topic) {
>     internalTopologyBuilder.connectSourceStoreAndTopic(sourceStoreName, topic);
>     return this;
> }
> {code}
> h3. Background
> I want to use a topic as a source topic for WindowStore because using 
> WindowStore is suitable for the feature that I'm implementing.
>  The records stored in the topic are aggregated with a time window by another 
> application. The size of the topic is over 10TB.
>  I want to use the topic as a source topic for WindowStore directly.
>  But I cannot do so with the current interface. 
>  I need a redundant topic only for storing the records into WindowStore.
> If this API is public, I can use topics incoming from other applications (not 
> only Kafka Streams applications) as source topics for any StateStore 
> implementation without redundant changelog topics.
> Of course, I need to implement a processor for storing incoming records in 
> such a case.
>  But I think it's not difficult.
> Please consider this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-12473) Make the "cooperative-sticky, range" as the default assignor

2023-10-14 Thread Stanislav Kozlovski (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775296#comment-17775296
 ] 

Stanislav Kozlovski commented on KAFKA-12473:
-

[~dajac] [~showuon] shall we update the KIP in that case?

 

> Make the "cooperative-sticky, range" as the default assignor
> 
>
> Key: KAFKA-12473
> URL: https://issues.apache.org/jira/browse/KAFKA-12473
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Priority: Critical
>  Labels: kip
>
> Now that 3.0 is coming up, we can change the default 
> ConsumerPartitionAssignor to something better than the RangeAssignor. The 
> original plan was to switch over to the StickyAssignor, but now that we have 
> incremental cooperative rebalancing we should  consider using the new 
> CooperativeStickyAssignor instead: this will enable the consumer group to 
> follow the COOPERATIVE protocol, improving the rebalancing experience OOTB.
> KIP: 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177048248
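
For reference, a sketch of how a consumer opts into the "cooperative-sticky, 
range" pairing from the title today (i.e. what the proposed new default would 
bake in):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.RangeAssignor;

public class AssignorDefaultSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Listing both assignors lets a group roll from the EAGER protocol
        // (RangeAssignor) to COOPERATIVE without a stop-the-world rebalance.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                CooperativeStickyAssignor.class.getName() + "," + RangeAssignor.class.getName());
    }
}
{code}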



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15147) Measure pending and outstanding Remote Segment operations

2023-10-14 Thread Stanislav Kozlovski (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775295#comment-17775295
 ] 

Stanislav Kozlovski commented on KAFKA-15147:
-

[~divijvaidya] given that there is no kip, is it realistic to still target AK 
3.7? Perhaps we should remove the Fix Version until we have confidence in the 
release, otherwise it just ends up getting bumped each release (as we've seen 
in many other unrelated jiras)

 

> Measure pending and outstanding Remote Segment operations
> -
>
> Key: KAFKA-15147
> URL: https://issues.apache.org/jira/browse/KAFKA-15147
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Divij Vaidya
>Priority: Major
>  Labels: tiered-storage
> Fix For: 3.7.0
>
>
> Remote Log Segment operations (copy/delete) are executed by the Remote 
> Storage Manager, and recorded by Remote Log Metadata Manager (e.g. default 
> TopicBasedRLMM writes to the internal Kafka topic state changes on remote log 
> segments).
> As executions run, fail, and retry, it will be important to know how many 
> operations are pending and outstanding over time to alert operators.
> Pending operations are not enough to alert, as values can oscillate closer to 
> zero. An additional condition needs to apply (running time > threshold) to 
> consider an operation outstanding.
> Proposal:
> RemoteLogManager could be extended with 2 concurrent maps 
> (pendingSegmentCopies, pendingSegmentDeletes) `Map[Uuid, Long]` to measure 
> segmentId time when operation started, and based on this expose 2 metrics per 
> operation:
>  * pendingSegmentCopies: gauge of pendingSegmentCopies map
>  * outstandingSegmentCopies: loop over pending ops, and if now - startedTime 
> > timeout, then outstanding++ (maybe on debug level?)
> Is this a valuable metric to add to Tiered Storage? or better to solve on a 
> custom RLMM implementation?
> Also, does it require a KIP?
> Thanks!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14175) KRaft Upgrades Part 2

2023-10-14 Thread Stanislav Kozlovski (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775294#comment-17775294
 ] 

Stanislav Kozlovski commented on KAFKA-14175:
-

given the lack of activity in the last few releases, do we expect this to land 
in 3.7? It would be good to avoid having the Fix Version until we have 
confidence in it, as we just seem to bump it each release

 

> KRaft Upgrades Part 2
> -
>
> Key: KAFKA-14175
> URL: https://issues.apache.org/jira/browse/KAFKA-14175
> Project: Kafka
>  Issue Type: New Feature
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
> Fix For: 3.7.0
>
>
> This is the parent issue for KIP-778 tasks which were not completed for the 
> 3.3 release.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15594) Add 3.6.0 to streams upgrade/compatibility tests

2023-10-14 Thread Stanislav Kozlovski (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775293#comment-17775293
 ] 

Stanislav Kozlovski commented on KAFKA-15594:
-

[~satish.duggana] is anyone going to be assigned to this? Did we release 3.6 
without it?

 

> Add 3.6.0 to streams upgrade/compatibility tests
> 
>
> Key: KAFKA-15594
> URL: https://issues.apache.org/jira/browse/KAFKA-15594
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, system tests
>Reporter: Satish Duggana
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-12399) Deprecate Log4J Appender KIP-719

2023-10-14 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-12399:

Fix Version/s: (was: 3.7.0)

> Deprecate Log4J Appender KIP-719
> 
>
> Key: KAFKA-12399
> URL: https://issues.apache.org/jira/browse/KAFKA-12399
> Project: Kafka
>  Issue Type: Improvement
>  Components: logging
>Reporter: Dongjin Lee
>Assignee: Dongjin Lee
>Priority: Major
>  Labels: needs-kip
>
> As a follow-up to KAFKA-9366, we have to entirely remove the log4j 1.2.7 
> dependency from the classpath by removing dependencies on log4j-appender.
> KIP-719: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-719%3A+Deprecate+Log4J+Appender



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-12399) Deprecate Log4J Appender KIP-719

2023-10-14 Thread Stanislav Kozlovski (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775292#comment-17775292
 ] 

Stanislav Kozlovski commented on KAFKA-12399:
-

This was last reviewed on March 7 and there has been no progress since, as far 
as I can tell. I will remove the target release version to avoid having to bump 
it each release (like we did in the last 3 releases). Please feel free to put 
it back if we are actively targeting any specific release

 

> Deprecate Log4J Appender KIP-719
> 
>
> Key: KAFKA-12399
> URL: https://issues.apache.org/jira/browse/KAFKA-12399
> Project: Kafka
>  Issue Type: Improvement
>  Components: logging
>Reporter: Dongjin Lee
>Assignee: Dongjin Lee
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.7.0
>
>
> As a follow-up to KAFKA-9366, we have to entirely remove the log4j 1.2.7 
> dependency from the classpath by removing dependencies on log4j-appender.
> KIP-719: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-719%3A+Deprecate+Log4J+Appender



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with "{statestore.cache}/{input.buffer}.max.bytes"

2023-10-14 Thread Stanislav Kozlovski (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775291#comment-17775291
 ] 

Stanislav Kozlovski commented on KAFKA-13152:
-

Given there hasn't been much movement in this PR since February 2023, are we 
targeting 3.7 as it's currently marked?

[~sagarrao] as far as I understand, you're blocked on PR reviews - is that 
correct?

 

> Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with 
> "{statestore.cache}/{input.buffer}.max.bytes"
> -
>
> Key: KAFKA-13152
> URL: https://issues.apache.org/jira/browse/KAFKA-13152
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Major
>  Labels: kip
> Fix For: 3.7.0
>
>
> The current config "buffered.records.per.partition" controls the maximum number 
> of records to bookkeep per partition; once it is exceeded, we pause fetching 
> from that partition. However, this config has two issues:
> * It's a per-partition config, so the total memory consumed depends on the 
> dynamic number of partitions assigned.
> * Record size can vary from case to case.
> Hence it is hard to bound the memory usage of this buffering. We should 
> consider deprecating that config in favor of a global one, e.g. 
> "input.buffer.max.bytes", which controls how many bytes in total are allowed 
> to be buffered. This is doable since we buffer the raw records in .



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15593) Add 3.6.0 to broker/client upgrade/compatibility tests

2023-10-14 Thread Stanislav Kozlovski (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775286#comment-17775286
 ] 

Stanislav Kozlovski commented on KAFKA-15593:
-

[~satish.duggana] is this good to close? seems weird to target 3.7 with 3.6 
compatibility tests. AFAICT, the PRs are merged

 

> Add 3.6.0 to broker/client upgrade/compatibility tests
> --
>
> Key: KAFKA-15593
> URL: https://issues.apache.org/jira/browse/KAFKA-15593
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Major
> Fix For: 3.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14077) KRaft should support recovery from failed disk

2023-10-14 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-14077:

Fix Version/s: (was: 3.7.0)

> KRaft should support recovery from failed disk
> --
>
> Key: KAFKA-14077
> URL: https://issues.apache.org/jira/browse/KAFKA-14077
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 2.8.0
>Reporter: Jason Gustafson
>Assignee: Jose Armando Garcia Sancio
>Priority: Blocker
>
> If one of the nodes in the metadata quorum has a disk failure, there is no 
> way currently to safely bring the node back into the quorum. When we lose 
> disk state, we are at risk of losing committed data even if the failure only 
> affects a minority of the cluster.
> Here is an example. Suppose that a metadata quorum has 3 members: v1, v2, and 
> v3. Initially, v1 is the leader and writes a record at offset 1. After v2 
> acknowledges replication of the record, it becomes committed. Suppose that v1 
> fails before v3 has a chance to replicate this record. As long as v1 remains 
> down, the raft protocol guarantees that only v2 can become leader, so the 
> record cannot be lost. The raft protocol expects that when v1 returns, it 
> will still have that record, but what if there is a disk failure, the state 
> cannot be recovered and v1 participates in leader election? Then we would 
> have committed data on a minority of the voters. The main problem here 
> concerns how we recover from this impaired state without risking the loss of 
> this data.
> Consider a naive solution which brings v1 back with an empty disk. Since the 
> node has lost its prior knowledge of the state of the quorum, it will vote for 
> any candidate that comes along. If v3 becomes a candidate, then it will vote 
> for itself and it just needs the vote from v1 to become leader. If that 
> happens, then the committed data on v2 will become lost.
> This is just one scenario. In general, the invariants that the raft protocol 
> is designed to preserve go out the window when disk state is lost. For 
> example, it is also possible to contrive a scenario where the loss of disk 
> state leads to multiple leaders. There is a good reason why raft requires 
> that any vote cast by a voter is written to disk since otherwise the voter 
> may vote for different candidates in the same epoch.
> Many systems solve this problem with a unique identifier which is generated 
> automatically and stored on disk. This identifier is then committed to the 
> raft log. If a disk changes, we would see a new identifier and we can prevent 
> the node from breaking raft invariants. Then recovery from a failed disk 
> requires a quorum reconfiguration. We need something like this in KRaft to 
> make disk recovery possible.
>  
>  
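
A sketch of the unique-identifier approach described in the last paragraph 
(illustrative only; not the actual KRaft design):

{code:java}
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

public class DiskIdentitySketch {
    static UUID loadOrCreateDirectoryId(Path metadataDir) throws IOException {
        Path idFile = metadataDir.resolve("directory.id");
        if (Files.exists(idFile)) {
            return UUID.fromString(Files.readString(idFile, StandardCharsets.UTF_8).trim());
        }
        // A wiped or replaced disk gets a fresh identity; if it no longer matches
        // the id committed in the raft log, the node must not vote or accept votes
        // until it has been reconfigured back into the quorum.
        UUID id = UUID.randomUUID();
        Files.writeString(idFile, id.toString(), StandardCharsets.UTF_8);
        return id;
    }
}
{code}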



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14077) KRaft should support recovery from failed disk

2023-10-14 Thread Stanislav Kozlovski (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775285#comment-17775285
 ] 

Stanislav Kozlovski commented on KAFKA-14077:
-

I asked [~jagsancio] and [~hachikuji] what we should target. As there 
hasn't been much movement on this KIP from what I can tell in the discussion 
thread since 2022, I will remove the Fix Version to avoid having to bump it 
every release.

 

> KRaft should support recovery from failed disk
> --
>
> Key: KAFKA-14077
> URL: https://issues.apache.org/jira/browse/KAFKA-14077
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 2.8.0
>Reporter: Jason Gustafson
>Assignee: Jose Armando Garcia Sancio
>Priority: Blocker
> Fix For: 3.7.0
>
>
> If one of the nodes in the metadata quorum has a disk failure, there is no 
> way currently to safely bring the node back into the quorum. When we lose 
> disk state, we are at risk of losing committed data even if the failure only 
> affects a minority of the cluster.
> Here is an example. Suppose that a metadata quorum has 3 members: v1, v2, and 
> v3. Initially, v1 is the leader and writes a record at offset 1. After v2 
> acknowledges replication of the record, it becomes committed. Suppose that v1 
> fails before v3 has a chance to replicate this record. As long as v1 remains 
> down, the raft protocol guarantees that only v2 can become leader, so the 
> record cannot be lost. The raft protocol expects that when v1 returns, it 
> will still have that record. But what if a disk failure leaves the state 
> unrecoverable and v1 still participates in leader election? Then we would 
> have committed data on a minority of the voters. The main problem here 
> concerns how we recover from this impaired state without risking the loss of 
> this data.
> Consider a naive solution which brings v1 back with an empty disk. Since the 
> node has lost its prior knowledge of the state of the quorum, it will vote for 
> any candidate that comes along. If v3 becomes a candidate, then it will vote 
> for itself and it just needs the vote from v1 to become leader. If that 
> happens, then the committed data on v2 will become lost.
> This is just one scenario. In general, the invariants that the raft protocol 
> is designed to preserve go out the window when disk state is lost. For 
> example, it is also possible to contrive a scenario where the loss of disk 
> state leads to multiple leaders. There is a good reason why raft requires 
> that any vote cast by a voter is written to disk since otherwise the voter 
> may vote for different candidates in the same epoch.
> Many systems solve this problem with a unique identifier which is generated 
> automatically and stored on disk. This identifier is then committed to the 
> raft log. If a disk changes, we would see a new identifier and we can prevent 
> the node from breaking raft invariants. Then recovery from a failed disk 
> requires a quorum reconfiguration. We need something like this in KRaft to 
> make disk recovery possible.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-12886) Enable request forwarding by default

2023-10-14 Thread Stanislav Kozlovski (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775284#comment-17775284
 ] 

Stanislav Kozlovski commented on KAFKA-12886:
-

[~rdielhenn] is this targeting 3.7? I wonder if we should target a release at 
all, since it seems like there hasn't been work on this PR for 2 years.

I will remove the target Fix Version given that. Please let me know if I'm 
wrong.

 

> Enable request forwarding by default
> 
>
> Key: KAFKA-12886
> URL: https://issues.apache.org/jira/browse/KAFKA-12886
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Blocker
> Fix For: 3.7.0
>
>
> KIP-590 documents that request forwarding will be enabled in 3.0 by default: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller.
>  This makes it a requirement for users with custom principal implementations 
> to provide a `KafkaPrincipalSerde` implementation. We waited until 3.0 
> because we saw this as a compatibility break. 
> The KIP documents that use of forwarding will be controlled by the IBP: 
> once the IBP has been configured to 3.0 or above, the brokers will begin 
> forwarding.
> (Note that forwarding has always been a requirement for kraft.)
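
For illustration, below is a minimal, hedged Java sketch of what a custom
principal builder that also provides the required `KafkaPrincipalSerde` can
look like. It assumes the principal's state is fully captured by its type and
name (and that neither contains the ":" separator); the build() body is a
placeholder, not a recommended implementation:

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;

public class SimplePrincipalBuilder implements KafkaPrincipalBuilder, KafkaPrincipalSerde {

    @Override
    public KafkaPrincipal build(final AuthenticationContext context) {
        // Placeholder only: a real builder derives the principal from the context.
        return KafkaPrincipal.ANONYMOUS;
    }

    @Override
    public byte[] serialize(final KafkaPrincipal principal) throws SerializationException {
        // Encode the principal as "<type>:<name>".
        return (principal.getPrincipalType() + ":" + principal.getName())
                .getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public KafkaPrincipal deserialize(final byte[] bytes) throws SerializationException {
        final String[] parts = new String(bytes, StandardCharsets.UTF_8).split(":", 2);
        if (parts.length != 2)
            throw new SerializationException("Malformed principal: expected <type>:<name>");
        return new KafkaPrincipal(parts[0], parts[1]);
    }
}

With forwarding, a broker serializes the principal of the original request so
the receiving node can re-create it, which is why custom principal
implementations cannot be forwarded without a serde.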



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-12886) Enable request forwarding by default

2023-10-14 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-12886:

Fix Version/s: (was: 3.7.0)

> Enable request forwarding by default
> 
>
> Key: KAFKA-12886
> URL: https://issues.apache.org/jira/browse/KAFKA-12886
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Blocker
>
> KIP-590 documents that request forwarding will be enabled in 3.0 by default: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller.
>  This makes it a requirement for users with custom principal implementations 
> to provide a `KafkaPrincipalSerde` implementation. We waited until 3.0 
> because we saw this as a compatibility break. 
> The KIP documents that use of forwarding will be controlled by the IBP: 
> once the IBP has been configured to 3.0 or above, the brokers will begin 
> forwarding.
> (Note that forwarding has always been a requirement for kraft.)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15406) Add the ForwardingManager metrics from KIP-938

2023-10-14 Thread Stanislav Kozlovski (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775275#comment-17775275
 ] 

Stanislav Kozlovski commented on KAFKA-15406:
-

[~cmccabe] given that we are targeting this for 3.7 as per your comment in 
KAFKA-15183, should we set the Fix Version to 3.7? I acknowledge we usually 
seem to set this when we merge, but the release plan doc has a filter that 
only looks at Fix Version - so it's a useful way to know what'll be part of 
the release.

 

> Add the ForwardingManager metrics from KIP-938
> --
>
> Key: KAFKA-15406
> URL: https://issues.apache.org/jira/browse/KAFKA-15406
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.7.0
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>  Labels: newbie, newbie++
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] MINOR: cleanup warnings in Kafka Streams code base [kafka]

2023-10-14 Thread via GitHub


mjsax opened a new pull request, #14549:
URL: https://github.com/apache/kafka/pull/14549

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] POC - DO NO MERGE: refactor StreamsConfig [kafka]

2023-10-14 Thread via GitHub


mjsax commented on code in PR #14548:
URL: https://github.com/apache/kafka/pull/14548#discussion_r1359477497


##
build.gradle:
##
@@ -2151,7 +2151,7 @@ project(':streams') {
 
   task genStreamsConfigDocs(type: JavaExec) {
 classpath = sourceSets.main.runtimeClasspath
-mainClass = 'org.apache.kafka.streams.StreamsConfig'
+mainClass = 'org.apache.kafka.streams.internals.InternalStreamsConfig'

Review Comment:
   We use `main` to generate the config HTML for the webpage, so update it to 
use the internal one going forward.



##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -843,7 +843,7 @@ public KafkaStreams(final Topology topology,
 public KafkaStreams(final Topology topology,
 final StreamsConfig applicationConfigs,
 final KafkaClientSupplier clientSupplier) {
-this(new TopologyMetadata(topology.internalTopologyBuilder, 
applicationConfigs), applicationConfigs, clientSupplier);
+this(topology, new InternalStreamsConfig(applicationConfigs), 
clientSupplier, Time.SYSTEM);

Review Comment:
   Some side cleanup on constructor methods -- no public change -- just to make 
the flow simpler



##
streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java:
##
@@ -1089,34 +1091,30 @@ public void 
shouldTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsDebug() {
 
 @Test
 public void shouldGetClientSupplierFromConfigForConstructor() {

Review Comment:
   This and the next test needed an update, because the refactoring breaks 
the mocking -- cf. the change in `MockClientSupplier`, too.



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -1171,72 +1170,6 @@ public class StreamsConfig extends AbstractConfig {
 CONSUMER_EOS_OVERRIDES = 
Collections.unmodifiableMap(tempConsumerDefaultOverrides);
 }
 
-public static class InternalConfig {

Review Comment:
   Even if this class is public, it's called `InternalConfig` and is thus 
clearly internal -- I believe it's OK to move it to the new sub-class without 
deprecation (as a matter of fact, we won't need this helper class any longer; 
I only moved the members of this class to `InternalStreamsConfig`).



##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -790,7 +790,7 @@ public KafkaStreams(final Topology topology,
 public KafkaStreams(final Topology topology,
 final Properties props,
 final Time time) {
-this(topology, new StreamsConfig(props), time);
+this(topology, new InternalStreamsConfig(props), null, time);

Review Comment:
   Some side cleanup on constructor methods -- no public change -- just to make 
the flow simpler



##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -860,32 +860,37 @@ public KafkaStreams(final Topology topology,
 public KafkaStreams(final Topology topology,
 final StreamsConfig applicationConfigs,
 final Time time) {
-this(new TopologyMetadata(topology.internalTopologyBuilder, 
applicationConfigs), applicationConfigs, 
applicationConfigs.getKafkaClientSupplier(), time);
+this(topology, new InternalStreamsConfig(applicationConfigs), null, 
time);

Review Comment:
   Some side cleanup on constructor methods -- no public change -- just to make 
the flow simpler



##
streams/src/main/java/org/apache/kafka/streams/internals/InternalStreamsConfig.java:
##
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.KafkaClientSupplier;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.errors.ProductionExceptionHandler;
+import org.apache.kafka.streams.proces

[PR] POC - DO NO MERGE: refactor StreamsConfig [kafka]

2023-10-14 Thread via GitHub


mjsax opened a new pull request, #14548:
URL: https://github.com/apache/kafka/pull/14548

   This is a POC to refactor `StreamsConfig` to move "leaking" internal methods 
into a newly added subclass `InternalStreamsConfig`, following an already 
established pattern used for DSL config objects (like `Consumed`, `Produced`, 
`Materialized`, etc.). This change requires a KIP.
   
   We would deprecate the following methods on `StreamsConfig` to be able to 
remove them w/o replacement, as we should consider them internal:
   - `configDef`
   - `getMainConsumerConfig`
   - `getRestoreConsumerConfig`
   - `getGlobalConsumerConfig`
   - `getProducerConfig`
   - `getAdminConfig`
   - `getClientTag`
   - `getKafkaClientSupplier`
   - `defaultKeySerde`
   - `defaultValueSerde`
   - `defaultTimestampExtractor`
   - `defaultDeserializationExceptionHandler`
   - `defaultProductionExceptionHandler`
   - `main`
   
   In addition, this PR changes all internal usage from `StreamsConfig` to 
`InternalStreamsConfig`.
   
   If we believe this is a reasonable change, I can prepare a KIP for it.
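
For context, the "internal subclass" pattern referenced here looks roughly
like the following Java sketch; the names (PublicConfig, InternalConfigView)
and bodies are illustrative, not this PR's actual code:

import java.util.HashMap;
import java.util.Map;

// Public-facing config object: exposes no internal accessors.
class PublicConfig {
    protected final Map<String, Object> props;

    PublicConfig(final Map<String, Object> props) {
        this.props = new HashMap<>(props);
    }

    // Protected copy constructor enables the internal view below.
    protected PublicConfig(final PublicConfig other) {
        this.props = other.props;
    }
}

// Internal view: constructed from the public object inside the library,
// exposing accessors that callers were never meant to use directly.
class InternalConfigView extends PublicConfig {
    InternalConfigView(final PublicConfig config) {
        super(config);
    }

    // Internal-only accessor that external callers never see.
    Object internalValue(final String key) {
        return props.get(key);
    }
}

Inside the library, code wraps the user-supplied object in the internal view
and uses the extra accessors; user code only ever sees the public type.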


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15607) Possible NPE is thrown in MirrorCheckpointTask

2023-10-14 Thread hudeqi (Jira)
hudeqi created KAFKA-15607:
--

 Summary: Possible NPE is thrown in MirrorCheckpointTask
 Key: KAFKA-15607
 URL: https://issues.apache.org/jira/browse/KAFKA-15607
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 2.8.1
Reporter: hudeqi
Assignee: hudeqi


In the `syncGroupOffset` method, if `targetConsumerOffset.get(topicPartition)` 
returns null, the calculation of `latestDownstreamOffset` will throw an NPE. 
This usually occurs in the following situation: a group previously consumed a 
topic in the target cluster, and the group offset of some partitions was later 
reset to -1, so the `OffsetAndMetadata` of those partitions is null.
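
A hedged Java sketch of the kind of null guard that would avoid the NPE; the
variable and method names mirror the description above and are illustrative,
not the exact MirrorMaker code:

import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class CheckpointGuardSketch {
    // Returns the downstream offset for the partition, or null when the
    // group offset was reset and there is no target-side state to compare.
    static Long latestDownstreamOffsetOrNull(
            final Map<TopicPartition, OffsetAndMetadata> targetConsumerOffset,
            final TopicPartition topicPartition) {
        final OffsetAndMetadata target = targetConsumerOffset.get(topicPartition);
        if (target == null) {
            // No OffsetAndMetadata for this partition: skip it instead of
            // dereferencing null.
            return null;
        }
        return target.offset();
    }
}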



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]

2023-10-14 Thread via GitHub


hudeqi commented on code in PR #14511:
URL: https://github.com/apache/kafka/pull/14511#discussion_r1359460675


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -561,6 +561,108 @@ class RemoteIndexCacheTest {
 assertTrue(cache.internalCache().estimatedSize() == 0)
   }
 
+  @Test
+  def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+
+def getIndexFileFromRemoteCacheDir(suffix: String) = {
+  try {
+Files.walk(cache.cacheDir().toPath())
+  .filter(Files.isRegularFile(_))
+  .filter(path => path.getFileName.toString.endsWith(suffix))
+  .findAny()
+  } catch {
+case _: NoSuchFileException => Optional.empty()
+  }
+}
+
+// The test process for resizing is: put 1 entry -> evict to empty -> put 
3 entries with limited capacity of 2 entries -> evict to 1 entry
+val estimateEntryBytesSize = estimateOneEntryBytesSize()
+val tpId = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0))
+val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
+
+assertCacheSize(0)
+// getIndex for first time will call rsm#fetchIndex
+val cacheEntry = cache.getIndexEntry(metadataList.head)
+assertCacheSize(1)
+
assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent)
+
assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent)
+
assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent)
+
+// Reduce the cache size to 1 byte to ensure that all the entries are 
evicted from it.
+cache.resizeCacheSize(1L)
+
+// wait until entry is marked for deletion
+TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup,
+  "Failed to mark cache entry for cleanup after resizing cache.")
+TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
+  "Failed to cleanup cache entry after resizing cache.")
+
+// verify no index files on remote cache dir
+TestUtils.waitUntilTrue(() => 
!getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
+  s"Offset index file should not be present on disk at 
${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => 
!getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
+  s"Txn index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => 
!getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
+  s"Time index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => 
!getIndexFileFromRemoteCacheDir(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
+  s"Index file marked for deletion should not be present on disk at 
${cache.cacheDir()}")
+
+assertTrue(cache.internalCache().estimatedSize() == 0)
+
+// Increase cache capacity to only store 2 entries
+cache.resizeCacheSize(2 * estimateEntryBytesSize)
+assertCacheSize(0)
+
+val entry0 = cache.getIndexEntry(metadataList(0))
+val entry1 = cache.getIndexEntry(metadataList(1))
+cache.getIndexEntry(metadataList(2))
+assertCacheSize(2)
+val missingMetadata = metadataList(0)
+val missingEntry = entry0
+// wait until evicted entry is marked for deletion
+TestUtils.waitUntilTrue(() => missingEntry.isMarkedForCleanup,
+  "Failed to mark evicted cache entry for cleanup after resizing cache.")
+TestUtils.waitUntilTrue(() => missingEntry.isCleanStarted,
+  "Failed to cleanup evicted cache entry after resizing cache.")
+// verify no index files for `missingEntry` on remote cache dir
+TestUtils.waitUntilTrue(() => 
!getIndexFileFromRemoteCacheDir(remoteOffsetIndexFileName(missingMetadata)).isPresent,
+  s"Offset index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => 
!getIndexFileFromRemoteCacheDir(remoteTimeIndexFileName(missingMetadata)).isPresent,
+  s"Time index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => 
!getIndexFileFromRemoteCacheDir(remoteTransactionIndexFileName(missingMetadata)).isPresent,
+  s"Txn index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => 
!getIndexFileFromRemoteCacheDir(remoteDeletedSuffixIndexFileName(missingMetadata)).isPresent,
+  s"Index file marked for deletion for evicted entry should not be present 
on disk at ${cache.cacheDir()}")
+
+// Reduce cache capacity to only store 1 entries
+cache.resizeCacheSize(1 * estimateEntryBytesSize)
+assertCacheSize(1)
+
+val nextMissingMetadata = metadataList(1)
+val nextMissingEntry = entry1
+// wait until evicted entry is marked for deletion
+TestUtils.waitUntilTrue(() => nextMissingEntry.isMarkedForCleanup,
+  "Failed to mark evict

Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]

2023-10-14 Thread via GitHub


hudeqi commented on code in PR #14511:
URL: https://github.com/apache/kafka/pull/14511#discussion_r1359460627


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -561,6 +561,108 @@ class RemoteIndexCacheTest {
 assertTrue(cache.internalCache().estimatedSize() == 0)
   }
 
+  @Test
+  def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+
+def getIndexFileFromRemoteCacheDir(suffix: String) = {
+  try {
+Files.walk(cache.cacheDir().toPath())
+  .filter(Files.isRegularFile(_))
+  .filter(path => path.getFileName.toString.endsWith(suffix))
+  .findAny()
+  } catch {
+case _: NoSuchFileException => Optional.empty()
+  }
+}
+
+// The test process for resizing is: put 1 entry -> evict to empty -> put 
3 entries with limited capacity of 2 entries -> evict to 1 entry
+val estimateEntryBytesSize = estimateOneEntryBytesSize()
+val tpId = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0))
+val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
+
+assertCacheSize(0)
+// getIndex for first time will call rsm#fetchIndex
+val cacheEntry = cache.getIndexEntry(metadataList.head)
+assertCacheSize(1)
+
assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent)
+
assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent)
+
assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent)
+
+// Reduce the cache size to 1 byte to ensure that all the entries are 
evicted from it.
+cache.resizeCacheSize(1L)
+
+// wait until entry is marked for deletion
+TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup,
+  "Failed to mark cache entry for cleanup after resizing cache.")
+TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
+  "Failed to cleanup cache entry after resizing cache.")
+
+// verify no index files on remote cache dir
+TestUtils.waitUntilTrue(() => 
!getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
+  s"Offset index file should not be present on disk at 
${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => 
!getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
+  s"Txn index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => 
!getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
+  s"Time index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => 
!getIndexFileFromRemoteCacheDir(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
+  s"Index file marked for deletion should not be present on disk at 
${cache.cacheDir()}")
+
+assertTrue(cache.internalCache().estimatedSize() == 0)
+
+// Increase cache capacity to only store 2 entries
+cache.resizeCacheSize(2 * estimateEntryBytesSize)
+assertCacheSize(0)
+
+val entry0 = cache.getIndexEntry(metadataList(0))
+val entry1 = cache.getIndexEntry(metadataList(1))
+cache.getIndexEntry(metadataList(2))
+assertCacheSize(2)
+val missingMetadata = metadataList(0)
+val missingEntry = entry0
+// wait until evicted entry is marked for deletion
+TestUtils.waitUntilTrue(() => missingEntry.isMarkedForCleanup,
+  "Failed to mark evicted cache entry for cleanup after resizing cache.")
+TestUtils.waitUntilTrue(() => missingEntry.isCleanStarted,
+  "Failed to cleanup evicted cache entry after resizing cache.")
+// verify no index files for `missingEntry` on remote cache dir
+TestUtils.waitUntilTrue(() => 
!getIndexFileFromRemoteCacheDir(remoteOffsetIndexFileName(missingMetadata)).isPresent,
+  s"Offset index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => 
!getIndexFileFromRemoteCacheDir(remoteTimeIndexFileName(missingMetadata)).isPresent,
+  s"Time index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => 
!getIndexFileFromRemoteCacheDir(remoteTransactionIndexFileName(missingMetadata)).isPresent,
+  s"Txn index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => 
!getIndexFileFromRemoteCacheDir(remoteDeletedSuffixIndexFileName(missingMetadata)).isPresent,
+  s"Index file marked for deletion for evicted entry should not be present 
on disk at ${cache.cacheDir()}")
+
+// Reduce cache capacity to only store 1 entries
+cache.resizeCacheSize(1 * estimateEntryBytesSize)
+assertCacheSize(1)
+
+val nextMissingMetadata = metadataList(1)
+val nextMissingEntry = entry1
+// wait until evicted entry is marked for deletion
+TestUtils.waitUntilTrue(() => nextMissingEntry.isMarkedForCleanup,
+  "Failed to mark evict

Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]

2023-10-14 Thread via GitHub


hudeqi commented on code in PR #14511:
URL: https://github.com/apache/kafka/pull/14511#discussion_r1359460606


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -561,6 +561,108 @@ class RemoteIndexCacheTest {
 assertTrue(cache.internalCache().estimatedSize() == 0)
   }
 
+  @Test
+  def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+
+def getIndexFileFromRemoteCacheDir(suffix: String) = {
+  try {
+Files.walk(cache.cacheDir().toPath())
+  .filter(Files.isRegularFile(_))
+  .filter(path => path.getFileName.toString.endsWith(suffix))
+  .findAny()
+  } catch {
+case _: NoSuchFileException => Optional.empty()
+  }
+}
+
+// The test process for resizing is: put 1 entry -> evict to empty -> put 
3 entries with limited capacity of 2 entries -> evict to 1 entry
+val estimateEntryBytesSize = estimateOneEntryBytesSize()
+val tpId = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0))
+val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
+
+assertCacheSize(0)
+// getIndex for first time will call rsm#fetchIndex
+val cacheEntry = cache.getIndexEntry(metadataList.head)
+assertCacheSize(1)
+
assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent)
+
assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent)
+
assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent)
+
+// Reduce the cache size to 1 byte to ensure that all the entries are 
evicted from it.
+cache.resizeCacheSize(1L)
+
+// wait until entry is marked for deletion
+TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup,
+  "Failed to mark cache entry for cleanup after resizing cache.")
+TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
+  "Failed to cleanup cache entry after resizing cache.")
+
+// verify no index files on remote cache dir
+TestUtils.waitUntilTrue(() => 
!getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
+  s"Offset index file should not be present on disk at 
${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => 
!getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
+  s"Txn index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => 
!getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
+  s"Time index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => 
!getIndexFileFromRemoteCacheDir(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
+  s"Index file marked for deletion should not be present on disk at 
${cache.cacheDir()}")
+
+assertTrue(cache.internalCache().estimatedSize() == 0)

Review Comment:
   updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]

2023-10-14 Thread via GitHub


hudeqi commented on code in PR #14511:
URL: https://github.com/apache/kafka/pull/14511#discussion_r1359460521


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -561,6 +561,108 @@ class RemoteIndexCacheTest {
 assertTrue(cache.internalCache().estimatedSize() == 0)
   }
 
+  @Test
+  def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+
+def getIndexFileFromRemoteCacheDir(suffix: String) = {
+  try {
+Files.walk(cache.cacheDir().toPath())
+  .filter(Files.isRegularFile(_))
+  .filter(path => path.getFileName.toString.endsWith(suffix))
+  .findAny()
+  } catch {
+case _: NoSuchFileException => Optional.empty()
+  }
+}

Review Comment:
   updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]

2023-10-14 Thread via GitHub


hudeqi commented on code in PR #14511:
URL: https://github.com/apache/kafka/pull/14511#discussion_r1359435180


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -525,7 +525,7 @@ class RemoteIndexCacheTest {
   .filter(path => path.getFileName.toString.endsWith(suffix))
   .findAny()
   } catch {
-case _: FileNotFoundException => Optional.empty()
+case _: NoSuchFileException => Optional.empty()

Review Comment:
   @showuon Ah, I think the `FileNotFoundException` added here may not be the 
exception that is actually thrown. This is what I discovered when merging the 
latest trunk:
   
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14511/4/tests
   
   Error
   java.io.UncheckedIOException: java.nio.file.NoSuchFileException: 
/tmp/kafka-RemoteIndexCacheTest256653951450751431/x1qGOc3yRE6shFMJVKArpw:foo-0/remote-log-index-cache/0_TGK1Vu0nTCCcb0TN379YFA.timeindex.deleted
   Stack trace
   java.io.UncheckedIOException: java.nio.file.NoSuchFileException: 
/tmp/kafka-RemoteIndexCacheTest256653951450751431/x1qGOc3yRE6shFMJVKArpw:foo-0/remote-log-index-cache/0_TGK1Vu0nTCCcb0TN379YFA.timeindex.deleted
at 
java.nio.file.FileTreeIterator.fetchNextIfNeeded(FileTreeIterator.java:88)
at java.nio.file.FileTreeIterator.hasNext(FileTreeIterator.java:104)
at 
java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811)
at 
java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
at 
java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:499)
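
For reference, a self-contained Java sketch of the behaviour in question: a
file deleted while `Files.walk` is iterating surfaces as an
`UncheckedIOException` wrapping `NoSuchFileException`, not as
`java.io.FileNotFoundException`, so that is what a lookup helper has to handle
(one possible shape, not the test's exact code):

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.Optional;
import java.util.stream.Stream;

class IndexFileLookupSketch {
    static Optional<Path> findAnyWithSuffix(final Path dir, final String suffix) {
        try (Stream<Path> files = Files.walk(dir)) {
            return files
                .filter(Files::isRegularFile)
                .filter(p -> p.getFileName().toString().endsWith(suffix))
                .findAny();
        } catch (UncheckedIOException e) {
            // Thrown mid-traversal if a file disappears underneath us.
            if (e.getCause() instanceof NoSuchFileException)
                return Optional.empty();
            throw e;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}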






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15481: Fix concurrency bug in RemoteIndexCache [kafka]

2023-10-14 Thread via GitHub


iit2009060 commented on code in PR #14483:
URL: https://github.com/apache/kafka/pull/14483#discussion_r1359185319


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -592,16 +593,75 @@ class RemoteIndexCacheTest {
 verifyFetchIndexInvocation(count = 1)
   }
 
+  @Test
+  def testConcurrentRemoveReadForCache(): Unit = {
+// Create a spy Cache Entry
+val rlsMetadata = new 
RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), 
baseOffset, lastOffset,
+  time.milliseconds(), brokerId, time.milliseconds(), segmentSize, 
Collections.singletonMap(0, 0L))
+
+val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, new 
File(tpDir, DIR_NAME)))
+val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, new 
File(tpDir, DIR_NAME)))
+val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, new 
File(tpDir, DIR_NAME)))
+
+val spyEntry = spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, 
txIndex))
+cache.internalCache.put(rlsMetadata.remoteLogSegmentId().id(), spyEntry)
+
+assertCacheSize(1)
+
+var entry: RemoteIndexCache.Entry = null
+
+val latchForCacheRead = new CountDownLatch(1)
+val latchForCacheRemove = new CountDownLatch(1)
+val latchForTestWait = new CountDownLatch(1)
+
+doAnswer((invocation: InvocationOnMock) => {
+  // Signal the CacheRead to unblock itself
+  latchForCacheRead.countDown()
+  // Wait for signal to start renaming the files
+  latchForCacheRemove.await()
+  // Calling the markForCleanup() actual method to start renaming the files
+  invocation.callRealMethod()

Review Comment:
   Why are we calling the actual rename of the file in `callRealMethod`? 
   Correct me if my understanding is wrong here. 
   Two threads are involved: 
   1. removeCache - executes the remove function of the cache. 
   2. readCache - reads data from the cache when spyEntry.markForCleanup is 
executed. 
   =
   
   Operations:
   1. removeCache is triggered. 
   2. spyEntry.markForCleanup is executed (the files are already renamed to 
.delete).
   3. readCache executes and finishes because no lock is pending on the remove 
operation.
   4. It creates a new file in the entry again (fetched from remote storage 
rather than taken from the existing cache). (We should validate the number of 
calls to the rsm here.)
   5. After latchForCacheRemove.await(), why are we explicitly calling 
markForCleanup again? 
   



##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -193,7 +192,16 @@ public File cacheDir() {
 public void remove(Uuid key) {
 lock.readLock().lock();
 try {
-internalCache.invalidate(key);
+internalCache.asMap().computeIfPresent(key, (k, v) -> {
+try {
+v.markForCleanup();
+expiredIndexes.put(v);

Review Comment:
   @showuon I was just thinking about this: if this fails, it will leave a 
pile of files with the delete suffix that never get deleted from the disk. Is 
that behaviour OK?



##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -193,7 +192,16 @@ public File cacheDir() {
 public void remove(Uuid key) {
 lock.readLock().lock();
 try {
-internalCache.invalidate(key);
+internalCache.asMap().computeIfPresent(key, (k, v) -> {
+try {
+v.markForCleanup();
+expiredIndexes.put(v);

Review Comment:
   @showuon @kamalcph @jeel2420 I think we may be entering a deadlock state 
here: 
   during remove we try to take a read lock, but in markForCleanup we try to 
take a write lock. Will that not result in a deadlock? 
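
To illustrate the concern (whether the cache really acquires both locks on the
same lock instance is exactly what is being asked here), a tiny self-contained
Java demo of why a read-to-write lock upgrade deadlocks:

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockUpgradeDemo {
    public static void main(final String[] args) {
        final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        lock.readLock().lock();
        System.out.println("Read lock held; requesting write lock...");
        // ReentrantReadWriteLock does not support upgrading: the write lock
        // waits for all readers, including this thread, so this blocks forever.
        lock.writeLock().lock();
        System.out.println("never reached");
    }
}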



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15387: Deprecate Connect's redundant task configurations endpoint [kafka]

2023-10-14 Thread via GitHub


yashmayya commented on PR #14361:
URL: https://github.com/apache/kafka/pull/14361#issuecomment-1762750265

   Test failures are unrelated, merging to `trunk`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15387: Deprecate Connect's redundant task configurations endpoint [kafka]

2023-10-14 Thread via GitHub


yashmayya merged PR #14361:
URL: https://github.com/apache/kafka/pull/14361


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org