Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-03-09 Thread via GitHub


Phuc-Hong-Tran commented on code in PR #15188:
URL: https://github.com/apache/kafka/pull/15188#discussion_r1518786653


##
clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json:
##
@@ -35,6 +38,8 @@
   "about": "-1 if it didn't change since the last heartbeat; the maximum 
time in milliseconds that the coordinator will wait on the member to revoke its 
partitions otherwise." },
 { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", 
"nullableVersions": "0+", "default": "null", "entityType": "topicName",
   "about": "null if it didn't change since the last heartbeat; the 
subscribed topic names otherwise." },
+{ "name": "SubscribedTopicRegex", "type": "string", "versions": "1+", 
"nullableVersions": "1+", "default": "null",
+  "about": "null if it didn't change since the last heartbeat; the 
subscribed topic regex otherwise" },

Review Comment:
   Will change this to a separate PR
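   
   For context, a hedged sketch of the client-side API that this heartbeat field backs (KIP-848 / KAFKA-15561); the `SubscriptionPattern` class and the `subscribe` overload below are assumptions taken from the KIP, since client support was still under review in this PR:
   
   ```java
   import java.util.Properties;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.clients.consumer.SubscriptionPattern;
   
   public class RegexSubscriptionSketch {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put("bootstrap.servers", "localhost:9092");
           props.put("group.id", "demo-group");
           props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
           props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
   
           try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
               // The regex is evaluated broker-side; the heartbeat carries it in
               // SubscribedTopicRegex only when it changes -- null means "unchanged".
               consumer.subscribe(new SubscriptionPattern("orders-.*"));
           }
       }
   }
   ```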



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14517: Implement regex subscriptions [kafka]

2024-03-09 Thread via GitHub


Phuc-Hong-Tran commented on PR #14327:
URL: https://github.com/apache/kafka/pull/14327#issuecomment-1987118580

   hi @JimmyWang6, are you still working on this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16345) Optionally allow urlencoding clientId and clientSecret in authorization header

2024-03-09 Thread Nelson B. (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nelson B. updated KAFKA-16345:
--
Description: 
When a client communicates with an OIDC provider to retrieve an access token, 
RFC 6749 says that clientID and clientSecret must be urlencoded in the 
authorization header (see [https://tools.ietf.org/html/rfc6749#section-2.3.1]). 
However, it seems that in practice some OIDC providers do not enforce this, so 
I was thinking about introducing a new configuration parameter that will 
optionally urlencode clientId & clientSecret in the authorization header. 

 

Link to the KIP 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1025%3A+Optionally+URL-encode+clientID+and+clientSecret+in+authorization+header

  was:When a client communicates with an OIDC provider to retrieve an access token, 
RFC 6749 says that clientID and clientSecret must be urlencoded in the 
authorization header (see [https://tools.ietf.org/html/rfc6749#section-2.3.1]). 
However, it seems that in practice some OIDC providers do not enforce this, so 
I was thinking about introducing a new configuration parameter that will 
optionally urlencode clientId & clientSecret in the authorization header. 


> Optionally allow urlencoding clientId and clientSecret in authorization header
> --
>
> Key: KAFKA-16345
> URL: https://issues.apache.org/jira/browse/KAFKA-16345
> Project: Kafka
>  Issue Type: Bug
>Reporter: Nelson B.
>Assignee: Nelson B.
>Priority: Minor
>
> When a client communicates with an OIDC provider to retrieve an access token, 
> RFC 6749 says that clientID and clientSecret must be urlencoded in the 
> authorization header (see 
> [https://tools.ietf.org/html/rfc6749#section-2.3.1]). However, it seems that 
> in practice some OIDC providers do not enforce this, so I was thinking about 
> introducing a new configuration parameter that will optionally urlencode 
> clientId & clientSecret in the authorization header. 
>  
> Link to the KIP 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1025%3A+Optionally+URL-encode+clientID+and+clientSecret+in+authorization+header
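
For reference, a minimal sketch of what RFC 6749 §2.3.1 asks for: clientId and clientSecret are form-urlencoded before being joined and Base64-encoded into the Basic authorization header. The class and method names here are illustrative, not the Kafka login module's API:

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class BasicAuthHeaderSketch {
    static String authorizationHeader(String clientId, String clientSecret, boolean urlencode) {
        // RFC 6749 section 2.3.1: both values are encoded with the
        // application/x-www-form-urlencoded algorithm before concatenation.
        String id = urlencode ? URLEncoder.encode(clientId, StandardCharsets.UTF_8) : clientId;
        String secret = urlencode ? URLEncoder.encode(clientSecret, StandardCharsets.UTF_8) : clientSecret;
        String credentials = id + ":" + secret;
        return "Basic " + Base64.getEncoder().encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // A secret with reserved characters produces a different header when encoded,
        // which is why providers that don't enforce the RFC make this optional behavior necessary.
        System.out.println(authorizationHeader("my-client", "p@ss/word+1", false));
        System.out.println(authorizationHeader("my-client", "p@ss/word+1", true));
    }
}
```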



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]

2024-03-09 Thread via GitHub


ChrisAHolland opened a new pull request, #15507:
URL: https://github.com/apache/kafka/pull/15507

   I noticed a few quirks in the implementation of `BoundedList.java` that could potentially lead to bugs if used improperly.
   
   1. A constructor-like method with signature `newArrayBacked(int maxLength, int initialCapacity)` took two integers, one for the maximum length of the `BoundedList` and a second for the initial capacity of the underlying `ArrayList`. This seems like a flaw to me because an `ArrayList` resize is an O(n) operation, so if the initial capacity was "under-provisioned" compared to the maximum capacity, many resizing operations could take place as the list grows, which is not good for performance.
   
   Fix: Remove `newArrayBacked(int maxLength, int initialCapacity)` and use the new constructor instead (mentioned below).
   
   2. The public constructor had the signature `public BoundedList(int maxLength, List<E> underlying)`. This is problematic because the constructor created the `BoundedList` using a reference to `underlying`, not a copy of it; therefore, a user could add elements to `underlying` instead of their newly instantiated `BoundedList` and force the `BoundedList` to grow larger than its `maxLength`.
   
   Fix: Change the constructor to have signature `public BoundedList(int maxLength)` (see the sketch below). I noticed that passing in an "original list" was only ever done in unit tests, never in the source code, and I felt it made the unit tests confusing. If this use case is ever needed, it can be added back later on.
   
   3. The remaining changes update the usages of `BoundedList` and clean up the unit tests.
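   
   A minimal sketch (not the actual Kafka class) of the safer shape described in point 2: the list owns its backing store and pre-sizes it to `maxLength`, so callers cannot mutate it from the outside and no incremental `ArrayList` resizing occurs:
   
   ```java
   import java.util.ArrayList;
   
   public final class BoundedListSketch<E> {
       private final int maxLength;
       private final ArrayList<E> underlying;
   
       public BoundedListSketch(int maxLength) {
           if (maxLength <= 0) {
               throw new IllegalArgumentException("Invalid non-positive maxLength: " + maxLength);
           }
           this.maxLength = maxLength;
           // Capacity equals the bound, so the backing array is allocated exactly once.
           this.underlying = new ArrayList<>(maxLength);
       }
   
       public boolean add(E element) {
           if (underlying.size() >= maxLength) {
               throw new RuntimeException("Cannot add element: list already at maximum length " + maxLength);
           }
           return underlying.add(element);
       }
   
       public int size() {
           return underlying.size();
       }
   }
   ```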
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: remove the copy constructor of LogSegment [kafka]

2024-03-09 Thread via GitHub


chia7712 commented on code in PR #15488:
URL: https://github.com/apache/kafka/pull/15488#discussion_r1518695740


##
core/src/test/scala/unit/kafka/log/LogLoaderTest.scala:
##
@@ -352,19 +352,15 @@ class LogLoaderTest {
   // Intercept all segment read calls
   val interceptedLogSegments = new LogSegments(topicPartition) {
 override def add(segment: LogSegment): LogSegment = {
-  val wrapper = new LogSegment(segment) {
-
-override def read(startOffset: Long, maxSize: Int, maxPosition: Long, minOneMessage: Boolean): FetchDataInfo = {
-  segmentsWithReads += this
-  super.read(startOffset, maxSize, maxPosition, minOneMessage)
-}
-
-override def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Optional[LeaderEpochFileCache]): Int = {
-  recoveredSegments += this
-  super.recover(producerStateManager, leaderEpochCache)
-}
-  }
+  val wrapper = Mockito.spy(segment)

Review Comment:
   > There is a lot of magic to make mocks work and even more so for spy mocks. Replacing readable/debuggable normal code with mocks is generally a bad idea. A concrete example of the problems with mocks is the mock leaks that have caused serious test instability.
   > As a general rule, mocks should only be used if the mock-free alternative is clearly worse in terms of verbosity, maintainability, etc. I don't think that's the case here.
   
   Thanks for sharing. I agree that `spy` is a black box, and it could cause serious test instability.
   
   Getting back to the subject: there are alternatives which can remove the incomplete copy constructor.
   
   1. Add public getters to `LogSegment` in order to reuse the constructor (https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java#L112). The side effect is that testing-only code is added. However, that is better than having an incomplete copy constructor.
   2. Reuse the constructor with temporary objects. This is the initial approach of this PR. The disadvantage is that it needs to create weird objects, but that is fine for tests.
   
   I prefer option 2 to replace the `spy`. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Cleanup log.dirs in ReplicaManagerTest on JVM exit [kafka]

2024-03-09 Thread via GitHub


chia7712 commented on code in PR #15289:
URL: https://github.com/apache/kafka/pull/15289#discussion_r1518692934


##
clients/src/test/java/org/apache/kafka/test/TestUtils.java:
##
@@ -195,6 +195,25 @@ public static File tempDirectory() {
 return tempDirectory(null);
 }
 
+/**
+ * Create a temporary directory under the given root directory.
+ * The root directory is removed on JVM exit if it doesn't already exist
+ * when this function is invoked.
+ *
+ * @param root path to create temporary directory under
+ * @return
+ */
+public static File tempRelativeDir(String root) {
+File rootFile = new File(root);
+boolean created = rootFile.mkdir();
+
+File result = tempDirectory(rootFile.toPath(), null);
+if (created) {
+rootFile.deleteOnExit();

Review Comment:
   > Have you verified that the order of these independent removals is correct 
though? 
   
   I ran the tests with this PR, and the folder created by this helper gets removed as expected; see 
https://github.com/apache/kafka/pull/15289#pullrequestreview-1923156750
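   
   For reference, `java.io.File.deleteOnExit` is documented to delete files in the reverse order of registration, which is why the ordering works out: the root is registered before the temp dir created under it, so it is deleted last, when it is already empty. A minimal, self-contained illustration (hypothetical paths):
   
   ```java
   import java.io.File;
   import java.io.IOException;
   
   public class DeleteOnExitOrder {
       public static void main(String[] args) throws IOException {
           File root = new File("scratch-root");
           root.mkdir();
           root.deleteOnExit();                  // registered first => deleted last
   
           File child = File.createTempFile("data", ".tmp", root);
           child.deleteOnExit();                 // registered second => deleted first
   
           // On JVM exit the child is removed before the root, so the root is
           // empty by the time its own deletion runs and the delete succeeds.
       }
   }
   ```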
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-03-09 Thread via GitHub


florin-akermann commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1518681146


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -230,8 +234,19 @@ private void emitNonJoinedOuterRecords(
 sharedTimeTracker.minTime = timestamp;
 
 // Skip next records if window has not closed
-if (timestamp + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) {
-break;
+final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
+if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) {
+if (timestampedKeyAndJoinSide.isLeftSide()) {
+outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side
+} else {
+outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side
+}
+if (outerJoinLeftBreak && outerJoinRightBreak) {

Review Comment:
   Hi,
   
   It seems like `outerJoinLeftBreak && outerJoinRightBreak` is always false.
   Doesn't this break the behavior described in the comment at the top of this block, `// Skip next records if window has not closed`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: remove the copy constructor of LogSegment [kafka]

2024-03-09 Thread via GitHub


ijuma commented on code in PR #15488:
URL: https://github.com/apache/kafka/pull/15488#discussion_r1518681734


##
core/src/test/scala/unit/kafka/log/LogLoaderTest.scala:
##
@@ -352,19 +352,15 @@ class LogLoaderTest {
   // Intercept all segment read calls
   val interceptedLogSegments = new LogSegments(topicPartition) {
 override def add(segment: LogSegment): LogSegment = {
-  val wrapper = new LogSegment(segment) {
-
-override def read(startOffset: Long, maxSize: Int, maxPosition: Long, minOneMessage: Boolean): FetchDataInfo = {
-  segmentsWithReads += this
-  super.read(startOffset, maxSize, maxPosition, minOneMessage)
-}
-
-override def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Optional[LeaderEpochFileCache]): Int = {
-  recoveredSegments += this
-  super.recover(producerStateManager, leaderEpochCache)
-}
-  }
+  val wrapper = Mockito.spy(segment)

Review Comment:
   There is a lot of magic to make mocks work and even more so for spy mocks. Replacing readable/debuggable normal code with mocks is generally a bad idea. A concrete example of the problems with mocks is the mock leaks that have caused serious test instability.
   
   As a general rule, mocks should only be used if the mock-free alternative is clearly worse in terms of verbosity, maintainability, etc. I don't think that's the case here.
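   
   For illustration, a self-contained sketch (hypothetical types, not Kafka code) of the mock-free alternative: a hand-written decorator that records calls is explicit about what it intercepts and involves no bytecode magic that can leak.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   interface Segment {
       String read(long offset);
   }
   
   // A plain test double: records every read, then delegates to the real object.
   final class RecordingSegment implements Segment {
       private final Segment delegate;
       final List<Long> reads = new ArrayList<>();
   
       RecordingSegment(Segment delegate) {
           this.delegate = delegate;
       }
   
       @Override
       public String read(long offset) {
           reads.add(offset);            // explicit, debuggable interception
           return delegate.read(offset); // ordinary delegation
       }
   }
   ```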



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Cleanup log.dirs in ReplicaManagerTest on JVM exit [kafka]

2024-03-09 Thread via GitHub


ijuma commented on code in PR #15289:
URL: https://github.com/apache/kafka/pull/15289#discussion_r1518680219


##
clients/src/test/java/org/apache/kafka/test/TestUtils.java:
##
@@ -195,6 +195,25 @@ public static File tempDirectory() {
 return tempDirectory(null);
 }
 
+/**
+ * Create a temporary directory under the given root directory.
+ * The root directory is removed on JVM exit if it doesn't already exist
+ * when this function is invoked.
+ *
+ * @param root path to create temporary directory under
+ * @return
+ */
+public static File tempRelativeDir(String root) {
+File rootFile = new File(root);
+boolean created = rootFile.mkdir();
+
+File result = tempDirectory(rootFile.toPath(), null);
+if (created) {
+rootFile.deleteOnExit();

Review Comment:
   Have you verified that the order of these independent removals is correct 
though? I don't think there's any guarantee - that's my point.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-03-09 Thread via GitHub


florin-akermann commented on PR #15189:
URL: https://github.com/apache/kafka/pull/15189#issuecomment-1986999670

   > So I would have expected that some tests need an update, either by advancing time proactively or by expecting certain results later in the test, because windows close later?
   
   Indeed, good point.
   
   I think the problem lies with 
   https://github.com/florin-akermann/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java#L247
   `outerJoinLeftBreak && outerJoinRightBreak` is always false. Hence, this loop never exits early?
   I think this behavior was introduced as part of https://github.com/apache/kafka/pull/14426?
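   
   For what it's worth, a self-contained sketch (hypothetical types, not the actual Streams code) of the early exit the diff appears to aim for, where scanning stops only once both sides have hit a still-open window:
   
   ```java
   import java.util.List;
   
   public class TwoSidedEarlyExitSketch {
       record Rec(boolean leftSide, long closeTime) {}
   
       // Entries arrive in timestamp order; once each side has produced a record
       // whose window is still open, nothing later can be emittable, so we stop.
       static void emitClosed(List<Rec> recordsInTimestampOrder, long streamTime) {
           boolean leftDone = false, rightDone = false;
           for (Rec r : recordsInTimestampOrder) {
               if (r.closeTime() >= streamTime) {       // window not closed yet
                   if (r.leftSide()) leftDone = true; else rightDone = true;
                   if (leftDone && rightDone) break;    // neither side can emit anymore
                   continue;                            // skip, keep scanning the other side
               }
               System.out.println("emit non-joined record closing at " + r.closeTime());
           }
       }
   
       public static void main(String[] args) {
           emitClosed(List.of(new Rec(true, 30), new Rec(false, 35), new Rec(true, 45), new Rec(false, 50)), 40);
       }
   }
   ```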
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-03-09 Thread via GitHub


florin-akermann commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1518671144


##
streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java:
##
@@ -71,11 +71,21 @@ void afterEach() {
 @Test
 void testRelaxedLeftStreamStreamJoin() {
 leftStream
-.leftJoin(rightStream, JOINER, WINDOW)
+.leftJoin(rightStream, JOINER, WINDOW_60MS_GRACE_10MS)
 .to(OUT);
 initTopology();
-left.pipeInput(null, "leftValue", 1);
-assertEquals(Collections.singletonList(new KeyValue<>(null, "leftValue|null")), out.readKeyValuesToList());
+left.pipeInput(null, "leftValue1", 1);
+left.pipeInput(null, "leftValue2", 90);
+left.pipeInput(null, "lateArrival-Dropped", 19);
+left.pipeInput(null, "lateArrivalWithinGrace", 20);
+assertEquals(
+Arrays.asList(
+new KeyValue<>(null, "leftValue1|null"),
+new KeyValue<>(null, "leftValue2|null"),
+new KeyValue<>(null, "lateArrivalWithinGrace|null")

Review Comment:
   I wonder how best to solve this.
   Multiple null-key records would collide in the `outerJoinStore`, as they could potentially all have the same key of type `TimestampedKeyAndJoinSide`.
   
   E.g. for a left stream
   
   | Key  | value | ts |
   |------|-------|----|
   | null | a     | 1  |
   | null | b     | 1  |
   | null | c     | 1  |
   
   we would probably only get to see 'c', even though we would like to see 'a', 'b', and 'c'?
   
   Off the top of my head I see two options to handle this:
   - Maintain an additional store just for null-key records where such records wouldn't collide.
   - Adjust the `outerJoinStore` key type `TimestampedKeyAndJoinSide` (e.g. by adding offset and partition as optional fields; this way null-key records can be distinguished, and for keyed records the old behavior can be kept).
   
   Personally, I prefer the latter (see the sketch below).
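   
   A hypothetical sketch of that second option (the name mirrors `TimestampedKeyAndJoinSide`, but the fields and their use here are assumptions, not the actual Streams internals):
   
   ```java
   // Extending the store key with partition and offset makes every record's key
   // unique even when the message key is null, so null-key records no longer
   // collide. A record gives value-based equals/hashCode, which a store key needs.
   record TimestampedKeyAndJoinSideSketch<K>(
       K key,           // may be null under the relaxed null-key semantics
       long timestamp,
       boolean leftSide,
       int partition,   // added: disambiguates records with the same (key, ts, side)
       long offset      // added: unique per record within a partition
   ) {}
   ```
   
   The idea being, as noted above, that for keyed records the old behavior can be kept.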



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-03-09 Thread via GitHub


florin-akermann commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1518667989


##
streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java:
##
@@ -71,11 +71,21 @@ void afterEach() {
 @Test
 void testRelaxedLeftStreamStreamJoin() {
 leftStream
-.leftJoin(rightStream, JOINER, WINDOW)
+.leftJoin(rightStream, JOINER, WINDOW_60MS_GRACE_10MS)
 .to(OUT);
 initTopology();
-left.pipeInput(null, "leftValue", 1);
-assertEquals(Collections.singletonList(new KeyValue<>(null, "leftValue|null")), out.readKeyValuesToList());
+left.pipeInput(null, "leftValue1", 1);
+left.pipeInput(null, "leftValue2", 90);
+left.pipeInput(null, "lateArrival-Dropped", 19);
+left.pipeInput(null, "lateArrivalWithinGrace", 20);
+assertEquals(
+Arrays.asList(
+new KeyValue<>(null, "leftValue1|null"),
+new KeyValue<>(null, "leftValue2|null"),
+new KeyValue<>(null, "lateArrivalWithinGrace|null")

Review Comment:
   Yes, good point. As is, the KStreamKStreamJoin forwards null-key records 
directly and not only after the window closes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-03-09 Thread via GitHub


florin-akermann commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1518667829


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##
@@ -1901,6 +1903,66 @@ public void testAsymmetricWindowingBefore() {
 }
 }
 
+@Test
+public void recordsArrivingPostWindowCloseShouldBeDropped() {
+final StreamsBuilder builder = new StreamsBuilder();
+
+final KStream<Integer, String> joined = builder.stream(topic1, consumed).join(
+builder.stream(topic2, consumed),
+MockValueJoiner.TOSTRING_JOINER,
+JoinWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(5)),
+StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+);
+final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+joined.process(supplier);
+
+
+try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+final TestInputTopic<Integer, String> left =
+driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final TestInputTopic<Integer, String> right =
+driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
+
+left.pipeInput(0, "left", 15);
+right.pipeInput(-1, "bumpTime", 40);
+assertRecordDropCount(0.0, processor);
+
+right.pipeInput(0, "closesAt39", 24);

Review Comment:
   Thanks, ok, I adjusted the 'hint' in the value accordingly.
   I don't think we have an off-by-one issue here: `[14;34 + 5]`, so the record is considered 'too late' at t=40?
   In other words, for this test case it was purely a misleading 'hint'?
   
   On a different note, I deleted the test case in `KStreamKStreamJoinTest` and refer to `KStreamKStreamWindowCloseTest` instead.
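   
   (For the arithmetic, as I read it: the right-side record at t=24 with a 10 ms time difference matches left records in [14, 34], and with 5 ms of grace its window closes at 24 + 10 + 5 = 39; stream time 40 is therefore past the close, with no off-by-one.)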



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16346: Fix flaky MetricsTest.testMetrics [kafka]

2024-03-09 Thread via GitHub


chia7712 commented on code in PR #15502:
URL: https://github.com/apache/kafka/pull/15502#discussion_r1518650689


##
core/src/main/scala/kafka/network/RequestChannel.scala:
##
@@ -258,7 +258,11 @@ object RequestChannel extends Logging {
 m.responseSendTimeHist.update(Math.round(responseSendTimeMs))
 m.totalTimeHist.update(Math.round(totalTimeMs))
 m.requestBytesHist.update(sizeOfBodyInBytes)
-m.messageConversionsTimeHist.foreach(_.update(Math.round(messageConversionsTimeMs)))
+m.messageConversionsTimeHist.foreach(h => {
+  if (messageConversionsTimeMs > 0) {

Review Comment:
   This is a kind of behavior change. Could you add a comment explaining it? Also, we should have a unit test to verify it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Cleanup log.dirs in ReplicaManagerTest on JVM exit [kafka]

2024-03-09 Thread via GitHub


chia7712 commented on code in PR #15289:
URL: https://github.com/apache/kafka/pull/15289#discussion_r1518650602


##
clients/src/test/java/org/apache/kafka/test/TestUtils.java:
##
@@ -195,6 +195,25 @@ public static File tempDirectory() {
 return tempDirectory(null);
 }
 
+/**
+ * Create a temporary directory under the given root directory.
+ * The root directory is removed on JVM exit if it doesn't already exist
+ * when this function is invoked.
+ *
+ * @param root path to create temporary directory under
+ * @return
+ */
+public static File tempRelativeDir(String root) {
+File rootFile = new File(root);
+boolean created = rootFile.mkdir();
+
+File result = tempDirectory(rootFile.toPath(), null);
+if (created) {
+rootFile.deleteOnExit();

Review Comment:
   > This is not enough to ensure the file is removed - it will only happen if the directory is empty.
   
   The sub-folder created by `tempDirectory(rootFile.toPath(), null)` will get removed, so `rootFile` should be empty and removable on exit. If callers want to create another file under the passed `root` manually, they should not use this helper.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Reduce memory allocation in ClientTelemetryReporter.java [kafka]

2024-03-09 Thread via GitHub


ijuma commented on code in PR #15402:
URL: https://github.com/apache/kafka/pull/15402#discussion_r1518649604


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##
@@ -336,33 +337,35 @@ public long timeToNextUpdate(long requestTimeoutMs) {
 */
 apiName = (localState == ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS) ? ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS.name : ApiKeys.PUSH_TELEMETRY.name;
 timeMs = requestTimeoutMs;
-msg = String.format("the remaining wait time for the %s network API request, as specified by %s", apiName, CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG);
+msg = isTraceEnabled ? "" : String.format("the remaining wait time for the %s network API request, as specified by %s", apiName, CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG);
 break;
 case TERMINATING_PUSH_IN_PROGRESS:
 timeMs = Long.MAX_VALUE;
-msg = String.format("the terminating push is in progress, disabling telemetry for further requests");
+msg = isTraceEnabled ? "" : "the terminating push is in progress, disabling telemetry for further requests";
 break;
 case TERMINATING_PUSH_NEEDED:
 timeMs = 0;
-msg = String.format("the client should try to submit the final %s network API request ASAP before closing", ApiKeys.PUSH_TELEMETRY.name);
+msg = isTraceEnabled ? "" : String.format("the client should try to submit the final %s network API request ASAP before closing", ApiKeys.PUSH_TELEMETRY.name);
 break;
 case SUBSCRIPTION_NEEDED:
 case PUSH_NEEDED:
 apiName = (localState == ClientTelemetryState.SUBSCRIPTION_NEEDED) ? ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS.name : ApiKeys.PUSH_TELEMETRY.name;
 long timeRemainingBeforeRequest = localLastRequestMs + localIntervalMs - nowMs;
 if (timeRemainingBeforeRequest <= 0) {
 timeMs = 0;
-msg = String.format("the wait time before submitting the next %s network API request has elapsed", apiName);
+msg = isTraceEnabled ? "" : String.format("the wait time before submitting the next %s network API request has elapsed", apiName);
 } else {
 timeMs = timeRemainingBeforeRequest;
-msg = String.format("the client will wait before submitting the next %s network API request", apiName);
+msg = isTraceEnabled ? "" : String.format("the client will wait before submitting the next %s network API request", apiName);
 }
 break;
 default:
 throw new IllegalStateException("Unknown telemetry state: " + localState);
 }
 
-log.trace("For telemetry state {}, returning the value {} ms; {}", localState, timeMs, msg);

Review Comment:
   More details here https://www.baeldung.com/java-string-pool



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: remove the copy constructor of LogSegment [kafka]

2024-03-09 Thread via GitHub


chia7712 commented on code in PR #15488:
URL: https://github.com/apache/kafka/pull/15488#discussion_r1518649558


##
core/src/test/scala/unit/kafka/log/LogLoaderTest.scala:
##
@@ -352,19 +352,15 @@ class LogLoaderTest {
   // Intercept all segment read calls
   val interceptedLogSegments = new LogSegments(topicPartition) {
 override def add(segment: LogSegment): LogSegment = {
-  val wrapper = new LogSegment(segment) {
-
-override def read(startOffset: Long, maxSize: Int, maxPosition: Long, minOneMessage: Boolean): FetchDataInfo = {
-  segmentsWithReads += this
-  super.read(startOffset, maxSize, maxPosition, minOneMessage)
-}
-
-override def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Optional[LeaderEpochFileCache]): Int = {
-  recoveredSegments += this
-  super.recover(producerStateManager, leaderEpochCache)
-}
-  }
+  val wrapper = Mockito.spy(segment)

Review Comment:
   > What is the purpose of this change?
   
   It seems to me there are two reasons for removing the copy constructor.
   
   1. It does not copy all fields of `LogSegment`, so it is a potential bug if we use it in production (in the future).
   2. The copy constructor is used only in testing, so it should be fine to move the related behavior to testing code.
   
   > Using a spy mock instead of a copy constructor is not better. 
   
   Pardon me, could you please share the reasoning with me?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Reduce memory allocation in ClientTelemetryReporter.java [kafka]

2024-03-09 Thread via GitHub


ijuma commented on code in PR #15402:
URL: https://github.com/apache/kafka/pull/15402#discussion_r1518649290


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##
@@ -336,33 +337,35 @@ public long timeToNextUpdate(long requestTimeoutMs) {
 */
 apiName = (localState == ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS) ? ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS.name : ApiKeys.PUSH_TELEMETRY.name;
 timeMs = requestTimeoutMs;
-msg = String.format("the remaining wait time for the %s network API request, as specified by %s", apiName, CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG);
+msg = isTraceEnabled ? "" : String.format("the remaining wait time for the %s network API request, as specified by %s", apiName, CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG);
 break;
 case TERMINATING_PUSH_IN_PROGRESS:
 timeMs = Long.MAX_VALUE;
-msg = String.format("the terminating push is in progress, disabling telemetry for further requests");
+msg = isTraceEnabled ? "" : "the terminating push is in progress, disabling telemetry for further requests";
 break;
 case TERMINATING_PUSH_NEEDED:
 timeMs = 0;
-msg = String.format("the client should try to submit the final %s network API request ASAP before closing", ApiKeys.PUSH_TELEMETRY.name);
+msg = isTraceEnabled ? "" : String.format("the client should try to submit the final %s network API request ASAP before closing", ApiKeys.PUSH_TELEMETRY.name);
 break;
 case SUBSCRIPTION_NEEDED:
 case PUSH_NEEDED:
 apiName = (localState == ClientTelemetryState.SUBSCRIPTION_NEEDED) ? ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS.name : ApiKeys.PUSH_TELEMETRY.name;
 long timeRemainingBeforeRequest = localLastRequestMs + localIntervalMs - nowMs;
 if (timeRemainingBeforeRequest <= 0) {
 timeMs = 0;
-msg = String.format("the wait time before submitting the next %s network API request has elapsed", apiName);
+msg = isTraceEnabled ? "" : String.format("the wait time before submitting the next %s network API request has elapsed", apiName);
 } else {
 timeMs = timeRemainingBeforeRequest;
-msg = String.format("the client will wait before submitting the next %s network API request", apiName);
+msg = isTraceEnabled ? "" : String.format("the client will wait before submitting the next %s network API request", apiName);
 }
 break;
 default:
 throw new IllegalStateException("Unknown telemetry state: " + localState);
 }
 
-log.trace("For telemetry state {}, returning the value {} ms; {}", localState, timeMs, msg);

Review Comment:
   Hmm, unlike what the PR says, there is no string generated if the log is 
disabled. The only allocation is the array for passing the parameters. No harm 
in this change, but I wanted to clarify.
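   
   A small self-contained illustration of the point (hypothetical class, standard SLF4J API):
   
   ```java
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   public class LazyLoggingSketch {
       private static final Logger log = LoggerFactory.getLogger(LazyLoggingSketch.class);
   
       static void example(Object state, long timeMs, String msg) {
           // Parameterized form: the format string is a constant and the message is
           // only rendered if TRACE is enabled; the remaining cost is the varargs
           // Object[] plus boxing of timeMs.
           log.trace("For telemetry state {}, returning the value {} ms; {}", state, timeMs, msg);
   
           // Eager form: String.format builds the full message whether or not TRACE
           // is enabled -- the allocation the PR's ternaries avoid.
           log.trace(String.format("For telemetry state %s, returning the value %d ms; %s", state, timeMs, msg));
       }
   }
   ```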



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Cleanup log.dirs in ReplicaManagerTest on JVM exit [kafka]

2024-03-09 Thread via GitHub


ijuma commented on code in PR #15289:
URL: https://github.com/apache/kafka/pull/15289#discussion_r1518649065


##
clients/src/test/java/org/apache/kafka/test/TestUtils.java:
##
@@ -195,6 +195,25 @@ public static File tempDirectory() {
 return tempDirectory(null);
 }
 
+/**
+ * Create a temporary directory under the given root directory.
+ * The root directory is removed on JVM exit if it doesn't already exist
+ * when this function is invoked.
+ *
+ * @param root path to create temporary directory under
+ * @return
+ */
+public static File tempRelativeDir(String root) {
+File rootFile = new File(root);
+boolean created = rootFile.mkdir();
+
+File result = tempDirectory(rootFile.toPath(), null);
+if (created) {
+rootFile.deleteOnExit();

Review Comment:
   This is not enough to ensure the file is removed - it will only happen if 
the directory is empty.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: remove the copy constructor of LogSegment [kafka]

2024-03-09 Thread via GitHub


ijuma commented on code in PR #15488:
URL: https://github.com/apache/kafka/pull/15488#discussion_r1518648733


##
core/src/test/scala/unit/kafka/log/LogLoaderTest.scala:
##
@@ -352,19 +352,15 @@ class LogLoaderTest {
   // Intercept all segment read calls
   val interceptedLogSegments = new LogSegments(topicPartition) {
 override def add(segment: LogSegment): LogSegment = {
-  val wrapper = new LogSegment(segment) {
-
-override def read(startOffset: Long, maxSize: Int, maxPosition: Long, minOneMessage: Boolean): FetchDataInfo = {
-  segmentsWithReads += this
-  super.read(startOffset, maxSize, maxPosition, minOneMessage)
-}
-
-override def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Optional[LeaderEpochFileCache]): Int = {
-  recoveredSegments += this
-  super.recover(producerStateManager, leaderEpochCache)
-}
-  }
+  val wrapper = Mockito.spy(segment)

Review Comment:
   Using a `spy` mock instead of a copy constructor is not better. What is the 
purpose of this change? cc @chia7712 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: change inter.broker.protocol version to inter.broker.protocol.version [kafka]

2024-03-09 Thread via GitHub


chia7712 merged PR #15504:
URL: https://github.com/apache/kafka/pull/15504


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Cleanup log.dirs in ReplicaManagerTest on JVM exit [kafka]

2024-03-09 Thread via GitHub


chia7712 merged PR #15289:
URL: https://github.com/apache/kafka/pull/15289


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: remove the copy constructor of LogSegment [kafka]

2024-03-09 Thread via GitHub


chia7712 merged PR #15488:
URL: https://github.com/apache/kafka/pull/15488


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: remove the copy constructor of LogSegment [kafka]

2024-03-09 Thread via GitHub


chia7712 commented on PR #15488:
URL: https://github.com/apache/kafka/pull/15488#issuecomment-1986950699

   The failed tests pass on my local machine:
   ```sh
   ./gradlew cleanTest core:test --tests QuorumControllerTest --tests 
ReplicaManagerTest
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-16356) Remove class-name dispatch in RemoteLogMetadataSerde

2024-03-09 Thread Linu Shibu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17824971#comment-17824971
 ] 

Linu Shibu edited comment on KAFKA-16356 at 3/9/24 5:30 PM:


Can I assign this to myself and work on it, [~gharris1727]?


was (Author: JIRAUSER304485):
Can I assign this to myself and work on it?

> Remove class-name dispatch in RemoteLogMetadataSerde
> 
>
> Key: KAFKA-16356
> URL: https://issues.apache.org/jira/browse/KAFKA-16356
> Project: Kafka
>  Issue Type: Task
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Greg Harris
>Priority: Trivial
>  Labels: newbie
>
> The RemoteLogMetadataSerde#serialize receives a RemoteLogMetadata object, and 
> has to dispatch to one of four serializers depending on its type. This is 
> done by taking the class name of the RemoteLogMetadata and looking it up in 
> maps to find the corresponding serializer for that class.
> This later requires an unchecked cast, because the RemoteLogMetadataTransform 
> is generic. This is all type-unsafe, and can be replaced with type-safe 
> if-elseif-else statements that may also be faster than the double-indirect 
> map lookups.
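
A self-contained sketch (hypothetical types, not the actual Kafka classes) of the type-safe dispatch the issue proposes:

```java
// Each branch is statically typed, so no class-name map lookup and no unchecked
// cast through a generic transform is needed.
interface Metadata {}

final class SegmentMetadata implements Metadata {}
final class DeleteMetadata implements Metadata {}

final class MetadataSerde {
    byte[] serialize(Metadata metadata) {
        if (metadata instanceof SegmentMetadata) {
            return serializeSegment((SegmentMetadata) metadata);
        } else if (metadata instanceof DeleteMetadata) {
            return serializeDelete((DeleteMetadata) metadata);
        }
        throw new IllegalArgumentException("Unknown metadata type: " + metadata.getClass());
    }

    private byte[] serializeSegment(SegmentMetadata m) { return new byte[0]; } // placeholder
    private byte[] serializeDelete(DeleteMetadata m) { return new byte[0]; }   // placeholder
}
```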



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16356) Remove class-name dispatch in RemoteLogMetadataSerde

2024-03-09 Thread Linu Shibu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17824971#comment-17824971
 ] 

Linu Shibu commented on KAFKA-16356:


Can I assign this to myself and work on it?

> Remove class-name dispatch in RemoteLogMetadataSerde
> 
>
> Key: KAFKA-16356
> URL: https://issues.apache.org/jira/browse/KAFKA-16356
> Project: Kafka
>  Issue Type: Task
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Greg Harris
>Priority: Trivial
>  Labels: newbie
>
> The RemoteLogMetadataSerde#serialize receives a RemoteLogMetadata object, and 
> has to dispatch to one of four serializers depending on its type. This is 
> done by taking the class name of the RemoteLogMetadata and looking it up in 
> maps to find the corresponding serializer for that class.
> This later requires an unchecked cast, because the RemoteLogMetadataTransform 
> is generic. This is all type-unsafe, and can be replaced with type-safe 
> if-elseif-else statements that may also be faster than the double-indirect 
> map lookups.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14683) Replace EasyMock and PowerMock with Mockito in WorkerSinkTaskTest

2024-03-09 Thread Hector Geraldino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hector Geraldino resolved KAFKA-14683.
--
  Reviewer: Greg Harris
Resolution: Fixed

> Replace EasyMock and PowerMock with Mockito in WorkerSinkTaskTest
> -
>
> Key: KAFKA-14683
> URL: https://issues.apache.org/jira/browse/KAFKA-14683
> Project: Kafka
>  Issue Type: Sub-task
>  Components: connect
>Reporter: Hector Geraldino
>Assignee: Hector Geraldino
>Priority: Minor
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-16223) Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest

2024-03-09 Thread Hector Geraldino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hector Geraldino reopened KAFKA-16223:
--

> Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest
> ---
>
> Key: KAFKA-16223
> URL: https://issues.apache.org/jira/browse/KAFKA-16223
> Project: Kafka
>  Issue Type: Sub-task
>  Components: connect
>Reporter: Hector Geraldino
>Assignee: Hector Geraldino
>Priority: Minor
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16223) Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest

2024-03-09 Thread Hector Geraldino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hector Geraldino resolved KAFKA-16223.
--
Fix Version/s: 3.8.0
 Reviewer: Greg Harris
   Resolution: Fixed

> Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest
> ---
>
> Key: KAFKA-16223
> URL: https://issues.apache.org/jira/browse/KAFKA-16223
> Project: Kafka
>  Issue Type: Sub-task
>  Components: connect
>Reporter: Hector Geraldino
>Assignee: Hector Geraldino
>Priority: Minor
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] Cleanup WorkerSinkTaskTest [kafka]

2024-03-09 Thread via GitHub


hgeraldino opened a new pull request, #15506:
URL: https://github.com/apache/kafka/pull/15506

   Follow-up of https://github.com/apache/kafka/pull/15316
   
   * Rename `WorkerSinkTaskMockitoTest` back to `WorkerSinkTaskTest`
   * Tidy up the code a bit
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15949: Unify metadata.version format in log and error message [kafka]

2024-03-09 Thread via GitHub


FrankYang0529 opened a new pull request, #15505:
URL: https://github.com/apache/kafka/pull/15505

   There were different spellings of `metadata.version`, like `metadata version` or `metadataVersion`.
   Unify the format as `metadata.version`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: change inter.broker.protocol version to inter.broker.protocol.version [kafka]

2024-03-09 Thread via GitHub


FrankYang0529 opened a new pull request, #15504:
URL: https://github.com/apache/kafka/pull/15504

   It looks like there is no `inter.broker.protocol` config.
   
   Change the name to `inter.broker.protocol.version`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically [kafka]

2024-03-09 Thread via GitHub


soarez commented on code in PR #15335:
URL: https://github.com/apache/kafka/pull/15335#discussion_r1518545295


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -355,10 +355,11 @@ class LogManager(logDirs: Seq[File],
 } else if (logDir.getName.endsWith(UnifiedLog.StrayDirSuffix)) {
   addStrayLog(topicPartition, log)
   warn(s"Loaded stray log: $logDir")
-} else if (shouldBeStrayKraftLog(log)) {
-  // Mark the partition directories we're not supposed to have as stray. We have to do this
-  // during log load because topics may have been recreated with the same name while a disk
-  // was offline.
+} else if (isStray(log.topicId, topicPartition)) {
+  // Opposite of Zookeeper mode deleted topic in KRAFT mode can be recreated while it's not fully deleted from broker.
+  // As a result of this broker in KRAFT mode with one offline directory has no way to detect to-be-deleted replica in an offline directory earlier.
+  // However, broker need to mark the partition directories as stray during log load because topics may have been
+  // recreated with the same name while a log directory was offline.

Review Comment:
   
   
   ```suggestion
 // Unlike Zookeeper mode, which tracks pending topic deletions under a ZNode, KRaft is unable to prevent a topic from being recreated before every replica has been deleted.
 // A KRaft broker with an offline directory may be unable to detect it still holds a to-be-deleted replica, and can create a conflicting topic partition for a new incarnation of the topic in one of the remaining online directories.
 // So upon a restart in which the offline directory is back online we need to clean up the old replica directory.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Fix documentation for RETRIES_DOC on version 3.2 [kafka]

2024-03-09 Thread via GitHub


kamalcph commented on PR #15413:
URL: https://github.com/apache/kafka/pull/15413#issuecomment-1986805186

   > Yeah, totally; I think the important thing is correcting the 
documentation, as people may be confused while reading it. Still, I don't know 
where the documentation is. Is it a separate repo? I thought that was generated 
from code
   
   Yes, the documentation is maintained in a separate repo: https://github.com/apache/kafka-site


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org