Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-02 Thread via GitHub


iit2009060 commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1440153281


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1340,6 +1346,10 @@ public FetchDataInfo read(RemoteStorageFetchInfo 
remoteStorageFetchInfo) throws
 Utils.closeQuietly(remoteSegInputStream, 
"RemoteLogSegmentInputStream");
 }
 }
+// for testing
+public RemoteLogInputStream getRemoteLogInputStream(InputStream in) {

Review Comment:
   @kamalcph Yes sure. Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-02 Thread via GitHub


kamalcph commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1440146317


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1340,6 +1346,10 @@ public FetchDataInfo read(RemoteStorageFetchInfo 
remoteStorageFetchInfo) throws
 Utils.closeQuietly(remoteSegInputStream, 
"RemoteLogSegmentInputStream");
 }
 }
+// for testing
+public RemoteLogInputStream getRemoteLogInputStream(InputStream in) {

Review Comment:
   @iit2009060 Can we drop the `public` access modifier from this method?






Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-02 Thread via GitHub


iit2009060 commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1874937679

   > LGTM, thanks for the patch!
   > 
   > This patch handles the FETCH request for previously compacted topic segments uploaded to remote storage. We also have to go through the
   > 
   > 1. `upload`, `deletion` path  and
   > 2. RemoteLogMetadataCache internal state
   > 
   > for both normal and unclean-leader-election scenarios.
   https://issues.apache.org/jira/browse/KAFKA-15388 
   As mentioned in the issue description, the upload and delete paths will not be impacted.





Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-02 Thread via GitHub


iit2009060 commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1440133601


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1289,15 +1289,21 @@ public FetchDataInfo read(RemoteStorageFetchInfo 
remoteStorageFetchInfo) throws
 }
 
 RemoteLogSegmentMetadata remoteLogSegmentMetadata = 
rlsMetadataOptional.get();
-int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, 
offset);
 InputStream remoteSegInputStream = null;
 try {
-// Search forward for the position of the last offset that is 
greater than or equal to the target offset
-remoteSegInputStream = 
remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
-RemoteLogInputStream remoteLogInputStream = new 
RemoteLogInputStream(remoteSegInputStream);
-
-RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, 
offset);
-
+int startPos = 0;
+RecordBatch firstBatch = null;
+while (firstBatch == null && rlsMetadataOptional.isPresent()) {

Review Comment:
   done






Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-02 Thread via GitHub


iit2009060 commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1440126550


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2065,6 +2066,11 @@ public Optional 
fetchRemoteLogSegmentMetadata(TopicPar
 return Optional.of(segmentMetadata);
 }
 
+public Optional 
findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata,
+  
Option leaderEpochFileCacheOption) {
+return Optional.ofNullable(null);

Review Comment:
   done.



##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2161,6 +2167,85 @@ RecordBatch findFirstBatch(RemoteLogInputStream 
remoteLogInputStream, long offse
 }
 }
 
+@Test
+public void testReadForFirstBatchInLogCompaction() throws 
RemoteStorageException, IOException {
+FileInputStream fileInputStream = mock(FileInputStream.class);
+RemoteLogInputStream remoteLogInputStream = 
mock(RemoteLogInputStream.class);
+ClassLoaderAwareRemoteStorageManager rsmManager = 
mock(ClassLoaderAwareRemoteStorageManager.class);
+RemoteLogSegmentMetadata segmentMetadata = 
mock(RemoteLogSegmentMetadata.class);
+LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class);
+when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1));
+
+
when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), 
anyInt()))

Review Comment:
   done






Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-02 Thread via GitHub


iit2009060 commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1440116425


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1340,6 +1346,10 @@ public FetchDataInfo read(RemoteStorageFetchInfo 
remoteStorageFetchInfo) throws
 Utils.closeQuietly(remoteSegInputStream, 
"RemoteLogSegmentInputStream");
 }
 }
+// for testing
+public RemoteLogInputStream getRemoteLogInputStream(InputStream in) {

Review Comment:
   @kamalcph Do we really need to pass an actual `ByteArrayInputStream`? It would make the behaviour difficult to mock.






Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-02 Thread via GitHub


iit2009060 commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1440115636


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1340,6 +1346,10 @@ public FetchDataInfo read(RemoteStorageFetchInfo 
remoteStorageFetchInfo) throws
 Utils.closeQuietly(remoteSegInputStream, 
"RemoteLogSegmentInputStream");
 }
 }
+// for testing
+public RemoteLogInputStream getRemoteLogInputStream(InputStream in) {

Review Comment:
   @kamalcph I want to mock the `RemoteLogInputStream` so that I can stub the 
`nextBatch` function accordingly. In the first iteration I want `firstBatch` 
to be null, and in the next iteration I want `firstBatch` to be available:
   `when(remoteLogInputStream.nextBatch()).thenReturn(null, firstBatch);`
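
In Mockito, `thenReturn(null, firstBatch)` is consecutive stubbing: the first call returns `null` and later calls return `firstBatch`. A dependency-free sketch of the same behaviour is below; `ScriptedBatchSource` is a hypothetical stand-in used only for illustration, not a class from the PR, and it returns strings instead of `RecordBatch` objects.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a mocked RemoteLogInputStream: it replays a fixed
// script of results and keeps repeating the last one once the script runs out.
final class ScriptedBatchSource {
    private final List<String> script;
    private int calls = 0;

    ScriptedBatchSource(String... results) {
        if (results.length == 0) {
            throw new IllegalArgumentException("need at least one scripted result");
        }
        this.script = Arrays.asList(results); // Arrays.asList permits null elements
    }

    // Returns the scripted result for this call, or the last one when exhausted.
    String nextBatch() {
        int index = Math.min(calls, script.size() - 1);
        calls++;
        return script.get(index);
    }

    public static void main(String[] args) {
        ScriptedBatchSource stream = new ScriptedBatchSource(null, "firstBatch");
        System.out.println(stream.nextBatch()); // null: nothing found in the first segment
        System.out.println(stream.nextBatch()); // firstBatch: found in the next segment
    }
}
```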






[jira] [Commented] (KAFKA-16073) Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion

2024-01-02 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17802024#comment-17802024
 ] 

Satish Duggana commented on KAFKA-16073:


One possible solution we discussed is to update local-log-start-offset before 
the segments are removed from memory and scheduled for deletion, but we need 
to think through the end-to-end scenarios. 
cc [~Kamal C]

> Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed 
> localLogStartOffset Update During Segment Deletion
> 
>
> Key: KAFKA-16073
> URL: https://issues.apache.org/jira/browse/KAFKA-16073
> Project: Kafka
> Issue Type: Bug
> Components: core, Tiered-Storage
> Affects Versions: 3.6.1
> Reporter: hzh0425
> Assignee: hzh0425
> Priority: Major
> Labels: KIP-405, kip-405, tiered-storage
> Fix For: 3.6.1
>
>
> The bug in Apache Kafka's tiered storage feature is a delayed update of 
> {{localLogStartOffset}} in the {{UnifiedLog.deleteSegments}} method, which 
> affects consumer fetch operations. When segments are removed from the log's 
> in-memory state, {{localLogStartOffset}} is not updated promptly. 
> Concurrently, {{ReplicaManager.handleOffsetOutOfRangeError}} checks whether 
> the consumer's fetch offset is less than {{localLogStartOffset}}. If the 
> fetch offset is not less than that (stale) value, Kafka erroneously sends an 
> {{OffsetOutOfRangeException}} to the consumer.
> In a specific concurrent scenario, take sequential offsets {{offset1 < 
> offset2 < offset3}}. A client requests data at {{offset2}}. A background 
> deletion process has removed segments from memory but has not yet updated 
> {{localLogStartOffset}} from {{offset1}} to {{offset3}}. Consequently, when 
> the fetch offset ({{offset2}}) is evaluated against the stale {{offset1}} in 
> {{ReplicaManager.handleOffsetOutOfRangeError}}, it incorrectly triggers an 
> {{OffsetOutOfRangeException}}. The issue arises from the out-of-sync update 
> of {{localLogStartOffset}}, leading to incorrect handling of consumer fetch 
> requests and potential data access errors.
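
The scenario in the description can be modelled in a few lines. This is a simplified sketch, not the broker code: the `route` method, the `Route` enum, and the parameter names are hypothetical, and it assumes the requested offset is still available in remote storage.

```java
// Simplified model of the fetch routing described in KAFKA-16073.
final class LocalLogStartOffsetRace {
    enum Route { LOCAL, REMOTE, OUT_OF_RANGE }

    // fetchOffset:           offset requested by the consumer
    // localLogStartOffset:   value the broker currently believes (may be stale)
    // firstOffsetInMemory:   first offset still present in the in-memory segments
    static Route route(long fetchOffset, long localLogStartOffset, long firstOffsetInMemory) {
        if (fetchOffset >= firstOffsetInMemory) {
            return Route.LOCAL;
        }
        // The local read missed; decide between remote storage and an error.
        return fetchOffset < localLogStartOffset ? Route.REMOTE : Route.OUT_OF_RANGE;
    }

    public static void main(String[] args) {
        long offset1 = 100, offset2 = 150, offset3 = 200;
        // Segments below offset3 are already gone from memory, but the start
        // offset still holds the stale offset1: the fetch is rejected.
        System.out.println(route(offset2, offset1, offset3)); // OUT_OF_RANGE
        // Start offset updated before the segments are removed: the fetch is
        // served from remote storage instead.
        System.out.println(route(offset2, offset3, offset3)); // REMOTE
    }
}
```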



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-02 Thread via GitHub


kamalcph commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1440097317


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1289,15 +1289,21 @@ public FetchDataInfo read(RemoteStorageFetchInfo 
remoteStorageFetchInfo) throws
 }
 
 RemoteLogSegmentMetadata remoteLogSegmentMetadata = 
rlsMetadataOptional.get();
-int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, 
offset);
 InputStream remoteSegInputStream = null;
 try {
-// Search forward for the position of the last offset that is 
greater than or equal to the target offset
-remoteSegInputStream = 
remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
-RemoteLogInputStream remoteLogInputStream = new 
RemoteLogInputStream(remoteSegInputStream);
-
-RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, 
offset);
-
+int startPos = 0;
+RecordBatch firstBatch = null;
+while (firstBatch == null && rlsMetadataOptional.isPresent()) {

Review Comment:
   Please add a comment above the `while` loop describing the case being 
handled, for posterity.
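
A minimal sketch of the case the loop appears to handle: after compaction, the segment selected for the target offset may no longer contain any batch at or beyond that offset, so the read moves on to the following segment. Everything here is an assumption for illustration: segments are modelled as arrays of surviving offsets, the list starts at the segment selected for the target offset, and `findFirstOffset` is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

final class CompactedSegmentScan {
    // Returns the first surviving offset >= targetOffset, advancing to the next
    // segment whenever the current one has no such offset left after compaction.
    static Optional<Long> findFirstOffset(List<long[]> segments, long targetOffset) {
        Optional<Long> first = Optional.empty();
        int segmentIndex = 0;
        while (!first.isPresent() && segmentIndex < segments.size()) {
            for (long offset : segments.get(segmentIndex)) {
                if (offset >= targetOffset) {
                    first = Optional.of(offset);
                    break;
                }
            }
            segmentIndex++;
        }
        return first;
    }

    public static void main(String[] args) {
        // The first segment nominally covers offsets 0..9, but compaction kept only 0..2.
        List<long[]> segments = Arrays.asList(new long[] {0, 1, 2}, new long[] {10, 12});
        System.out.println(findFirstOffset(segments, 5));  // Optional[10]
        System.out.println(findFirstOffset(segments, 13)); // Optional.empty
    }
}
```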






Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-02 Thread via GitHub


kamalcph commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1440089289


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1340,6 +1346,10 @@ public FetchDataInfo read(RemoteStorageFetchInfo 
remoteStorageFetchInfo) throws
 Utils.closeQuietly(remoteSegInputStream, 
"RemoteLogSegmentInputStream");
 }
 }
+// for testing
+public RemoteLogInputStream getRemoteLogInputStream(InputStream in) {

Review Comment:
   Can we reduce the method access specifier to package-private? I would 
suggest removing this method and returning an actual stream instead of a 
mock stream from `remoteStorageManager.fetchLogSegment`:
   
   ```java
   new ByteArrayInputStream(records(ts, startOffset, 
targetLeaderEpoch).buffer().array())
   ```
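
To illustrate the general idea of feeding a reader from a real in-memory stream instead of a mocked one, here is a self-contained sketch. It does not use Kafka's record format: the length-prefixed framing and the `encode` and `readAll` helpers are assumptions made only for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class InMemorySegmentStream {
    // Serializes payloads as [int length][bytes] frames (hypothetical framing).
    static byte[] encode(String... payloads) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (String payload : payloads) {
                byte[] body = payload.getBytes(StandardCharsets.UTF_8);
                out.writeInt(body.length);
                out.write(body);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads frames until the stream ends; works with any InputStream.
    static List<String> readAll(InputStream source) {
        List<String> payloads = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(source)) {
            while (true) {
                int length;
                try {
                    length = in.readInt();
                } catch (EOFException endOfStream) {
                    return payloads;
                }
                byte[] body = new byte[length];
                in.readFully(body);
                payloads.add(new String(body, StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        InputStream segment = new ByteArrayInputStream(encode("batch-0", "batch-1"));
        System.out.println(readAll(segment)); // [batch-0, batch-1]
    }
}
```

The trade-off raised in the thread still applies: a real stream exercises the actual parsing path, while a mock makes it easier to script unusual sequences such as an empty first read.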



##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2161,6 +2167,85 @@ RecordBatch findFirstBatch(RemoteLogInputStream 
remoteLogInputStream, long offse
 }
 }
 
+@Test
+public void testReadForFirstBatchInLogCompaction() throws 
RemoteStorageException, IOException {
+FileInputStream fileInputStream = mock(FileInputStream.class);
+RemoteLogInputStream remoteLogInputStream = 
mock(RemoteLogInputStream.class);
+ClassLoaderAwareRemoteStorageManager rsmManager = 
mock(ClassLoaderAwareRemoteStorageManager.class);
+RemoteLogSegmentMetadata segmentMetadata = 
mock(RemoteLogSegmentMetadata.class);
+LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class);
+when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1));
+
+
when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), 
anyInt()))

Review Comment:
   There are two remote storage managers: `rsmManager` (local) and 
`remoteStorageManager` (global). Can we discard the latter?



##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2065,6 +2066,11 @@ public Optional 
fetchRemoteLogSegmentMetadata(TopicPar
 return Optional.of(segmentMetadata);
 }
 
+public Optional 
findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata,
+  
Option leaderEpochFileCacheOption) {
+return Optional.ofNullable(null);

Review Comment:
   Can we use `Optional.empty()` instead?
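
For reference, both forms produce an empty `Optional`; `Optional.empty()` just states the intent directly. In the sketch below, `NextSegmentLookup` and its two methods are hypothetical, and `String` stands in for the segment metadata type.

```java
import java.util.Optional;

final class NextSegmentLookup {
    // Works, but the reader has to know that ofNullable(null) yields an empty Optional.
    static Optional<String> viaOfNullable() {
        return Optional.ofNullable(null);
    }

    // States directly that there is no next segment.
    static Optional<String> viaEmpty() {
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(viaOfNullable().equals(viaEmpty())); // true
        System.out.println(viaEmpty().isPresent());             // false
    }
}
```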






Re: [PR] KAFKA-15742: KRaft support in GroupCoordinatorIntegrationTest [kafka]

2024-01-02 Thread via GitHub


wernerdv commented on code in PR #15086:
URL: https://github.com/apache/kafka/pull/15086#discussion_r1440092493


##
core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala:
##
@@ -33,18 +34,21 @@ class GroupCoordinatorIntegrationTest extends 
KafkaServerTestHarness {
   overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
   overridingProps.put(KafkaConfig.OffsetsTopicCompressionCodecProp, 
offsetsTopicCompressionCodec.id.toString)
 
-  override def generateConfigs = TestUtils.createBrokerConfigs(1, zkConnect, 
enableControlledShutdown = false).map {
+  override def generateConfigs = TestUtils.createBrokerConfigs(1, 
zkConnectOrNull, enableControlledShutdown = false).map {
 KafkaConfig.fromProps(_, overridingProps)
   }
 
-  @Test
-  def testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec(quorum: 
String): Unit = {
 val consumer = TestUtils.createConsumer(bootstrapServers())
 val offsetMap = Map(
   new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) -> new 
OffsetAndMetadata(10, "")
 ).asJava
 consumer.commitSync(offsetMap)
-val logManager = servers.head.getLogManager
+val logManager =
+  if (isKRaftTest()) brokers.head.logManager

Review Comment:
   yep, fixed






[jira] [Assigned] (KAFKA-16070) Extract the setReadOnly method into Headers

2024-01-02 Thread johndoe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

johndoe reassigned KAFKA-16070:
---

Assignee: johndoe

> Extract the setReadOnly method into Headers
> ---
>
> Key: KAFKA-16070
> URL: https://issues.apache.org/jira/browse/KAFKA-16070
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Lighting Sui
>Assignee: johndoe
>Priority: Major
>
> Abstract the setReadOnly function into the Headers interface and provide a 
> default implementation.
> RecordHeaders can then override setReadOnly, so that the setReadOnly 
> function in KafkaProducer can be removed and the code can be cleaned up.
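
One way the proposal could look, as a sketch only: `SketchHeaders` and `SketchRecordHeaders` are hypothetical, simplified stand-ins for Kafka's `Headers` and `RecordHeaders`, and the no-op default is an assumption about what a default implementation might do.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the Headers interface with the proposed default method.
interface SketchHeaders {
    SketchHeaders add(String key, String value);

    // Default is a no-op so existing implementations keep compiling.
    default void setReadOnly() {
    }
}

// Simplified stand-in for RecordHeaders, which overrides the default.
final class SketchRecordHeaders implements SketchHeaders {
    private final List<String> entries = new ArrayList<>();
    private boolean readOnly = false;

    @Override
    public SketchHeaders add(String key, String value) {
        if (readOnly) {
            throw new IllegalStateException("headers are read-only");
        }
        entries.add(key + "=" + value);
        return this;
    }

    @Override
    public void setReadOnly() {
        readOnly = true;
    }

    int size() {
        return entries.size();
    }
}
```

With this shape, a caller such as a producer could invoke `headers.setReadOnly()` on the interface type without casting to the concrete class.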





[jira] [Assigned] (KAFKA-15561) Client support for new SubscriptionPattern based subscription

2024-01-02 Thread Phuc Hong Tran (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phuc Hong Tran reassigned KAFKA-15561:
--

Assignee: Phuc Hong Tran

> Client support for new SubscriptionPattern based subscription 
> --
>
> Key: KAFKA-15561
> URL: https://issues.apache.org/jira/browse/KAFKA-15561
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support
> Fix For: 3.8.0
>
>
> New consumer should support subscribe with the new SubscriptionPattern 
> introduced in the new consumer group protocol. When subscribing with this 
> regex, the client should provide the regex in the HB request on the 
> SubscribedTopicRegex field, delegating the resolution to the server.





[jira] [Assigned] (KAFKA-15682) Ensure internal remote log metadata topic does not expire its segments before deleting user-topic segments

2024-01-02 Thread Phuc Hong Tran (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phuc Hong Tran reassigned KAFKA-15682:
--

Assignee: Phuc Hong Tran

> Ensure internal remote log metadata topic does not expire its segments before 
> deleting user-topic segments
> --
>
> Key: KAFKA-15682
> URL: https://issues.apache.org/jira/browse/KAFKA-15682
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Phuc Hong Tran
>Priority: Major
>
> One of the implementations of RemoteLogMetadataManager is 
> TopicBasedRemoteLogMetadataManager, which uses an internal Kafka topic 
> {{__remote_log_metadata}} to store the metadata about the remote log 
> segments. Unlike other internal topics, which have compaction enabled, this 
> topic is not compacted and its retention is set to unlimited. 
> Keeping the retention of this internal topic unlimited is not practical in 
> real-world use cases, where the topic's local disk footprint grows huge over 
> time. 
> It is assumed that the user will set the retention to a reasonable time such 
> that it is the max of all the user-created topics (max + X). We can't just 
> rely on the assumption and need an assertion to ensure that the internal 
> {{__remote_log_metadata}} segments are not eligible for deletion before the 
> expiry of all the relevant user-topic uploaded remote-log-segments; 
> otherwise there will be dangling remote-log-segments which won't be cleared 
> once all the brokers are restarted after the internal topic retention cleanup.
> See the discussion thread: 
> https://github.com/apache/kafka/pull/14576#discussion_r1368576126





[jira] [Resolved] (KAFKA-16034) AsyncKafkaConsumer will get Invalid Request error when trying to rejoin on fenced/unknown member Id

2024-01-02 Thread Phuc Hong Tran (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phuc Hong Tran resolved KAFKA-16034.

Resolution: Fixed

> AsyncKafkaConsumer will get Invalid Request error when trying to rejoin on 
> fenced/unknown member Id
> ---
>
> Key: KAFKA-16034
> URL: https://issues.apache.org/jira/browse/KAFKA-16034
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Philip Nee
>Assignee: Phuc Hong Tran
>Priority: Major
> Fix For: 3.8.0
>
>
> The consumer will log invalid request error when joining from fenced/unknown 
> member id because we didn't reset the HeartbeatState and we won't send the 
> needed fields (rebalanceTimeoutMs for example) when joining.





[jira] [Commented] (KAFKA-16034) AsyncKafkaConsumer will get Invalid Request error when trying to rejoin on fenced/unknown member Id

2024-01-02 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17802013#comment-17802013
 ] 

Phuc Hong Tran commented on KAFKA-16034:


[~pnee] PlainTextConsumerTest isn't failing with any fenced/unknown_member_id 
exception. We can close this one then.



Re: [PR] KAFKA-16072: JUnit 5 extension to detect thread leak [kafka]

2024-01-02 Thread via GitHub


ashwinpankaj commented on code in PR #15101:
URL: https://github.com/apache/kafka/pull/15101#discussion_r1440041522


##
core/src/test/java/kafka/test/junit/LeakTestingExtension.java:
##
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.junit;
+
+import kafka.utils.TestUtils;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+public class LeakTestingExtension implements AfterEachCallback {
+@Override
+public void afterEach(ExtensionContext extensionContext) {
+TestUtils.verifyNoUnexpectedThreads("@AfterEach");

Review Comment:
   Thanks @wernerdv. I had a basic question about this change: 
   [TestUtils.verifyNoUnexpectedThreads](https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L2432) 
seems to check for the presence of specific types of threads. Will this check 
be sufficient to catch all types of thread leaks?
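
For comparison, a more general check would snapshot all live threads before a test and diff against that snapshot afterwards, rather than looking for specific thread names. The sketch below is not what `TestUtils.verifyNoUnexpectedThreads` does; `ThreadLeakCheck`, `liveThreads`, and `leakedSince` are hypothetical names. A real extension would also need to tolerate JVM or framework threads that start lazily.

```java
import java.util.HashSet;
import java.util.Set;

final class ThreadLeakCheck {
    // Snapshot of every live Java thread in this JVM.
    static Set<Thread> liveThreads() {
        return new HashSet<>(Thread.getAllStackTraces().keySet());
    }

    // Threads that are alive now but were not present in the earlier snapshot.
    static Set<Thread> leakedSince(Set<Thread> before) {
        Set<Thread> leaked = liveThreads();
        leaked.removeAll(before);
        leaked.removeIf(thread -> !thread.isAlive());
        return leaked;
    }

    public static void main(String[] args) {
        Set<Thread> before = liveThreads();
        Thread stray = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException ignored) {
                // interrupted: let the thread exit
            }
        }, "stray-worker");
        stray.setDaemon(true); // daemon so the JVM can still exit
        stray.start();
        System.out.println(leakedSince(before).contains(stray)); // true
        stray.interrupt();
    }
}
```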







Re: [PR] [MINOR] remove meaningless lines [kafka]

2024-01-02 Thread via GitHub


github-actions[bot] commented on PR #14423:
URL: https://github.com/apache/kafka/pull/14423#issuecomment-1874810052

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





[jira] [Commented] (KAFKA-16073) Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion

2024-01-02 Thread hzh0425 (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17802000#comment-17802000
 ] 

hzh0425 commented on KAFKA-16073:
-

Ping [~satish.duggana]: when you are free, please help check whether there are 
any other solutions.



[jira] [Assigned] (KAFKA-14510) Extend DescribeConfigs API to support group configs

2024-01-02 Thread Jimmy Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jimmy Wang reassigned KAFKA-14510:
--

Assignee: Jimmy Wang

> Extend DescribeConfigs API to support group configs
> ---
>
> Key: KAFKA-14510
> URL: https://issues.apache.org/jira/browse/KAFKA-14510
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Jimmy Wang
>Priority: Major
>






Re: [PR] MINOR: Fix flaky test RemoteIndexCacheTest.testClose() [kafka]

2024-01-02 Thread via GitHub


jolshan commented on PR #15108:
URL: https://github.com/apache/kafka/pull/15108#issuecomment-1874744763

   Do we know why we sometimes get interrupted? I think this is a useful change 
though; when I zoomed out to a month, the failures were more frequent. 
   
   
https://ge.apache.org/scans/tests?search.relativeStartTime=P28D=America%2FLos_Angeles=kafka.log.remote.RemoteIndexCacheTest=testClose()





Re: [PR] KAFKA-15742: KRaft support in GroupCoordinatorIntegrationTest [kafka]

2024-01-02 Thread via GitHub


jolshan commented on code in PR #15086:
URL: https://github.com/apache/kafka/pull/15086#discussion_r1439990671


##
core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala:
##
@@ -33,18 +34,21 @@ class GroupCoordinatorIntegrationTest extends 
KafkaServerTestHarness {
   overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
   overridingProps.put(KafkaConfig.OffsetsTopicCompressionCodecProp, 
offsetsTopicCompressionCodec.id.toString)
 
-  override def generateConfigs = TestUtils.createBrokerConfigs(1, zkConnect, 
enableControlledShutdown = false).map {
+  override def generateConfigs = TestUtils.createBrokerConfigs(1, 
zkConnectOrNull, enableControlledShutdown = false).map {
 KafkaConfig.fromProps(_, overridingProps)
   }
 
-  @Test
-  def testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec(quorum: 
String): Unit = {
 val consumer = TestUtils.createConsumer(bootstrapServers())
 val offsetMap = Map(
   new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) -> new 
OffsetAndMetadata(10, "")
 ).asJava
 consumer.commitSync(offsetMap)
-val logManager = servers.head.getLogManager
+val logManager =
+  if (isKRaftTest()) brokers.head.logManager

Review Comment:
   Can we use brokers.head.logManager for Zk tests too? 






Re: [PR] KAFKA-15742: KRaft support in GroupCoordinatorIntegrationTest [kafka]

2024-01-02 Thread via GitHub


jolshan commented on PR #15086:
URL: https://github.com/apache/kafka/pull/15086#issuecomment-1874741304

   
`testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec(String).quorum=kraft
 PASSED`
   
   Looks like it is passing on all builds.
   





Re: [PR] MINOR: Increase parallelism for Jenkins [kafka]

2024-01-02 Thread via GitHub


jolshan commented on PR #15099:
URL: https://github.com/apache/kafka/pull/15099#issuecomment-1874738065

   @divijvaidya did you accidentally include the KafkaApisTest change here? 





Re: [PR] KAFKA-15373: fix exception thrown in Admin#describeTopics for unknown ID [kafka]

2024-01-02 Thread via GitHub


jolshan commented on PR #14599:
URL: https://github.com/apache/kafka/pull/14599#issuecomment-1874737282

   Thanks @MikeEdgar looks like the test is fixed. I'll run the build again to 
make sure there aren't any other related tests failing.





Re: [PR] KAFKA-16046: also fix stores for outer join [kafka]

2024-01-02 Thread via GitHub


ableegoldman merged PR #15073:
URL: https://github.com/apache/kafka/pull/15073





Re: [PR] KAFKA-16046: also fix stores for outer join [kafka]

2024-01-02 Thread via GitHub


ableegoldman commented on PR #15073:
URL: https://github.com/apache/kafka/pull/15073#issuecomment-1874668715

   Test failures are unrelated, merging to trunk and will cherrypick to 3.7 cc 
@stanislavkozlovski 





[PR] KAFKA-16076: Interrupting the currentThread fix [kafka]

2024-01-02 Thread via GitHub


metao1 opened a new pull request, #15110:
URL: https://github.com/apache/kafka/pull/15110

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-16076) RestClient Interrupting the thread in case of InterruptedException

2024-01-02 Thread Mehrdad Karami (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mehrdad Karami updated KAFKA-16076:
---
Description: 
In the RestClient class, httpRequest can be called from different threads. It 
catches InterruptedException to handle that specific failure, but it forgets 
to call Thread.currentThread().interrupt() afterwards.

In general, it is good practice to call this so that the rest of the code 
knows the thread was already interrupted.
Note:
Some methods that cause a thread to wait or sleep (like {{Thread.sleep()}}) 
check this flag. If they see it is set, they stop waiting/sleeping and throw 
an {{InterruptedException}}.

  was:
In the RestClient class, `httpRequest` can be called from different threads. It 
catches `InterruptedException` to handle that specific failure, but it forgets 
to call `Thread.currentThread().interrupt()` afterwards.

In general, it is good practice to call this so that the rest of the code 
knows the thread was already interrupted.
Note:
Some methods that cause a thread to wait or sleep (like `Thread.sleep()`) 
check this flag. If they see it is set, they stop waiting/sleeping and throw 
an `InterruptedException`.


> RestClient Interrupting the thread in case of InterruptedException
> --
>
> Key: KAFKA-16076
> URL: https://issues.apache.org/jira/browse/KAFKA-16076
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Mehrdad Karami
>Priority: Minor
>  Labels: easyfix
>
> In the RestClient class, httpRequest can be called from different threads. 
> It catches InterruptedException to handle that specific failure, but it 
> forgets to call Thread.currentThread().interrupt() afterwards.
> In general, it is good practice to call this so that the rest of the code 
> knows the thread was already interrupted.
> Note:
> Some methods that cause a thread to wait or sleep (like 
> {{Thread.sleep()}}) check this flag. If they see it is set, they stop 
> waiting/sleeping and throw an {{InterruptedException}}.
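
The practice described in the issue can be sketched as follows. This is a minimal illustration, not the RestClient code: `InterruptRestoreSketch`, `callAndRestore`, `callAndSwallow`, and `flagSurvives` are hypothetical names, and `Thread.sleep` stands in for any blocking call that throws `InterruptedException`.

```java
// Hypothetical illustration; this is not Kafka Connect's RestClient.
final class InterruptRestoreSketch {

    // Catches the exception and restores the interrupt flag for callers.
    static void callAndRestore() {
        try {
            Thread.sleep(10_000); // stand-in for a blocking request
        } catch (InterruptedException e) {
            // Throwing InterruptedException cleared the flag; set it again.
            Thread.currentThread().interrupt();
        }
    }

    // Catches the exception and drops the interruption on the floor.
    static void callAndSwallow() {
        try {
            Thread.sleep(10_000);
        } catch (InterruptedException e) {
            // Flag stays cleared: later code cannot tell it was interrupted.
        }
    }

    // Runs one variant on a pre-interrupted thread and reports whether the
    // interrupt flag is still visible after the call returns.
    static boolean flagSurvives(boolean restore) {
        boolean[] flag = new boolean[1];
        Thread worker = new Thread(() -> {
            Thread.currentThread().interrupt(); // makes sleep() throw immediately
            if (restore) {
                callAndRestore();
            } else {
                callAndSwallow();
            }
            flag[0] = Thread.currentThread().isInterrupted();
        });
        worker.start();
        try {
            worker.join(); // join() makes the worker's write to flag[0] visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return flag[0];
    }

    public static void main(String[] args) {
        System.out.println("restored: " + flagSurvives(true));
        System.out.println("swallowed: " + flagSurvives(false));
    }
}
```

Only the restoring variant leaves the flag set, so code further up the stack can still observe the interruption.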





[jira] [Created] (KAFKA-16076) RestClient Interrupting the thread in case of InterruptedException

2024-01-02 Thread Mehrdad Karami (Jira)
Mehrdad Karami created KAFKA-16076:
--

 Summary: RestClient Interrupting the thread in case of 
InterruptedException
 Key: KAFKA-16076
 URL: https://issues.apache.org/jira/browse/KAFKA-16076
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Mehrdad Karami


In the RestClient class, `httpRequest` can be called from different threads. It 
catches `InterruptedException` to handle that specific failure, but it forgets 
to call `Thread.currentThread().interrupt()` afterwards.

In general, it is good practice to call this so that the rest of the code 
knows the thread was already interrupted.
Note:
Some methods that cause a thread to wait or sleep (like `Thread.sleep()`) 
check this flag. If they see it is set, they stop waiting/sleeping and throw 
an `InterruptedException`.





Re: [PR] KAFKA-16025: Fix orphaned locks when rebalancing and store cleanup race on unassigned task directories [kafka]

2024-01-02 Thread via GitHub


ableegoldman commented on code in PR #15088:
URL: https://github.com/apache/kafka/pull/15088#discussion_r1439872897


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1229,10 +1229,21 @@ private void tryToLockAllNonEmptyTaskDirectories() {
 final String namedTopology = taskDir.namedTopology();
 try {
 final TaskId id = parseTaskDirectoryName(dir.getName(), 
namedTopology);
-if (stateDirectory.lock(id)) {
-lockedTaskDirectories.add(id);
-if (!allTasks.containsKey(id)) {
-log.debug("Temporarily locked unassigned task {} for 
the upcoming rebalance", id);
+boolean lockedEmptyDirectory = false;
+try {
+if (stateDirectory.lock(id)) {
+if (stateDirectory.directoryForTaskIsEmpty(id)) {
+lockedEmptyDirectory = true;

Review Comment:
   This is maybe a bit pedantic, but I feel like it would be easier to 
understand the flow & intent of this code if we just unlocked it directly right 
here (instead of introducing the `lockedEmptyDirectory` flag and then unlocking 
it later on)
   
   Also, since we log a debug message when we do lock the directory, we might 
as well put a similar debug message in this branch too. Something like 
`"Released lock on empty directory for task {}"`
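
   The suggestion above could look roughly like the sketch below. It is only an illustration under stated assumptions: `LockEmptyDirSketch` is a hypothetical in-memory stand-in, not the real `StateDirectory` or `TaskManager`, and the debug message is collected in a list instead of going through a logger.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a state directory with an in-memory lock table.
final class LockEmptyDirSketch {
    private final Map<String, String> lockOwners = new HashMap<>(); // task id -> owner
    private final Set<String> nonEmptyDirs = new HashSet<>();
    final List<String> debugLog = new ArrayList<>();

    void markNonEmpty(String taskId) {
        nonEmptyDirs.add(taskId);
    }

    boolean lock(String taskId, String owner) {
        String holder = lockOwners.putIfAbsent(taskId, owner);
        return holder == null || holder.equals(owner);
    }

    void unlock(String taskId, String owner) {
        lockOwners.remove(taskId, owner); // only removes if owner matches
    }

    boolean isLocked(String taskId) {
        return lockOwners.containsKey(taskId);
    }

    // Re-checks emptiness after taking the lock and releases it right away,
    // without carrying a flag to a later unlock. Returns true if the lock is kept.
    boolean tryLockNonEmpty(String taskId, String owner) {
        if (!lock(taskId, owner)) {
            return false;
        }
        if (!nonEmptyDirs.contains(taskId)) {
            unlock(taskId, owner);
            debugLog.add("Released lock on empty directory for task " + taskId);
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        LockEmptyDirSketch dir = new LockEmptyDirSketch();
        dir.markNonEmpty("0_2");
        System.out.println(dir.tryLockNonEmpty("0_1", "thread-1"));
        System.out.println(dir.tryLockNonEmpty("0_2", "thread-1"));
        System.out.println(dir.debugLog);
    }
}
```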






[jira] [Commented] (KAFKA-16025) Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing

2024-01-02 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17801927#comment-17801927
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16025:


Nice find and writeup of the race condition – I'll take a look at the fix

> Streams StateDirectory has orphaned locks after rebalancing, blocking future 
> rebalancing
> 
>
> Key: KAFKA-16025
> URL: https://issues.apache.org/jira/browse/KAFKA-16025
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
> Environment: Linux
>Reporter: Sabit
>Priority: Major
>
> Hello,
>  
> We are encountering an issue where during rebalancing, we see streams threads 
> on one client get stuck in rebalancing. Upon enabling debug logs, we saw that 
> some tasks were having issues initializing due to failure to grab a lock in 
> the StateDirectory:
>  
> {{2023-12-14 22:51:57.352000Z stream-thread 
> [i-0f1a5e7a42158e04b-StreamThread-14] Could not initialize task 0_51 since: 
> stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] standby-task [0_51] 
> Failed to lock the state directory for task 0_51; will retry}}
>  
> We were able to reproduce this behavior reliably on 3.4.0. This is the 
> sequence that triggers the bug.
> Assume in a streams consumer group, there are 5 instances (A, B, C, D, E), 
> each with 5 threads (1-5), and the consumer is using stateful tasks which 
> have state stores on disk. There are 10 active tasks and 10 standby tasks.
>  # Instance A is deactivated
>  # As an example, let's say task 0_1, previously on instance B, moves to 
> instance C
>  # Task 0_1 leaves behind its state directory on Instance B's disk, 
> currently unused, and no lock for it exists in Instance B's StateDirectory 
> in-memory lock tracker
>  # Instance A is re-activated
>  # Streams thread 1 on Instance B is asked to re-join the consumer group due 
> to a new member being added
>  # As part of re-joining, thread 1 lists non-empty state directories in order 
> to report the offsets it has in its state stores as part of its metadata. 
> Thread 1 sees that the directory for 0_1 is not empty.
>  # The cleanup thread on instance B runs. The cleanup thread locks state 
> store 0_1, sees the directory for 0_1 was last modified more than 
> `state.cleanup.delay.ms` ago, deletes it, and unlocks it successfully
>  # Thread 1 takes a lock on directory 0_1 due to it being found not-empty 
> before, unaware that the cleanup has run between the time of the check and 
> the lock. It tracks this lock in its own in-memory store, in addition to 
> StateDirectory's in-memory lock store
>  # Thread 1 successfully joins the consumer group
>  # After every consumer in the group joins the group, assignments are 
> calculated, and then every consumer calls sync group to receive the new 
> assignments
>  # Thread 1 on Instance B calls sync group but gets an error - the group 
> coordinator has triggered a new rebalance and all members must rejoin the 
> group
>  # Thread 1 again lists non-empty state directories in order to report the 
> offsets it has in its state stores as part of its metadata. Prior to doing 
> so, it clears its in-memory store tracking the locks it has taken for the 
> purpose of gathering rebalance metadata
>  # Thread 1 no longer takes a lock on 0_1 as it is empty
>  # However, that lock on 0_1 owned by Thread 1 remains in StateDirectory
>  # All consumers re-join and sync successfully, receiving their new 
> assignments
>  # Thread 2 on Instance B is assigned task 0_1
>  # Thread 2 cannot take a lock on 0_1 in the StateDirectory because it is 
> still being held by Thread 1
>  # Thread 2 remains in rebalancing state, and cannot make progress on task 
> 0_1, or any other tasks it has assigned.
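
The core of the sequence above (steps 6 to 17) can be replayed in a single thread. This is a minimal sketch under simplifying assumptions: `OrphanedLockSketch` and its fields are hypothetical and only model two lock records, the shared StateDirectory-style table and thread 1's private tracking set.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical replay of the reported interleaving; not Kafka Streams code.
final class OrphanedLockSketch {
    final Map<String, String> directoryLocks = new HashMap<>(); // shared table: task -> owner
    final Set<String> nonEmptyDirs = new HashSet<>();
    final Set<String> thread1Tracked = new HashSet<>();          // thread 1's own record

    boolean lock(String task, String owner) {
        String holder = directoryLocks.putIfAbsent(task, owner);
        return holder == null || holder.equals(owner);
    }

    void unlock(String task, String owner) {
        directoryLocks.remove(task, owner);
    }

    // Replays the steps and reports whether thread 2 can lock task 0_1 at the end.
    static boolean thread2CanLock() {
        OrphanedLockSketch s = new OrphanedLockSketch();
        s.nonEmptyDirs.add("0_1");

        // Step 6: thread 1 lists non-empty directories and sees 0_1.
        Set<String> seenByThread1 = new HashSet<>(s.nonEmptyDirs);

        // Step 7: the cleanup thread locks, deletes, and unlocks 0_1.
        s.lock("0_1", "cleanup");
        s.nonEmptyDirs.remove("0_1");
        s.unlock("0_1", "cleanup");

        // Step 8: thread 1 locks based on its now-stale listing.
        for (String task : seenByThread1) {
            if (s.lock(task, "thread-1")) {
                s.thread1Tracked.add(task);
            }
        }

        // Step 12: on rejoin, thread 1 clears only its private tracking set.
        s.thread1Tracked.clear();

        // Step 13: the fresh listing is empty, so nothing is locked or tracked,
        // and nothing releases the entry left in the shared table (step 14).
        for (String task : new HashSet<>(s.nonEmptyDirs)) {
            if (s.lock(task, "thread-1")) {
                s.thread1Tracked.add(task);
            }
        }

        // Step 17: thread 2 is assigned 0_1 and tries to lock it.
        return s.lock("0_1", "thread-2");
    }

    public static void main(String[] args) {
        System.out.println("thread 2 can lock 0_1: " + thread2CanLock());
    }
}
```

In this model thread 2 fails because the shared table still names thread 1 as the owner while thread 1 no longer remembers holding the lock.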





Re: [PR] MINOR: Prototype: Measure CI slowness [kafka]

2024-01-02 Thread via GitHub


gharris1727 closed pull request #15008: MINOR: Prototype: Measure CI slowness
URL: https://github.com/apache/kafka/pull/15008





[jira] [Commented] (KAFKA-16055) Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders

2024-01-02 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17801924#comment-17801924
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16055:


 See this discussion on the user mailing list for additional context: 
[https://lists.apache.org/thread/gpct1275bfqovlckptn3lvf683qpoxol]

> Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders
> 
>
> Key: KAFKA-16055
> URL: https://issues.apache.org/jira/browse/KAFKA-16055
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Kohei Nozaki
>Priority: Minor
>  Labels: newbie, newbie++
>
> This was originally raised in [a kafka-users 
> post|https://lists.apache.org/thread/gpct1275bfqovlckptn3lvf683qpoxol].
> There is a HashMap stored in QueryableStoreProvider#storeProviders ([code 
> link|https://github.com/apache/kafka/blob/3.6.1/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L39])
>  which can be mutated by a KafkaStreams#removeStreamThread() call. This can 
> be problematic when KafkaStreams#store is called from a separate thread.
> We need to make this part of the code thread-safe, either by replacing the 
> map with a ConcurrentHashMap, by using an existing locking mechanism, or both.
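
One visible difference between the two map types can be shown without real threads. The sketch below is an assumption-laden illustration: `StoreProviderMapSketch` is hypothetical, the map contents are placeholders, and removing an entry in the middle of an iteration is used as a single-threaded stand-in for a removal that arrives from another thread while a reader is iterating. It does not claim to be the fix chosen for this issue.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration; not the QueryableStoreProvider implementation.
final class StoreProviderMapSketch {

    // Fills the map, then removes an entry while an iteration is in progress.
    // Returns true if the iteration completes, false if the iterator fails fast.
    static boolean survivesRemovalDuringIteration(Map<String, String> providers) {
        providers.put("thread-1", "provider-1");
        providers.put("thread-2", "provider-2");
        providers.put("thread-3", "provider-3");
        try {
            boolean removed = false;
            for (String threadName : providers.keySet()) {
                if (!removed) {
                    providers.remove(threadName); // stand-in for a concurrent removal
                    removed = true;
                }
            }
            return true;
        } catch (ConcurrentModificationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("HashMap: "
                + survivesRemovalDuringIteration(new HashMap<>()));
        System.out.println("ConcurrentHashMap: "
                + survivesRemovalDuringIteration(new ConcurrentHashMap<>()));
    }
}
```

`HashMap` iterators are fail-fast, while `ConcurrentHashMap` iterators are weakly consistent and tolerate modification. With real threads an unsynchronized `HashMap` can also misbehave in less visible ways, so this only demonstrates one symptom.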





[jira] [Updated] (KAFKA-16055) Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders

2024-01-02 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-16055:
---
Labels: newbie newbie++  (was: )

> Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders
> 
>
> Key: KAFKA-16055
> URL: https://issues.apache.org/jira/browse/KAFKA-16055
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Kohei Nozaki
>Priority: Minor
>  Labels: newbie, newbie++
>
> This was originally raised in [a kafka-users 
> post|https://lists.apache.org/thread/gpct1275bfqovlckptn3lvf683qpoxol].
> There is a HashMap stored in QueryableStoreProvider#storeProviders ([code 
> link|https://github.com/apache/kafka/blob/3.6.1/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L39])
>  which can be mutated by a KafkaStreams#removeStreamThread() call. This can 
> be problematic when KafkaStreams#store is called from a separate thread.
> We need to make this part of the code thread-safe, either by replacing the 
> map with a ConcurrentHashMap, by using an existing locking mechanism, or both.





[jira] [Created] (KAFKA-16075) TLS configuration not validated in KRaft controller-only nodes

2024-01-02 Thread Jakub Scholz (Jira)
Jakub Scholz created KAFKA-16075:


 Summary: TLS configuration not validated in KRaft controller-only 
nodes
 Key: KAFKA-16075
 URL: https://issues.apache.org/jira/browse/KAFKA-16075
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Affects Versions: 3.6.1
Reporter: Jakub Scholz


When a Kafka broker node (either a broker in a ZooKeeper-based cluster or a 
node with the broker role in a KRaft cluster) has an incorrect TLS 
configuration, such as an unsupported TLS cipher suite, it seems to throw a 
{{ConfigException}} and shut down:
{code:java}
2024-01-02 13:50:24,895 ERROR Exiting Kafka due to fatal exception during 
startup. (kafka.Kafka$) [main]
org.apache.kafka.common.config.ConfigException: Invalid value 
java.lang.IllegalArgumentException: Unsupported CipherSuite: 
TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305 for configuration A client SSLEngine 
created with the provided settings can't connect to a server SSLEngine created 
with those settings.
at 
org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:102)
at 
org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:73)
at 
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
at 
org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:107)
at kafka.network.Processor.<init>(SocketServer.scala:973)
at kafka.network.Acceptor.newProcessor(SocketServer.scala:879)
at 
kafka.network.Acceptor.$anonfun$addProcessors$1(SocketServer.scala:849)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
at kafka.network.Acceptor.addProcessors(SocketServer.scala:848)
at kafka.network.DataPlaneAcceptor.configure(SocketServer.scala:523)
at 
kafka.network.SocketServer.createDataPlaneAcceptorAndProcessors(SocketServer.scala:251)
at kafka.network.SocketServer.$anonfun$new$31(SocketServer.scala:175)
at 
kafka.network.SocketServer.$anonfun$new$31$adapted(SocketServer.scala:175)
at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576)
at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574)
at scala.collection.AbstractIterable.foreach(Iterable.scala:933)
at kafka.network.SocketServer.<init>(SocketServer.scala:175)
at kafka.server.BrokerServer.startup(BrokerServer.scala:242)
at 
kafka.server.KafkaRaftServer.$anonfun$startup$2(KafkaRaftServer.scala:96)
at 
kafka.server.KafkaRaftServer.$anonfun$startup$2$adapted(KafkaRaftServer.scala:96)
at scala.Option.foreach(Option.scala:437)
at kafka.server.KafkaRaftServer.startup(KafkaRaftServer.scala:96)
at kafka.Kafka$.main(Kafka.scala:113)
at kafka.Kafka.main(Kafka.scala) {code}
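
An eager check of this kind can be sketched with the plain JDK. This is not Kafka's `SslFactory` logic, which the trace above suggests does more than this; `CipherSuiteCheckSketch` and its methods are hypothetical, and the sketch only relies on `SSLEngine.setEnabledCipherSuites` rejecting names the JDK does not support.

```java
import java.security.NoSuchAlgorithmException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

// Hypothetical startup-time check; not Kafka's SslFactory.
final class CipherSuiteCheckSketch {

    static SSLEngine newEngine() {
        try {
            return SSLContext.getDefault().createSSLEngine();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("no default SSLContext available", e);
        }
    }

    // Returns false when the JDK rejects the suite name, which is the same
    // IllegalArgumentException that appears in the traces in this report.
    static boolean isAccepted(String cipherSuite) {
        try {
            newEngine().setEnabledCipherSuites(new String[] {cipherSuite});
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    static String firstSupportedSuite() {
        return newEngine().getSupportedCipherSuites()[0];
    }

    public static void main(String[] args) {
        System.out.println(isAccepted(firstSupportedSuite()));
        System.out.println(isAccepted("TLS_NOT_A_REAL_SUITE"));
    }
}
```

Running a check like this once at startup turns a bad suite name into an immediate configuration error instead of a failure on each connection attempt.
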
But on KRaft controller-only nodes, such validation does not seem to happen 
and the node keeps running and looping with this warning:
{code:java}
2024-01-02 13:53:10,186 WARN [RaftManager id=1] Error connecting to node 
my-cluster-controllers-0.my-cluster-kafka-brokers.myproject.svc.cluster.local:9090
 (id: 0 rack: null) (org.apache.kafka.clients.NetworkClient) 
[kafka-1-raft-outbound-request-thread]
java.io.IOException: Channel could not be created for socket 
java.nio.channels.SocketChannel[closed]
at 
org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:348)
at 
org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329)
at org.apache.kafka.common.network.Selector.connect(Selector.java:256)
at 
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:1032)
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:301)
at 
org.apache.kafka.server.util.InterBrokerSendThread.sendRequests(InterBrokerSendThread.java:145)
at 
org.apache.kafka.server.util.InterBrokerSendThread.pollOnce(InterBrokerSendThread.java:108)
at 
org.apache.kafka.server.util.InterBrokerSendThread.doWork(InterBrokerSendThread.java:136)
at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:130)
Caused by: org.apache.kafka.common.KafkaException: 
java.lang.IllegalArgumentException: Unsupported CipherSuite: 
TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305
at 
org.apache.kafka.common.network.SslChannelBuilder.buildChannel(SslChannelBuilder.java:111)
at 
org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
... 8 more
Caused by: java.lang.IllegalArgumentException: Unsupported CipherSuite: 
TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305
at 
java.base/sun.security.ssl.CipherSuite.validValuesOf(CipherSuite.java:978)
at 
java.base/sun.security.ssl.SSLEngineImpl.setEnabledCipherSuites(SSLEngineImpl.java:864)
at 

[jira] [Commented] (KAFKA-16034) AsyncKafkaConsumer will get Invalid Request error when trying to rejoin on fenced/unknown member Id

2024-01-02 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17801921#comment-17801921
 ] 

Philip Nee commented on KAFKA-16034:


Hi [~phuctran] - I think the issue might already have been fixed.  Please try to 
reproduce it using PlainTextConsumerTest.

> AsyncKafkaConsumer will get Invalid Request error when trying to rejoin on 
> fenced/unknown member Id
> ---
>
> Key: KAFKA-16034
> URL: https://issues.apache.org/jira/browse/KAFKA-16034
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Philip Nee
>Assignee: Phuc Hong Tran
>Priority: Major
> Fix For: 3.8.0
>
>
> The consumer will log invalid request error when joining from fenced/unknown 
> member id because we didn't reset the HeartbeatState and we won't send the 
> needed fields (rebalanceTimeoutMs for example) when joining.





Re: [PR] KAFKA-15556: Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect [kafka]

2024-01-02 Thread via GitHub


philipnee commented on code in PR #15020:
URL: https://github.com/apache/kafka/pull/15020#discussion_r1439651419


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java:
##
@@ -70,8 +70,10 @@ protected void maybeThrowAuthFailure(Node node) {
  */
 @Override
 public PollResult poll(long currentTimeMs) {
+boolean checkNodeAvailability = false;
+
 return pollInternal(
-prepareFetchRequests(),
+prepareFetchRequests(checkNodeAvailability),

Review Comment:
   just pass false.  the var above is useless.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##
@@ -403,7 +405,7 @@ protected Map 
prepareCloseFetchSessi
  * Create fetch requests for all nodes for which we have assigned 
partitions
  * that have no existing requests in flight.
  */
-protected Map<Node, FetchSessionHandler.FetchRequestData> 
prepareFetchRequests() {
+protected Map<Node, FetchSessionHandler.FetchRequestData> 
prepareFetchRequests(boolean checkNodeAvailability) {

Review Comment:
   `final boolean`



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##
@@ -372,7 +372,7 @@ Node selectReadReplica(final TopicPartition partition, 
final Node leaderReplica,
 }
 }
 
-protected Map<Node, FetchSessionHandler.FetchRequestData> 
prepareCloseFetchSessionRequests() {
+protected Map<Node, FetchSessionHandler.FetchRequestData> 
prepareCloseFetchSessionRequests(boolean checkNodeAvailability) {

Review Comment:
   `final boolean checkNodeAvailability`
   



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##
@@ -428,37 +430,53 @@ protected Map 
prepareFetchRequests()
 // Use the preferred read replica if set, otherwise the 
partition's leader
 Node node = selectReadReplica(partition, leaderOpt.get(), 
currentTimeMs);
 
-if (isUnavailable(node)) {
-maybeThrowAuthFailure(node);
-
-// If we try to send during the reconnect backoff window, then 
the request is just
-// going to be failed anyway before being sent, so skip 
sending the request for now
-log.trace("Skipping fetch for partition {} because node {} is 
awaiting reconnect backoff", partition, node);
-} else if (nodesWithPendingFetchRequests.contains(node.id())) {
-log.trace("Skipping fetch for partition {} because previous 
request to {} has not been processed", partition, node);
+if (checkNodeAvailability) {

Review Comment:
   is it possible to avoid nested if? 



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##
@@ -102,7 +102,9 @@ public void 
clearBufferedDataForUnassignedPartitions(Collection
  * @return number of fetches sent
  */
 public synchronized int sendFetches() {
-final Map<Node, FetchSessionHandler.FetchRequestData> fetchRequests = 
prepareFetchRequests();
+boolean checkNodeAvailability = true;

Review Comment:
   same as above



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##
@@ -385,7 +385,9 @@ protected Map 
prepareCloseFetchSessi
 // skip sending the close request.
 final Node fetchTarget = cluster.nodeById(fetchTargetNodeId);
 
-if (fetchTarget == null || isUnavailable(fetchTarget)) {
+boolean fetchTargetAvailability = checkNodeAvailability ? 
(fetchTarget == null || isUnavailable(fetchTarget)) : fetchTarget == null;

Review Comment:
   `final boolean`
   
   - i would also call this `isFetchTargetAvailable`
   - let's not use inline if.  this makes the code harder to read.
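
   One way to read the two suggestions above is sketched below. It is an illustration only: `FetchTargetCheckSketch` and `shouldSkipClose` are hypothetical names, and the two boolean inputs stand in for `fetchTarget == null` and `isUnavailable(fetchTarget)` from the diff.

```java
// Hypothetical refactoring of the inline conditional; not the PR's code.
final class FetchTargetCheckSketch {

    // Same truth table as:
    //   checkNodeAvailability ? (targetMissing || targetUnavailable) : targetMissing
    // written as early returns so each case can be read on its own line.
    static boolean shouldSkipClose(final boolean targetMissing,
                                   final boolean targetUnavailable,
                                   final boolean checkNodeAvailability) {
        if (targetMissing) {
            return true;
        }
        if (!checkNodeAvailability) {
            return false;
        }
        return targetUnavailable;
    }

    public static void main(String[] args) {
        // A present but unavailable node is only skipped when availability is checked.
        System.out.println(shouldSkipClose(false, true, true));
        System.out.println(shouldSkipClose(false, true, false));
    }
}
```

   Note that the expression in the diff is true when the target is missing or unavailable, so the sketch names the value for that meaning; a name like `isFetchTargetAvailable` would fit the negated condition.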



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java:
##
@@ -82,9 +84,14 @@ public PollResult poll(long currentTimeMs) {
  */
 @Override
 public PollResult pollOnClose() {
+
+boolean checkNodeAvailability = false;
+
+
 // TODO: move the logic to poll to handle signal close
+
 return pollInternal(
-prepareCloseFetchSessionRequests(),
+prepareCloseFetchSessionRequests(checkNodeAvailability),

Review Comment:
   ditto



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##
@@ -781,10 +781,10 @@ public void testFetchSkipsBlackedOutNodes() {
 Node node = initialUpdateResponse.brokers().iterator().next();
 
 client.backoff(node, 500);
-assertEquals(0, sendFetches());
+assertEquals(1, sendFetches());

Review Comment:
   why are you changing the tests? 




Re: [PR] MINOR: Enable Gradle Remote Build Cache [kafka]

2024-01-02 Thread via GitHub


ijuma commented on PR #15109:
URL: https://github.com/apache/kafka/pull/15109#issuecomment-1874508632

   I guess one way would be to enable the cache only for CI & trunk by default 
at first. Then we could manually enable the cache locally and for one PR to 
test the behavior. If it all looks good, we would then enable the cache for 
everyone (but leave pushes for trunk && CI only).





Re: [PR] MINOR: Enable Gradle Remote Build Cache [kafka]

2024-01-02 Thread via GitHub


ijuma commented on PR #15109:
URL: https://github.com/apache/kafka/pull/15109#issuecomment-1874507122

   Could we do a test with a new branch (not trunk) where we validate the 
behavior before we roll it out to everyone else?





Re: [PR] MINOR: Enable Gradle Remote Build Cache [kafka]

2024-01-02 Thread via GitHub


nicktelford commented on PR #15109:
URL: https://github.com/apache/kafka/pull/15109#issuecomment-1874500874

   Hi @divijvaidya , this enables the _remote_ build cache, which allows 
different machines to share the cached output of tasks.
   
   Thanks for the links to the builds. I haven't had a chance to look at them 
yet, but the thing that requires access to ge would be to simulate a trunk CI 
run and push to the cache, which I won't be able to do locally.





Re: [PR] KAFKA-16063: Prevent memory leak by Disabling shutdownhook [kafka]

2024-01-02 Thread via GitHub


divijvaidya merged PR #15104:
URL: https://github.com/apache/kafka/pull/15104





Re: [PR] KAFKA-16063: Prevent memory leak by Disabling shutdownhook [kafka]

2024-01-02 Thread via GitHub


divijvaidya commented on PR #15104:
URL: https://github.com/apache/kafka/pull/15104#issuecomment-1874493514

   The test failures are not in classes that use MiniKdc, which is changed in 
this PR. The only test that uses kdc and fails is 
`SaslScramSslEndToEndAuthorizationTest`, but it was successful in earlier runs 
of CI. 





Re: [PR] MINOR: Enable Gradle Remote Build Cache [kafka]

2024-01-02 Thread via GitHub


divijvaidya commented on PR #15109:
URL: https://github.com/apache/kafka/pull/15109#issuecomment-1874483500

   Hey @nicktelford , you don't need committer permissions to look at ASF 
gradle. Try using this link: https://ge.apache.org, it should be publicly 
accessible. 
   
   For this PR, here's the build - https://ge.apache.org/s/bdwopqcoxwge2 which 
shows `Build Cache` as `On` which is unlike this build on trunk 
https://ge.apache.org/s/ywmt7vaghr62u where `Build Cache` is `Off`.
   
   I am not sure how useful this will be since CI for trunk may build on 
different boxes but it's definitely worth keeping it on.
   
   @ijuma is our gradle expert in the community. I will wait to hear his 
thoughts regarding this.





[jira] [Commented] (KAFKA-16051) Deadlock on connector initialization

2024-01-02 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17801909#comment-17801909
 ] 

Greg Harris commented on KAFKA-16051:
-

I added permissions for you on Jira, and assigned the ticket to you.

It is expected that you don't have permission to add reviewers directly on 
GitHub. In the future, you can at-mention reviewers (both other contributors 
and committers) and ask for review, without using the "ask for review" feature 
on GitHub, which requires special permissions.

> Deadlock on connector initialization
> 
>
>                 Key: KAFKA-16051
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16051
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.6.3, 3.6.1
>            Reporter: Octavian Ciubotaru
>            Assignee: Octavian Ciubotaru
>            Priority: Major
>
>  
> Tested with Kafka 3.6.1 and 2.6.3.
> The only plugin installed is confluentinc-kafka-connect-jdbc-10.7.4.
> Stack trace for Kafka 3.6.1:
> {noformat}
> Found one Java-level deadlock:
> =
> "pool-3-thread-1":
>   waiting to lock monitor 0x7fbc88006300 (object 0x91002aa0, a 
> org.apache.kafka.connect.runtime.standalone.StandaloneHerder),
>   which is held by "Thread-9"
> "Thread-9":
>   waiting to lock monitor 0x7fbc88008800 (object 0x9101ccd8, a 
> org.apache.kafka.connect.storage.MemoryConfigBackingStore),
>   which is held by "pool-3-thread-1"
> Java stack information for the threads listed above:
> ===
> "pool-3-thread-1":
>     at 
> org.apache.kafka.connect.runtime.standalone.StandaloneHerder$ConfigUpdateListener.onTaskConfigUpdate(StandaloneHerder.java:516)
>     - waiting to lock <0x91002aa0> (a 
> org.apache.kafka.connect.runtime.standalone.StandaloneHerder)
>     at 
> org.apache.kafka.connect.storage.MemoryConfigBackingStore.putTaskConfigs(MemoryConfigBackingStore.java:137)
>     - locked <0x9101ccd8> (a 
> org.apache.kafka.connect.storage.MemoryConfigBackingStore)
>     at 
> org.apache.kafka.connect.runtime.standalone.StandaloneHerder.updateConnectorTasks(StandaloneHerder.java:483)
>     at 
> org.apache.kafka.connect.runtime.standalone.StandaloneHerder.lambda$null$2(StandaloneHerder.java:229)
>     at 
> org.apache.kafka.connect.runtime.standalone.StandaloneHerder$$Lambda$692/0x000840557440.run(Unknown
>  Source)
>     at 
> java.util.concurrent.Executors$RunnableAdapter.call(java.base@11.0.21/Executors.java:515)
>     at 
> java.util.concurrent.FutureTask.run(java.base@11.0.21/FutureTask.java:264)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(java.base@11.0.21/ScheduledThreadPoolExecutor.java:304)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.21/ThreadPoolExecutor.java:1128)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.21/ThreadPoolExecutor.java:628)
>     at java.lang.Thread.run(java.base@11.0.21/Thread.java:829)
> "Thread-9":
>     at 
> org.apache.kafka.connect.storage.MemoryConfigBackingStore.putTaskConfigs(MemoryConfigBackingStore.java:129)
>     - waiting to lock <0x9101ccd8> (a 
> org.apache.kafka.connect.storage.MemoryConfigBackingStore)
>     at 
> org.apache.kafka.connect.runtime.standalone.StandaloneHerder.updateConnectorTasks(StandaloneHerder.java:483)
>     at 
> org.apache.kafka.connect.runtime.standalone.StandaloneHerder.requestTaskReconfiguration(StandaloneHerder.java:255)
>     - locked <0x91002aa0> (a 
> org.apache.kafka.connect.runtime.standalone.StandaloneHerder)
>     at 
> org.apache.kafka.connect.runtime.HerderConnectorContext.requestTaskReconfiguration(HerderConnectorContext.java:50)
>     at 
> org.apache.kafka.connect.runtime.WorkerConnector$WorkerConnectorContext.requestTaskReconfiguration(WorkerConnector.java:548)
>     at 
> io.confluent.connect.jdbc.source.TableMonitorThread.run(TableMonitorThread.java:86)
> Found 1 deadlock.
> {noformat}
> The jdbc source connector is loading tables from the database and updates the 
> configuration once the list is available. The deadlock is very consistent in 
> my environment, probably because the database is on the same machine.
> Maybe it is possible to avoid this situation by always locking the herder 
> first and the config backing store second. From what I see, 
> updateConnectorTasks sometimes is called before locking on herder and other 
> times it is not.
>  
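
The lock-ordering idea suggested in the description can be illustrated with a minimal sketch. This is not the Connect runtime code: `herderLock` and `storeLock` are hypothetical stand-ins for the `StandaloneHerder` and `MemoryConfigBackingStore` monitors, and the two methods only mimic the two call paths seen in the stack trace. The point is that when every path takes the herder monitor first and the store monitor second, neither thread can hold one lock while waiting on the other in the opposite order.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LockOrderingSketch {
    // Hypothetical stand-ins for the two monitors named in the thread dump.
    private final Object herderLock = new Object();
    private final Object storeLock = new Object();
    private final AtomicInteger updates = new AtomicInteger();

    // Mimics the path driven by the connector's own thread ("Thread-9").
    void requestTaskReconfiguration() {
        synchronized (herderLock) {       // always first
            synchronized (storeLock) {    // always second
                updates.incrementAndGet();
            }
        }
    }

    // Mimics the path driven by the herder's executor ("pool-3-thread-1").
    // In the reported trace this path took the store monitor first; here it
    // follows the same herder-then-store order as the method above.
    void scheduledUpdate() {
        synchronized (herderLock) {
            synchronized (storeLock) {
                updates.incrementAndGet();
            }
        }
    }

    // Runs both paths concurrently and returns the number of completed updates.
    static int run(int iterations) {
        LockOrderingSketch sketch = new LockOrderingSketch();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(() -> {
            for (int i = 0; i < iterations; i++) sketch.requestTaskReconfiguration();
        });
        pool.submit(() -> {
            for (int i = 0; i < iterations; i++) sketch.scheduledUpdate();
        });
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                pool.shutdownNow();
                throw new IllegalStateException("workers did not finish; possible deadlock");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sketch.updates.get();
    }

    public static void main(String[] args) {
        System.out.println(run(10_000));
    }
}
```

Whether herder-then-store is the right order for the real classes is a question for the actual fix; the sketch only shows why a single global order removes the cycle.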



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16051) Deadlock on connector initialization

2024-01-02 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris reassigned KAFKA-16051:
---

Assignee: Octavian Ciubotaru






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-02 Thread via GitHub


gharris1727 commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1439665912


##
clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java:
##
@@ -98,8 +117,91 @@ public void testServiceLoaderDiscovery() {
 public static class TestFileConfigProvider extends FileConfigProvider {
 
 @Override
-protected Reader reader(String path) throws IOException {
+protected Reader reader(Path path) throws IOException {
 return new 
StringReader("testKey=testResult\ntestKey2=testResult2");
 }
 }
+
+@Test
+public void testAllowedDirPath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(dirFile);
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testAllowedFilePath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(dirFile);
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testMultipleAllowedPaths() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir + "," + siblingDir);
+configProvider.configure(configs);
+
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+
+ConfigData configData = configProvider.get(dirFile);
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+
+configData = configProvider.get(siblingDirFile);
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNotAllowedDirPath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(siblingDirFile);
+assertTrue(configData.data().isEmpty());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNotAllowedFilePath() throws IOException {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+//another file under the same directory
+Path dirFile2 = Files.createFile(Paths.get(dir, "dirFile2"));
+ConfigData configData = configProvider.get(dirFile2.toString());
+assertTrue(configData.data().isEmpty());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNoTraversal() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+// Check we can't escape outside the path directory
+ConfigData configData = configProvider.get(dirFile + 
Paths.get("/../siblingdir/siblingdirFile"));

Review Comment:
   This is still an invalid path traversal attack. The duplicate test in 
DirectoryConfigProviderTest appears to be fine.
   
   Have you looked into deduplicating these two classes? That would also avoid 
requiring two different tests for the same traversal attack.
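
One possible reading of the comment above, sketched with plain `java.nio.file` calls: appending `/../siblingdir/siblingdirFile` to a *file* path cancels only the file name, so the result still sits under `dir` rather than in the sibling directory. The directory layout (`parent/dir`, `parent/siblingdir`) is assumed to match the one in `DirectoryConfigProviderTest`, and `isUnder` is a hypothetical helper, not the provider's actual check.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class TraversalSketch {
    // Joins the strings the way the test does, then collapses "." and ".." segments.
    static Path normalized(String base, String suffix) {
        return Paths.get(base + suffix).normalize();
    }

    // Hypothetical containment check: is the normalized request under the allowed path?
    static boolean isUnder(Path allowed, Path requested) {
        return requested.normalize().startsWith(allowed.normalize());
    }

    public static void main(String[] args) {
        String dirFile = "parent/dir/dirFile";
        // A single ".." removes only "dirFile", so the path stays inside parent/dir.
        Path oneLevel = normalized(dirFile, "/../siblingdir/siblingdirFile");
        // Two ".." segments are needed to land in the real sibling directory.
        Path twoLevels = normalized(dirFile, "/../../siblingdir/siblingdirFile");
        System.out.println(oneLevel);
        System.out.println(twoLevels);
        System.out.println(isUnder(Paths.get("parent/dir"), oneLevel));
        System.out.println(isUnder(Paths.get("parent/dir"), twoLevels));
    }
}
```

Under that assumed layout, a test that wants to prove it cannot escape to `siblingdir` would need the second form; the first form names a file that presumably does not exist.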



##
clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java:
##
@@ -153,5 +159,57 @@ public void testServiceLoaderDiscovery() {
 ServiceLoader serviceLoader = 
ServiceLoader.load(ConfigProvider.class);
 assertTrue(StreamSupport.stream(serviceLoader.spliterator(), 
false).anyMatch(configProvider -> configProvider instanceof 
DirectoryConfigProvider));
 }
+
+@Test
+public void testAllowedPath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, parent.getAbsolutePath());
+provider.configure(configs);
+
+ConfigData configData = provider.get(dir);
+assertEquals(toSet(asList(foo, bar)), configData.data().keySet());
+assertEquals("FOO", configData.data().get(foo));
+assertEquals("BAR", configData.data().get(bar));
+assertNull(configData.ttl());
+}
+
+@Test
+public void testMultipleAllowedPaths() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir + "," + siblingDir);
+provider.configure(configs);
+
+ConfigData configData = provider.get(subdir);
+assertEquals(toSet(asList(subdirFileName)), 

[jira] [Commented] (KAFKA-16046) Stream Stream Joins fail after restoration with deserialization exceptions

2024-01-02 Thread Almog Gavra (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17801904#comment-17801904
 ] 

Almog Gavra commented on KAFKA-16046:
-

https://github.com/apache/kafka/pull/15073

> Stream Stream Joins fail after restoration with deserialization exceptions
> --
>
>                 Key: KAFKA-16046
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16046
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.7.0
>            Reporter: Almog Gavra
>            Assignee: Almog Gavra
>            Priority: Blocker
>              Labels: streams
>             Fix For: 3.7.0
>
>
> Before KIP-954, the `KStreamImplJoin` class would always create 
> non-timestamped persistent windowed stores. After that KIP, the default was 
> changed to create timestamped stores. This wasn't compatible because, during 
> restoration, timestamped stores have their changelog values transformed to 
> prepend the timestamp to the value. This caused serialization errors when 
> trying to read from the store because the deserializers did not expect the 
> timestamp to be prepended.
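
The mismatch described above can be sketched in isolation. The layout used here (an 8-byte timestamp followed by the raw value bytes) is an assumption made for illustration, and none of the names below are Kafka Streams classes. A reader that expects a plain value sees the timestamp bytes as part of the payload; a reader that knows about the prefix recovers the original value.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class TimestampPrefixSketch {
    // Assumed layout: 8-byte big-endian timestamp, then the raw value bytes.
    static byte[] prependTimestamp(long timestamp, byte[] rawValue) {
        return ByteBuffer.allocate(Long.BYTES + rawValue.length)
            .putLong(timestamp)
            .put(rawValue)
            .array();
    }

    // A reader that does not expect the prefix decodes all bytes as the value.
    static String readPlain(byte[] stored) {
        return new String(stored, StandardCharsets.UTF_8);
    }

    // A reader that expects the prefix skips it before decoding the value.
    static String readTimestamped(byte[] stored) {
        ByteBuffer buffer = ByteBuffer.wrap(stored);
        buffer.getLong(); // skip the timestamp
        byte[] raw = new byte[buffer.remaining()];
        buffer.get(raw);
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] stored = prependTimestamp(1704153600000L, "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(stored.length);
        System.out.println(readTimestamped(stored));
        System.out.println(readPlain(stored).equals("hello"));
    }
}
```

With a string value the plain reader merely returns the wrong text; with a stricter deserializer the same extra bytes would be expected to surface as an exception, which is the failure the ticket reports.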





[jira] [Reopened] (KAFKA-16046) Stream Stream Joins fail after restoration with deserialization exceptions

2024-01-02 Thread Almog Gavra (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Almog Gavra reopened KAFKA-16046:
-






Re: [PR] KAFKA-15215: docs for KIP-954 [kafka]

2024-01-02 Thread via GitHub


ableegoldman commented on PR #14949:
URL: https://github.com/apache/kafka/pull/14949#issuecomment-1874410202

   Merged to trunk and cherrypicked to 3.7





Re: [PR] KAFKA-15215: docs for KIP-954 [kafka]

2024-01-02 Thread via GitHub


ableegoldman merged PR #14949:
URL: https://github.com/apache/kafka/pull/14949





Re: [PR] KAFKA-15215: docs for KIP-954 [kafka]

2024-01-02 Thread via GitHub


ableegoldman commented on PR #14949:
URL: https://github.com/apache/kafka/pull/14949#issuecomment-1874406419

   FYI @agavra try to always fill out a PR description, even if it's super 
short/simple, so we have something for the commit message. I'll just make it up 
for this case but going forward it's on you  





Re: [PR] KAFKA-15215: docs for KIP-954 [kafka]

2024-01-02 Thread via GitHub


ableegoldman commented on PR #14949:
URL: https://github.com/apache/kafka/pull/14949#issuecomment-1874405243

   Ok this has dragged on for long enough, I'm just going to merge this even 
though we haven't gotten a single individual run without any build failures, 
because (a) we have at least one yellow build without compiler errors for each 
JDK if you look across the last three runs, and (b) this is only touching docs 
anyway so the failures are pretty clearly unrelated
   
   cc @stanislavkozlovski going to merge this to 3.7 since it's just feature 
docs for 3.7





Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-02 Thread via GitHub


mimaison commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1439616708


##
clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java:
##
@@ -43,36 +49,36 @@ public class DirectoryConfigProviderTest {
 
 private DirectoryConfigProvider provider;
 private File parent;
-private File dir;
-private File bar;
-private File foo;
-private File subdir;
-private File subdirFile;
-private File siblingDir;
-private File siblingDirFile;
-private File siblingFile;
-
-private static File writeFile(File file) throws IOException {
-Files.write(file.toPath(), 
file.getName().toUpperCase(Locale.ENGLISH).getBytes(StandardCharsets.UTF_8));
-return file;
+private String dir;
+private final String bar = "bar";
+private final String foo = "foo";
+private String subdir;
+private final String subdirFileName = "subdirFile";
+private String siblingDir;
+private final String siblingDirFileName = "siblingDirFile";
+private final String siblingFileName = "siblingFile";
+
+private static Path writeFile(Path path) throws IOException {
+return Files.write(path, 
String.valueOf(path.getFileName()).toUpperCase(Locale.ENGLISH).getBytes(StandardCharsets.UTF_8));
 }
 
 @BeforeEach
 public void setup() throws IOException {
 provider = new DirectoryConfigProvider();
 provider.configure(Collections.emptyMap());
 parent = TestUtils.tempDirectory();
-dir = new File(parent, "dir");
-dir.mkdir();
-foo = writeFile(new File(dir, "foo"));
-bar = writeFile(new File(dir, "bar"));
-subdir = new File(dir, "subdir");
-subdir.mkdir();
-subdirFile = writeFile(new File(subdir, "subdirFile"));
-siblingDir = new File(parent, "siblingdir");
-siblingDir.mkdir();
-siblingDirFile = writeFile(new File(siblingDir, "siblingdirFile"));
-siblingFile = writeFile(new File(parent, "siblingFile"));
+
+dir = 
String.valueOf(Files.createDirectory(Paths.get(parent.getAbsolutePath(), 
"dir")));

Review Comment:
   I think we can just use `.toString()` here as `Files.createDirectory()` 
should never return `null`.



##
clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java:
##
@@ -44,8 +47,23 @@ public class DirectoryConfigProvider implements 
ConfigProvider {
 
 private static final Logger log = 
LoggerFactory.getLogger(DirectoryConfigProvider.class);
 
+public static final String ALLOWED_PATHS_CONFIG = "allowed.paths";
+public static final String ALLOWED_PATHS_DOC = "Path that this config 
provider is allowed to access";

Review Comment:
   `Path that this config` -> `A comma separated list of paths that this config`



##
clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java:
##
@@ -40,7 +44,21 @@ public class FileConfigProvider implements ConfigProvider {
 
 private static final Logger log = 
LoggerFactory.getLogger(FileConfigProvider.class);
 
+public static final String ALLOWED_PATHS_CONFIG = "allowed.paths";
+public static final String ALLOWED_PATHS_DOC = "Path that this config 
provider is allowed to access";
+private List allowedPaths;
+
 public void configure(Map configs) {
+if (configs.containsKey(ALLOWED_PATHS_CONFIG)) {
+String configValue = (String) configs.get(ALLOWED_PATHS_CONFIG);
+
+if (configValue != null && !configValue.isEmpty()) {

Review Comment:
   Should we throw or log something if this field is set to a bad value?



##
clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java:
##
@@ -40,7 +44,21 @@ public class FileConfigProvider implements ConfigProvider {
 
 private static final Logger log = 
LoggerFactory.getLogger(FileConfigProvider.class);
 
+public static final String ALLOWED_PATHS_CONFIG = "allowed.paths";
+public static final String ALLOWED_PATHS_DOC = "Path that this config 
provider is allowed to access";
+private List allowedPaths;
+
 public void configure(Map configs) {
+if (configs.containsKey(ALLOWED_PATHS_CONFIG)) {
+String configValue = (String) configs.get(ALLOWED_PATHS_CONFIG);
+
+if (configValue != null && !configValue.isEmpty()) {
+allowedPaths = new ArrayList<>();
+Arrays.stream(configValue.split(",")).forEach(b -> 
allowedPaths.add(Paths.get(b).normalize()));
+}
+} else {
+allowedPaths = null;

Review Comment:
   Could we initialize the field when it's declared and remove this `else` 
block?
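
A minimal sketch of what the two suggestions on `configure` might look like together, assuming the field is a list of `Path` and that `null` means "no restriction". `AllowedPathsSketch` is a hypothetical class, not the PR's code; the trimming of entries is an addition for illustration. The field is initialized at its declaration, so no `else` branch is needed, and each comma-separated entry is normalized before it is stored.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class AllowedPathsSketch {
    public static final String ALLOWED_PATHS_CONFIG = "allowed.paths";

    // Initialized at declaration; null is assumed to mean "unrestricted".
    private List<Path> allowedPaths = null;

    public void configure(Map<String, ?> configs) {
        String configValue = (String) configs.get(ALLOWED_PATHS_CONFIG);
        if (configValue != null && !configValue.isEmpty()) {
            // Paths.get throws InvalidPathException for strings it cannot convert,
            // which is one place a bad value could be reported.
            allowedPaths = Arrays.stream(configValue.split(","))
                .map(String::trim)
                .map(entry -> Paths.get(entry).normalize())
                .collect(Collectors.toList());
        }
    }

    public List<Path> allowedPaths() {
        return allowedPaths;
    }
}
```

Note that without the `else` branch a second `configure` call that omits the key keeps the earlier list; whether that matters depends on how providers are reconfigured in practice.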



##
clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java:
##
@@ -40,7 +44,21 @@ public class FileConfigProvider implements ConfigProvider {

Re: [PR] KAFKA-15816: Fix leaked sockets in trogdor tests [kafka]

2024-01-02 Thread via GitHub


gharris1727 commented on PR #14771:
URL: https://github.com/apache/kafka/pull/14771#issuecomment-1874318692

   @divijvaidya Thanks for the review. I applied your suggestion. PTAL, thanks!





Re: [PR] KAFKA-16059: Fix thread leak KafkaAPIsTest [kafka]

2024-01-02 Thread via GitHub


jolshan commented on PR #15093:
URL: https://github.com/apache/kafka/pull/15093#issuecomment-1874304438

   Thanks folks for fixing this stuff :) 





[jira] [Updated] (KAFKA-15816) Typos in tests leak network sockets

2024-01-02 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-15816:

Description: 
There are a few tests which leak network sockets due to small typos in the 
tests themselves.

Clients: [https://github.com/apache/kafka/pull/14750] (DONE)
 * NioEchoServer
 * KafkaConsumerTest
 * KafkaProducerTest
 * SelectorTest
 * SslTransportLayerTest
 * SslTransportTls12Tls13Test
 * SslVersionsTransportLayerTest
 * SaslAuthenticatorTest

Core: [https://github.com/apache/kafka/pull/14754] 
 * MiniKdc
 * GssapiAuthenticationTest
 * MirrorMakerIntegrationTest
 * SocketServerTest
 * EpochDrivenReplicationProtocolAcceptanceTest
 * LeaderEpochIntegrationTest

Trogdor: [https://github.com/apache/kafka/pull/14771] 
 * AgentTest

Mirror: [https://github.com/apache/kafka/pull/14761] (DONE)
 * DedicatedMirrorIntegrationTest
 * MirrorConnectorsIntegrationTest
 * MirrorConnectorsWithCustomForwardingAdminIntegrationTest

Runtime: [https://github.com/apache/kafka/pull/14764] (DONE)
 * ConnectorTopicsIntegrationTest
 * ExactlyOnceSourceIntegrationTest
 * WorkerTest
 * WorkerGroupMemberTest

Streams: [https://github.com/apache/kafka/pull/14769] (DONE)
 * IQv2IntegrationTest
 * MetricsReporterIntegrationTest
 * NamedTopologyIntegrationTest
 * PurgeRepartitionTopicIntegrationTest

These can be addressed by just fixing the tests.

  was:
There are a few tests which leak network sockets due to small typos in the 
tests themselves.

Clients: [https://github.com/apache/kafka/pull/14750] (DONE)
 * NioEchoServer
 * KafkaConsumerTest
 * KafkaProducerTest
 * SelectorTest
 * SslTransportLayerTest
 * SslTransportTls12Tls13Test
 * SslVersionsTransportLayerTest
 * SaslAuthenticatorTest

Core: [https://github.com/apache/kafka/pull/14754] 
 * MiniKdc
 * GssapiAuthenticationTest
 * MirrorMakerIntegrationTest
 * SocketServerTest
 * EpochDrivenReplicationProtocolAcceptanceTest
 * LeaderEpochIntegrationTest

Trogdor: [https://github.com/apache/kafka/pull/14771] 
 * AgentTest

Mirror: [https://github.com/apache/kafka/pull/14761] (DONE)
 * DedicatedMirrorIntegrationTest
 * MirrorConnectorsIntegrationTest
 * MirrorConnectorsWithCustomForwardingAdminIntegrationTest

Runtime: [https://github.com/apache/kafka/pull/14764] 
 * ConnectorTopicsIntegrationTest
 * ExactlyOnceSourceIntegrationTest
 * WorkerTest
 * WorkerGroupMemberTest

Streams: [https://github.com/apache/kafka/pull/14769] (DONE)
 * IQv2IntegrationTest
 * MetricsReporterIntegrationTest
 * NamedTopologyIntegrationTest
 * PurgeRepartitionTopicIntegrationTest

These can be addressed by just fixing the tests.


> Typos in tests leak network sockets
> ---
>
>                 Key: KAFKA-15816
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15816
>             Project: Kafka
>          Issue Type: Bug
>          Components: unit tests
>    Affects Versions: 3.6.0
>            Reporter: Greg Harris
>            Assignee: Greg Harris
>            Priority: Minor
>





Re: [PR] KAFKA-16046: also fix stores for outer join [kafka]

2024-01-02 Thread via GitHub


lucasbru commented on PR #15073:
URL: https://github.com/apache/kafka/pull/15073#issuecomment-1874248887

   @stanislavkozlovski @mjsax This is a blocker issue for 3.7





Re: [PR] Bugfix: Prevent java.lang.UnsupportedOperationException. [kafka]

2024-01-02 Thread via GitHub


vamossagar12 commented on PR #14955:
URL: https://github.com/apache/kafka/pull/14955#issuecomment-1874218869

   @divijvaidya, do you mind taking a look at this one? Once the checkstyle 
errors are fixed, it looks good to go from my side (will approve once the 
changes are made).





Re: [PR] Bugfix: Prevent java.lang.UnsupportedOperationException. [kafka]

2024-01-02 Thread via GitHub


vamossagar12 commented on code in PR #14955:
URL: https://github.com/apache/kafka/pull/14955#discussion_r1439583795


##
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java:
##
@@ -1014,7 +1014,9 @@ synchronized public DescribeLogDirsResult 
describeLogDirs(Collection br
 for (Node node : nodes) {
 Map logDirDescriptionMap = 
unwrappedResults.get(node.id());
 LogDirDescription logDirDescription = 
logDirDescriptionMap.getOrDefault(partitionLogDirs.get(0), new 
LogDirDescription(null, new HashMap<>()));
-logDirDescription.replicaInfos().put(new 
TopicPartition(topicName, topicPartitionInfo.partition()), new ReplicaInfo(0, 
0, false));
+Map 
topicPartitionReplicaInfoMap = new HashMap<>(logDirDescription.replicaInfos());
+topicPartitionReplicaInfoMap.put(new 
TopicPartition(topicName, topicPartitionInfo.partition()), new ReplicaInfo(0, 
0, false));
+logDirDescriptionMap.put(partitionLogDirs.get(0), new 
LogDirDescription(logDirDescription.error(), topicPartitionReplicaInfoMap));

Review Comment:
   Thanks
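
   For readers following the diff above, here is a minimal sketch of the pattern it applies: copy the exposed map, modify the copy, and store a new description object. It assumes the description object exposes a read-only map view; `ReadOnlyDescription` and `CopyOnWriteExample` are hypothetical stand-ins, not Kafka classes.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a description object that exposes a read-only map view.
final class ReadOnlyDescription {
    private final Map<String, Integer> replicaInfos;

    ReadOnlyDescription(Map<String, Integer> replicaInfos) {
        this.replicaInfos = Collections.unmodifiableMap(replicaInfos);
    }

    Map<String, Integer> replicaInfos() {
        return replicaInfos;
    }
}

final class CopyOnWriteExample {
    // Builds a new description with one extra entry; the original's map is never mutated.
    static ReadOnlyDescription withReplica(ReadOnlyDescription current, String partition, int size) {
        Map<String, Integer> copy = new HashMap<>(current.replicaInfos());
        copy.put(partition, size);
        return new ReadOnlyDescription(copy);
    }

    // Returns true if put() on the exposed view throws UnsupportedOperationException.
    static boolean directPutFails(ReadOnlyDescription current) {
        try {
            current.replicaInfos().put("topic-0", 0);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }
}
```

   The caller then replaces the old description with the returned one, which is what the `logDirDescriptionMap.put(...)` line in the diff appears to do.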






Re: [PR] KAFKA-9545: Fix IllegalStateException in updateLags [kafka]

2024-01-02 Thread via GitHub


lucasbru merged PR #15096:
URL: https://github.com/apache/kafka/pull/15096





Re: [PR] MINOR: avoid unnecessary UnsupportedOperationException [kafka]

2024-01-02 Thread via GitHub


mjsax merged PR #15102:
URL: https://github.com/apache/kafka/pull/15102





[jira] [Updated] (KAFKA-16059) Fix leak of ExpirationReaper-1-AlterAcls threads in :core:test

2024-01-02 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-16059:
-
Fix Version/s: 3.7.0
   3.8.0

> Fix leak of ExpirationReaper-1-AlterAcls threads in :core:test
> --
>
> Key: KAFKA-16059
> URL: https://issues.apache.org/jira/browse/KAFKA-16059
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Divij Vaidya
>Assignee: Divij Vaidya
>Priority: Major
> Fix For: 3.7.0, 3.8.0
>
> Attachments: Screenshot 2023-12-29 at 11.13.01.png
>
>
> We are leaking hundreds of ExpirationReaper-1-AlterAcls threads in one of the 
> tests in :core:test
> {code:java}
> "ExpirationReaper-1-AlterAcls" prio=0 tid=0x0 nid=0x0 waiting on condition
>      java.lang.Thread.State: TIMED_WAITING
>  on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3688fc67
>     at java.base@17.0.9/jdk.internal.misc.Unsafe.park(Native Method)
>     at 
> java.base@17.0.9/java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:252)
>     at 
> java.base@17.0.9/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1672)
>     at 
> java.base@17.0.9/java.util.concurrent.DelayQueue.poll(DelayQueue.java:265)
>     at 
> app//org.apache.kafka.server.util.timer.SystemTimer.advanceClock(SystemTimer.java:87)
>     at 
> app//kafka.server.DelayedOperationPurgatory.advanceClock(DelayedOperation.scala:418)
>     at 
> app//kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper.doWork(DelayedOperation.scala:444)
>     at 
> app//org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
>  {code}
> The objective of this Jira is to identify the test and fix this leak





[jira] [Commented] (KAFKA-15777) Configurable remote fetch bytes per partition from Consumer

2024-01-02 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17801817#comment-17801817
 ] 

Kamal Chandraprakash commented on KAFKA-15777:
--

[~isding_l] 

This task requires a KIP as we may have to add a new config to the 
consumer/broker.

> Configurable remote fetch bytes per partition from Consumer
> ---
>
> Key: KAFKA-15777
> URL: https://issues.apache.org/jira/browse/KAFKA-15777
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> A consumer can configure the amount of local bytes to read from each 
> partition in the FETCH request.
> {{fetch.max.bytes}} = 50 MB
> {{max.partition.fetch.bytes}} = 1 MB
> Similar to this, the consumer should be able to configure 
> {{max.remote.partition.fetch.bytes}} = 4 MB.
> While handling the {{FETCH}} request, if we encounter a partition to read 
> data from remote storage, then the rest of the partitions in the request are 
> ignored. Essentially, we are serving only 1 MB of remote data per FETCH 
> request when all the partitions in the request are to be served from the 
> remote storage.
> Providing one more configuration to the client helps the user tune the 
> values depending on their storage plugin. The user might want to optimise the 
> number of calls to remote storage vs the amount of bytes returned to the 
> client in the FETCH response.
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1454]
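
A rough sketch of how the limits in the description could look on the consumer side, using plain `java.util.Properties`. The existing consumer keys are `fetch.max.bytes` and `max.partition.fetch.bytes`; `max.remote.partition.fetch.bytes` is only the name proposed in this ticket and is not an existing Kafka config. The class and method names are illustrative assumptions.

```java
import java.util.Properties;

final class RemoteFetchConfigSketch {
    // Key proposed in this ticket; hypothetical, NOT an existing Kafka config.
    static final String PROPOSED_REMOTE_KEY = "max.remote.partition.fetch.bytes";

    // Builds the three limits quoted in the description.
    static Properties consumerFetchLimits() {
        Properties props = new Properties();
        props.setProperty("fetch.max.bytes", String.valueOf(50 * 1024 * 1024));      // 50 MB per FETCH
        props.setProperty("max.partition.fetch.bytes", String.valueOf(1024 * 1024)); // 1 MB per partition
        props.setProperty(PROPOSED_REMOTE_KEY, String.valueOf(4 * 1024 * 1024));     // 4 MB, proposed
        return props;
    }

    // Remote bytes served per FETCH when only one remote partition is read per request.
    static long remoteBytesPerFetch(Properties props, boolean proposalApplied) {
        String key = proposalApplied ? PROPOSED_REMOTE_KEY : "max.partition.fetch.bytes";
        return Long.parseLong(props.getProperty(key));
    }
}
```

With these example values, a FETCH served entirely from remote storage returns 1 MB today and would return 4 MB if the proposed limit were honoured.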





[jira] [Updated] (KAFKA-15777) Configurable remote fetch bytes per partition from Consumer

2024-01-02 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15777:
-
Labels: kip  (was: )

> Configurable remote fetch bytes per partition from Consumer
> ---
>
> Key: KAFKA-15777
> URL: https://issues.apache.org/jira/browse/KAFKA-15777
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: kip
>
> A consumer can configure the amount of local bytes to read from each 
> partition in the FETCH request.
> {{fetch.max.bytes}} = 50 MB
> {{max.partition.fetch.bytes}} = 1 MB
> Similar to this, the consumer should be able to configure 
> {{max.remote.partition.fetch.bytes}} = 4 MB.
> While handling the {{FETCH}} request, if we encounter a partition to read 
> data from remote storage, then the rest of the partitions in the request are 
> ignored. Essentially, we are serving only 1 MB of remote data per FETCH 
> request when all the partitions in the request are to be served from the 
> remote storage.
> Providing one more configuration to the client helps the user tune the 
> values depending on their storage plugin. The user might want to optimise the 
> number of calls to remote storage vs the amount of bytes returned to the 
> client in the FETCH response.
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1454]





Re: [PR] MINOR: Enable Gradle Remote Build Cache [kafka]

2024-01-02 Thread via GitHub


nicktelford commented on PR #15109:
URL: https://github.com/apache/kafka/pull/15109#issuecomment-1874134447

   I don't have access to the ASF Gradle Enterprise, so a committer will need 
to verify that this works as intended.





[PR] MINOR: Enable Gradle Remote Build Cache [kafka]

2024-01-02 Thread via GitHub


nicktelford opened a new pull request, #15109:
URL: https://github.com/apache/kafka/pull/15109

   We enable the remote build cache, hosted by the ASF Gradle Enterprise 
instance.
   
   We only cache tasks during builds on `trunk`, to ensure that pushes to PRs 
always re-run all Tasks needed by that PR.
   
   PRs will read from the build cache, enabling tasks for modules that haven't 
been changed in the PR to be read from the cache.
   
   This should ensure that changes to "leaf" modules (e.g. connect, streams, etc.) 
don't run the entire test suite of the modules they depend on (i.e. core, 
clients, storage, etc.), which can be very costly.
   





Re: [PR] KAFKA-16059: Fix thread leak KafkaAPIsTest [kafka]

2024-01-02 Thread via GitHub


divijvaidya merged PR #15093:
URL: https://github.com/apache/kafka/pull/15093





Re: [PR] KAFKA-16059: Fix thread leak KafkaAPIsTest [kafka]

2024-01-02 Thread via GitHub


divijvaidya commented on PR #15093:
URL: https://github.com/apache/kafka/pull/15093#issuecomment-1874131838

   The test changed in this PR is successful - 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15093/3/testReport/kafka.server/ControllerApisTest/





[jira] [Commented] (KAFKA-16034) AsyncKafkaConsumer will get Invalid Request error when trying to rejoin on fenced/unknown member Id

2024-01-02 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17801811#comment-17801811
 ] 

Phuc Hong Tran commented on KAFKA-16034:


[~pnee], I don't think resetting HeartbeatState is enough, as it would set 
rebalanceTimeoutMs to -1, which makes the request invalid because the member 
epoch was set to 0 during the fenced/unknown_member_id exception. We need to 
reassign the rebalanceTimeoutMs to sentFields.

> AsyncKafkaConsumer will get Invalid Request error when trying to rejoin on 
> fenced/unknown member Id
> ---
>
> Key: KAFKA-16034
> URL: https://issues.apache.org/jira/browse/KAFKA-16034
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Philip Nee
>Assignee: Phuc Hong Tran
>Priority: Major
> Fix For: 3.8.0
>
>
> The consumer will log an invalid request error when joining from a 
> fenced/unknown member id because we didn't reset the HeartbeatState and we 
> won't send the needed fields (rebalanceTimeoutMs for example) when joining.





Re: [PR] Bugfix: Prevent java.lang.UnsupportedOperationException. [kafka]

2024-01-02 Thread via GitHub


jamespfaulkner commented on code in PR #14955:
URL: https://github.com/apache/kafka/pull/14955#discussion_r1439519451


##
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java:
##
@@ -1014,7 +1014,9 @@ synchronized public DescribeLogDirsResult 
describeLogDirs(Collection br
 for (Node node : nodes) {
 Map logDirDescriptionMap = 
unwrappedResults.get(node.id());
 LogDirDescription logDirDescription = 
logDirDescriptionMap.getOrDefault(partitionLogDirs.get(0), new 
LogDirDescription(null, new HashMap<>()));
-logDirDescription.replicaInfos().put(new 
TopicPartition(topicName, topicPartitionInfo.partition()), new ReplicaInfo(0, 
0, false));
+Map 
topicPartitionReplicaInfoMap = new HashMap<>(logDirDescription.replicaInfos());
+topicPartitionReplicaInfoMap.put(new 
TopicPartition(topicName, topicPartitionInfo.partition()), new ReplicaInfo(0, 
0, false));
+logDirDescriptionMap.put(partitionLogDirs.get(0), new 
LogDirDescription(logDirDescription.error(), topicPartitionReplicaInfoMap));

Review Comment:
   Good point, thanks for the feedback. I've made this change.






Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-02 Thread via GitHub


iit2009060 commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1874100254

   @clolov Can we merge the request?





[jira] [Commented] (KAFKA-15777) Configurable remote fetch bytes per partition from Consumer

2024-01-02 Thread Lan Ding (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17801803#comment-17801803
 ] 

Lan Ding commented on KAFKA-15777:
--

Hi [~ckamal], could I pick this up?

> Configurable remote fetch bytes per partition from Consumer
> ---
>
> Key: KAFKA-15777
> URL: https://issues.apache.org/jira/browse/KAFKA-15777
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> A consumer can configure the amount of local bytes to read from each 
> partition in the FETCH request.
> {{fetch.max.bytes}} = 50 MB
> {{max.partition.fetch.bytes}} = 1 MB
> Similar to this, the consumer should be able to configure 
> {{max.remote.partition.fetch.bytes}} = 4 MB.
> While handling the {{FETCH}} request, if we encounter a partition to read 
> data from remote storage, then the rest of the partitions in the request are 
> ignored. Essentially, we are serving only 1 MB of remote data per FETCH 
> request when all the partitions in the request are to be served from the 
> remote storage.
> Providing one more configuration to the client helps the user tune the 
> values depending on their storage plugin. The user might want to optimise the 
> number of calls to remote storage vs the amount of bytes returned to the 
> client in the FETCH response.
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1454]





[PR] MINOR: Fix flaky test RemoteIndexCacheTest.testClose() [kafka]

2024-01-02 Thread via GitHub


DL1231 opened a new pull request, #15108:
URL: https://github.com/apache/kafka/pull/15108

   Test fails 2% of the time.
   
   
https://ge.apache.org/scans/tests?search.timeZoneId=Europe/Berlin=kafka.log.remote.RemoteIndexCacheTest=testClose()
   
   This test should be modified to test 
   assertTrue(cache.cleanerThread.isShutdownComplete) in a 
TestUtils.waitUntilTrue condition which will catch the InterruptedException and 
exit successfully on it.
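
   A minimal sketch of the polling idea described above, written as a self-contained Java helper. This is not Kafka's `TestUtils.waitUntilTrue`; the `WaitUntil` class, its signature, and its interrupt handling are assumptions made for illustration.

```java
import java.util.function.BooleanSupplier;

final class WaitUntil {
    // Polls the condition until it holds or the timeout elapses; returns whether it held.
    // If the waiting thread is interrupted, the interrupt flag is restored and the
    // current value of the condition is returned instead of failing the wait.
    static boolean waitUntilTrue(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return condition.getAsBoolean();
            }
        }
    }
}
```

   In the test, the assertion would then wrap the shutdown check in such a wait (for example, polling `isShutdownComplete` for a few seconds) rather than asserting it once immediately after `close()`.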
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-02 Thread via GitHub


clolov commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1439460528


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1289,15 +1289,21 @@ public FetchDataInfo read(RemoteStorageFetchInfo 
remoteStorageFetchInfo) throws
 }
 
 RemoteLogSegmentMetadata remoteLogSegmentMetadata = 
rlsMetadataOptional.get();
-int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, 
offset);
 InputStream remoteSegInputStream = null;
 try {
-// Search forward for the position of the last offset that is 
greater than or equal to the target offset
-remoteSegInputStream = 
remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
-RemoteLogInputStream remoteLogInputStream = new 
RemoteLogInputStream(remoteSegInputStream);
-
-RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, 
offset);
-
+int startPos = 0;
+RecordBatch firstBatch = null;
+while (firstBatch == null && rlsMetadataOptional.isPresent()) {

Review Comment:
   Apologies for the delay. This reasoning makes sense to me, and even if I am 
wrong, the while loop won't really cause a performance impact, so I am happy 
with it!
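
   For context, the loop under discussion can be modelled roughly as follows: if compaction has left no batch at or after the target offset in one segment, move on to the next segment. This is a simplified sketch with hypothetical types (`Segment`, `CompactedSegmentScan`), not the `RemoteLogManager` code.

```java
import java.util.List;
import java.util.Optional;

final class CompactedSegmentScan {
    // Hypothetical, simplified segment: only the last offsets of the batches it still holds.
    // After compaction a segment may hold no batch at or above a given offset.
    static final class Segment {
        private final List<Long> batchLastOffsets;

        Segment(List<Long> batchLastOffsets) {
            this.batchLastOffsets = batchLastOffsets;
        }

        // First batch whose last offset is >= the target offset, if any.
        Optional<Long> firstBatchAtOrAfter(long offset) {
            return batchLastOffsets.stream().filter(last -> last >= offset).findFirst();
        }
    }

    // Walks forward from startIndex until some segment yields a batch or segments run out.
    static Optional<Long> findFirstBatch(List<Segment> segments, int startIndex, long offset) {
        Optional<Long> firstBatch = Optional.empty();
        int i = startIndex;
        while (!firstBatch.isPresent() && i < segments.size()) {
            firstBatch = segments.get(i).firstBatchAtOrAfter(offset);
            i++;
        }
        return firstBatch;
    }
}
```

   In the common case the first segment already contains a matching batch, so the loop body runs once.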






Re: [PR] KAFKA-14133: Migrate activeStateManager and standbyStateManager mocks in StoreChangelogReaderTest to Mockito [kafka]

2024-01-02 Thread via GitHub


clolov commented on code in PR #15106:
URL: https://github.com/apache/kafka/pull/15106#discussion_r1439447074


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##
@@ -642,11 +645,12 @@ public void 
shouldRestoreFromBeginningAndCheckCompletion() {
 
 @Test
 public void shouldCheckCompletionIfPositionLargerThanEndOffset() {
+setupActiveStateManager();

Review Comment:
   Yeah, the reason for not doing it in @Before is that not all tests exercise 
the stubbed methods, so Mockito complains that there are unused stubs. This way 
we only stub in the tests where the code is exercised!






[jira] [Assigned] (KAFKA-15206) Flaky test RemoteIndexCacheTest.testClose()

2024-01-02 Thread Lan Ding (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lan Ding reassigned KAFKA-15206:


Assignee: Lan Ding

> Flaky test RemoteIndexCacheTest.testClose()
> ---
>
> Key: KAFKA-15206
> URL: https://issues.apache.org/jira/browse/KAFKA-15206
> Project: Kafka
>  Issue Type: Test
>Reporter: Divij Vaidya
>Assignee: Lan Ding
>Priority: Minor
>  Labels: flaky-test
> Fix For: 3.8.0
>
>
> Test fails 2% of the time.
> [https://ge.apache.org/scans/tests?search.timeZoneId=Europe/Berlin=kafka.log.remote.RemoteIndexCacheTest=testClose()]
>  
> This test should be modified to test 
> assertTrue(cache.cleanerThread.isShutdownComplete) in a 
> TestUtils.waitUntilTrue condition which will catch the InterruptedException 
> and exit successfully on it.





[PR] MINOR: Improve code style about producer [kafka]

2024-01-02 Thread via GitHub


DL1231 opened a new pull request, #15107:
URL: https://github.com/apache/kafka/pull/15107

   I was reading about the producer and found a couple of code style 
inconsistencies. This PR fixes them.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-14133: Migrate activeStateManager and standbyStateManager mocks in StoreChangelogReaderTest to Mockito [kafka]

2024-01-02 Thread via GitHub


lucasbru commented on code in PR #15106:
URL: https://github.com/apache/kafka/pull/15106#discussion_r1439431364


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##
@@ -642,11 +645,12 @@ public void 
shouldRestoreFromBeginningAndCheckCompletion() {
 
 @Test
 public void shouldCheckCompletionIfPositionLargerThanEndOffset() {
+setupActiveStateManager();

Review Comment:
   I suppose the point of not doing this in @Before is to avoid 
unnecessary-mocking errors?






[PR] KAFKA-14133: Migrate activeStateManager and standbyStateManager mocks in StoreChangelogReaderTest to Mockito [kafka]

2024-01-02 Thread via GitHub


clolov opened a new pull request, #15106:
URL: https://github.com/apache/kafka/pull/15106

   This pull request takes a similar approach to how TaskManagerTest is being 
migrated to Mockito: mock by mock, for easier reviews.





Re: [PR] KAFKA-14133: Migrate stateManager mock in StoreChangelogReaderTest to Mockito [kafka]

2024-01-02 Thread via GitHub


divijvaidya merged PR #14929:
URL: https://github.com/apache/kafka/pull/14929





Re: [PR] KAFKA-14133: Migrate stateManager mock in StoreChangelogReaderTest to Mockito [kafka]

2024-01-02 Thread via GitHub


divijvaidya commented on PR #14929:
URL: https://github.com/apache/kafka/pull/14929#issuecomment-1873974414

   There are flaky tests in CI but the test changed in this PR is successful: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14929/2/testReport/org.apache.kafka.streams.processor.internals/StoreChangelogReaderTest/
 





Re: [PR] KAFKA-16072: JUnit 5 extension to detect thread leak [kafka]

2024-01-02 Thread via GitHub


showuon commented on PR #15101:
URL: https://github.com/apache/kafka/pull/15101#issuecomment-1873956404

   @wernerdv, thanks for the smart way to detect thread leaks!
   For now, the `core` module alone will introduce thousands of test errors. We 
need to fix all the thread leaks before extending this to other modules. Thanks.
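
   As a rough illustration of the detection idea (not the code in this PR), a leak check can snapshot the live threads before a test and compare afterwards. The `ThreadLeakCheck` class below is a hypothetical sketch; a JUnit 5 extension could call it from its before/after callbacks.

```java
import java.util.HashSet;
import java.util.Set;

final class ThreadLeakCheck {
    // Snapshot of the threads that are live at this moment.
    static Set<Thread> liveThreads() {
        return new HashSet<>(Thread.getAllStackTraces().keySet());
    }

    // Threads that are alive now but were not present in the earlier snapshot.
    static Set<Thread> leakedSince(Set<Thread> before) {
        Set<Thread> leaked = new HashSet<>();
        for (Thread t : liveThreads()) {
            if (t.isAlive() && !before.contains(t)) {
                leaked.add(t);
            }
        }
        return leaked;
    }
}
```

   A real check would likely also filter by thread name, since the JVM and the test framework can start threads of their own between the two snapshots.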





Re: [PR] KAFKA-16072: JUnit 5 extension to detect thread leak [kafka]

2024-01-02 Thread via GitHub


wernerdv commented on PR #15101:
URL: https://github.com/apache/kafka/pull/15101#issuecomment-1873949674

   @divijvaidya Thanks for the reply. The extension now runs only for modules that declare:
   `testImplementation project(':core')`
   
   Is this expected behavior or does it require improvement?





Re: [PR] KAFKA-14412: Decouple RocksDB access from CF [kafka]

2024-01-02 Thread via GitHub


nicktelford commented on PR #15105:
URL: https://github.com/apache/kafka/pull/15105#issuecomment-1873934211

   @cadonna @mjsax @ableegoldman @lucasbru @wcarlson5 @bbejeck @vvcephei 
@guozhangwang 
   
   This is part of KIP-892, and has been broken out into a separate PR to 
reduce the review burden on the main KIP-892 implementation, since it can be 
merged independently.
   
   There are no tests, because there are no behavioural changes, just a 
refactoring. The existing test suite should ensure no regressions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-14412: Decouple RocksDB access from CF [kafka]

2024-01-02 Thread via GitHub


nicktelford opened a new pull request, #15105:
URL: https://github.com/apache/kafka/pull/15105

   To support future use-cases that use different strategies for accessing 
RocksDB, we need to de-couple the RocksDB access strategy from the Column 
Family access strategy.
   
   To do this, we now have two separate accessors:
   
 * `DBAccessor`: dictates how we access RocksDB. Currently only one 
strategy is supported: `DirectDBAccessor`, which accesses RocksDB directly, via 
the `RocksDB` class, for all operations. In the future, a `BatchedDBAccessor` 
will be added, which enables transactions via `WriteBatch`.
 * `ColumnFamilyAccessor`: maps StateStore operations to operations on one 
or more column families. This is a rename of the old `RocksDBDBAccessor`.
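
   To make the split concrete, here is a minimal sketch of the two roles. The interface names come from the description above, but every method signature is an assumption, and an in-memory map stands in for RocksDB; the real interfaces in the PR may look quite different.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical shape: how reads and writes reach the underlying database.
interface DBAccessor {
    void put(String columnFamily, String key, String value);
    String get(String columnFamily, String key);
}

// Stand-in for the "direct" strategy; a nested map replaces the RocksDB handle.
final class InMemoryDirectAccessor implements DBAccessor {
    private final Map<String, Map<String, String>> families = new HashMap<>();

    @Override
    public void put(String columnFamily, String key, String value) {
        families.computeIfAbsent(columnFamily, name -> new HashMap<>()).put(key, value);
    }

    @Override
    public String get(String columnFamily, String key) {
        Map<String, String> family = families.get(columnFamily);
        return family == null ? null : family.get(key);
    }
}

// Hypothetical shape: maps store operations onto one or more column families.
interface ColumnFamilyAccessor {
    void put(DBAccessor db, String key, String value);
    String get(DBAccessor db, String key);
}

// Simplest mapping: every store operation goes to a single column family.
final class SingleColumnFamilyAccessor implements ColumnFamilyAccessor {
    private final String columnFamily;

    SingleColumnFamilyAccessor(String columnFamily) {
        this.columnFamily = columnFamily;
    }

    @Override
    public void put(DBAccessor db, String key, String value) {
        db.put(columnFamily, key, value);
    }

    @Override
    public String get(DBAccessor db, String key) {
        return db.get(columnFamily, key);
    }
}
```

   Because the column-family mapping only talks to the `DBAccessor` interface, a batched or transactional accessor could be swapped in later without touching the mapping code.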





Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-02 Thread via GitHub


tinaselenge commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1439373207


##
clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java:
##
@@ -83,27 +89,27 @@ public void close() throws IOException {
 
 @Test
 public void testGetAllKeysAtPath() {
-ConfigData configData = provider.get(dir.getAbsolutePath());

Review Comment:
   This resulted from using `java.nio.file.Files` instead of `File`, so we can 
no longer call getAbsolutePath() to pass in a string argument. The test 
variable types are changed to String to avoid having to do string conversion 
everywhere. I didn't think changing test variable types would break 
compatibility, since we are still passing the same type of variable to the 
calling method.






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-02 Thread via GitHub


tinaselenge commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1439364385


##
clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java:
##
@@ -98,8 +117,91 @@ public void testServiceLoaderDiscovery() {
 public static class TestFileConfigProvider extends FileConfigProvider {
 
 @Override
-protected Reader reader(String path) throws IOException {
+protected Reader reader(Path path) throws IOException {
 return new 
StringReader("testKey=testResult\ntestKey2=testResult2");
 }
 }
+
+@Test
+public void testAllowedDirPath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(dirFile);
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testAllowedFilePath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(dirFile);
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testMultipleAllowedPaths() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir + "," + siblingDir);
+configProvider.configure(configs);
+
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+
+ConfigData configData = configProvider.get(dirFile);
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+
+configData = configProvider.get(siblingDirFile);
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNotAllowedDirPath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(siblingDirFile);
+assertTrue(configData.data().isEmpty());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNotAllowedFilePath() throws IOException {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+//another file under the same directory
+Path dirFile2 = Files.createFile(Paths.get(dir, "dirFile2"));
+ConfigData configData = configProvider.get(dirFile2.toString());
+assertTrue(configData.data().isEmpty());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNoTraversal() {
+Map<String, String> configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+// Check we can't escape outside the path directory
+ConfigData configData = configProvider.get(dir + "../siblingdir/siblingdirFile");

Review Comment:
   good catch! I have fixed these now. 
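
For readers following the tests quoted above, here is a minimal sketch of the kind of check they exercise: normalize the requested path, then accept it only if it equals or sits under one of the allowed paths. The class `AllowedPathsSketch` and its method `isAllowed` are hypothetical names used for illustration; this is not the `FileConfigProvider` code from the PR, and the comma-separated input format is an assumption based on `testMultipleAllowedPaths`.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not Kafka's implementation.
final class AllowedPathsSketch {
    private final List<Path> allowedRoots = new ArrayList<>();

    // Assumption: allowed paths arrive as a comma-separated string,
    // mirroring the dir + "," + siblingDir value used in the tests.
    AllowedPathsSketch(String allowedPaths) {
        for (String p : allowedPaths.split(",")) {
            allowedRoots.add(Paths.get(p.trim()).toAbsolutePath().normalize());
        }
    }

    // Returns true only if the normalized candidate equals an allowed path
    // or lies underneath one. normalize() collapses ".." segments, so a
    // traversal such as dir/../siblingdir is compared in its resolved form.
    boolean isAllowed(String candidate) {
        Path normalized = Paths.get(candidate).toAbsolutePath().normalize();
        for (Path root : allowedRoots) {
            // Path.startsWith compares whole name elements, so "/tmp/dir2"
            // does not match the root "/tmp/dir".
            if (normalized.startsWith(root)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        AllowedPathsSketch check = new AllowedPathsSketch("/tmp/dir");
        System.out.println(check.isAllowed("/tmp/dir/dirFile"));
        System.out.println(check.isAllowed("/tmp/dir/../siblingdir/siblingdirFile"));
    }
}
```

Comparing normalized `Path` objects rather than raw strings is what keeps both the traversal case and the "another file under the same directory" case out of the allowed set.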






[jira] [Commented] (KAFKA-16074) Fix thread leaks in ReplicaManagerTest

2024-01-02 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17801744#comment-17801744
 ] 

Divij Vaidya commented on KAFKA-16074:
--

https://github.com/apache/kafka/pull/15077

> Fix thread leaks in ReplicaManagerTest
> --
>
> Key: KAFKA-16074
> URL: https://issues.apache.org/jira/browse/KAFKA-16074
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Following [@dajac|https://github.com/dajac]'s finding in 
> [#15063|https://github.com/apache/kafka/pull/15063], I found that 
> ReplicaManagerTest also creates new RemoteLogManager instances without 
> closing them.
> While investigating ReplicaManagerTest, I found other leaking threads as 
> well:
>  # Remote fetch reaper thread. The test creates a real reaper thread, which 
> is not expected. We should create a mocked one, as with the other purgatory 
> instances.
>  # Throttle threads. We create a {{quotaManager}} to feed into the 
> replicaManager but never close it. A global {{quotaManager}} instance already 
> exists and is closed in {{AfterEach}}, so we should reuse it.
>  # replicaManager and logManager never invoke {{close}} after the test.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16074) Fix thread leaks in ReplicaManagerTest

2024-01-02 Thread Divij Vaidya (Jira)
Divij Vaidya created KAFKA-16074:


 Summary: Fix thread leaks in ReplicaManagerTest
 Key: KAFKA-16074
 URL: https://issues.apache.org/jira/browse/KAFKA-16074
 Project: Kafka
  Issue Type: Sub-task
Reporter: Luke Chen
Assignee: Luke Chen


Following [@dajac|https://github.com/dajac]'s finding in 
[#15063|https://github.com/apache/kafka/pull/15063], I found that 
ReplicaManagerTest also creates new RemoteLogManager instances without closing 
them.

While investigating ReplicaManagerTest, I found other leaking threads as well:
 # Remote fetch reaper thread. The test creates a real reaper thread, which is 
not expected. We should create a mocked one, as with the other purgatory 
instances.
 # Throttle threads. We create a {{quotaManager}} to feed into the 
replicaManager but never close it. A global {{quotaManager}} instance already 
exists and is closed in {{AfterEach}}, so we should reuse it.
 # replicaManager and logManager never invoke {{close}} after the test.
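
The common thread in the items above is that anything owning a background thread must be closed in teardown. A minimal sketch of that pattern follows, in plain Java without JUnit so it stays self-contained. `BackgroundWorker` is a hypothetical stand-in for a component such as a reaper or throttle thread owner; it is not a Kafka class, and the `finally` block plays the role an after-each hook would play in the real test.

```java
// Hypothetical stand-in for a component that owns a background thread.
final class BackgroundWorker implements AutoCloseable {
    private final Thread thread;

    BackgroundWorker(String name) {
        thread = new Thread(() -> {
            try {
                while (true) {
                    Thread.sleep(50); // simulate periodic background work
                }
            } catch (InterruptedException e) {
                // Interrupted by close(): fall through and let the thread end.
            }
        }, name);
        thread.setDaemon(true);
        thread.start();
    }

    boolean isRunning() {
        return thread.isAlive();
    }

    // Stops the thread and waits for it to finish, so that no thread
    // outlives the test that created this worker.
    @Override
    public void close() {
        thread.interrupt();
        try {
            thread.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        BackgroundWorker worker = new BackgroundWorker("sketch-reaper");
        try {
            System.out.println("running during test: " + worker.isRunning());
        } finally {
            // Teardown: without this call the thread would leak.
            worker.close();
        }
        System.out.println("running after teardown: " + worker.isRunning());
    }
}
```

Keeping the instance in a variable (or test field) that teardown can reach is what makes the close possible; an instance created inline and discarded cannot be closed later.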





[jira] [Updated] (KAFKA-16072) Create Junit 5 extension to detect thread leak

2024-01-02 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-16072:
-
Fix Version/s: 3.8.0

> Create Junit 5 extension to detect thread leak
> --
>
> Key: KAFKA-16072
> URL: https://issues.apache.org/jira/browse/KAFKA-16072
> Project: Kafka
>  Issue Type: Improvement
>  Components: unit tests
>Reporter: Divij Vaidya
>Assignee: Dmitry Werner
>Priority: Major
>  Labels: newbie++
> Fix For: 3.8.0
>
>
> The objective of this task is to create a JUnit extension that runs after 
> every test and verifies that no lingering threads are left over. 
> An example of how to create an extension can be found here:
> [https://github.com/apache/kafka/pull/14783/files#diff-812cfc2780b6fc0e7a1648ff37912ff13aeda4189ea6b0d4d847b831f66e56d1]
> An example of how to find unexpected threads is at 
> [https://github.com/apache/kafka/blob/d5aa341a185f4df23bf587e55bcda4f16fc511f1/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L2427]
>  and also at 
> https://issues.apache.org/jira/browse/KAFKA-16052?focusedCommentId=17800978=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17800978
>  





Re: [PR] KAFKA-16072: JUnit 5 extension to detect thread leak [kafka]

2024-01-02 Thread via GitHub


divijvaidya commented on PR #15101:
URL: https://github.com/apache/kafka/pull/15101#issuecomment-1873898677

   Thank you for the change @wernerdv . As a next step, we need to fix the 
leaks that were detected by this PR before merging this. I already know of at 
least two PRs that will help: https://github.com/apache/kafka/pull/15093 and 
https://github.com/apache/kafka/pull/15077 . Let's wait for them to merge first 
and then we can rebase this on trunk and run again.
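
For context on what "leaks detected" means here, the core of such a check can be sketched as a before/after comparison of live threads. This is a simplified illustration in plain Java, not the extension code from this PR: `ThreadLeakCheck`, `liveThreads`, and `leakedSince` are hypothetical names. In a JUnit 5 extension, the snapshot and the comparison would presumably be wired into the `BeforeEachCallback` and `AfterEachCallback` hooks.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper for illustration; not the extension from the PR.
final class ThreadLeakCheck {

    // Snapshot of the threads that are alive right now.
    static Set<Thread> liveThreads() {
        return new HashSet<>(Thread.getAllStackTraces().keySet());
    }

    // Threads that are alive now but were not in the earlier snapshot.
    static Set<Thread> leakedSince(Set<Thread> before) {
        Set<Thread> leaked = liveThreads();
        leaked.removeAll(before);
        return leaked;
    }

    public static void main(String[] args) throws InterruptedException {
        Set<Thread> before = liveThreads();

        // Simulate a test that starts a thread and forgets to stop it.
        Thread leaky = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // Interrupted during cleanup below: let the thread end.
            }
        }, "sketch-leaky-thread");
        leaky.setDaemon(true);
        leaky.start();

        for (Thread t : leakedSince(before)) {
            System.out.println("leaked: " + t.getName());
        }

        // Clean up the simulated leak.
        leaky.interrupt();
        leaky.join();
    }
}
```

A real check would likely also need an allow-list or name filter, since the JVM and test framework can start threads of their own between the two snapshots.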





Re: [PR] KAFKA-16059: Fix thread leak KafkaAPIsTest [kafka]

2024-01-02 Thread via GitHub


divijvaidya commented on PR #15093:
URL: https://github.com/apache/kafka/pull/15093#issuecomment-1873868215

   Thank you for the review @showuon. I believe I have fixed all indentation 
problems now.





Re: [PR] MINOR: Upgrade Zstd-jni to 1.5.5-11 [kafka]

2024-01-02 Thread via GitHub


divijvaidya merged PR #14798:
URL: https://github.com/apache/kafka/pull/14798





Re: [PR] KAFKA-9545: Fix IllegalStateException in updateLags [kafka]

2024-01-02 Thread via GitHub


lucasbru commented on PR #15096:
URL: https://github.com/apache/kafka/pull/15096#issuecomment-1873828040

   
   > I do not completely understand how the consumer subscription can reflect 
the topic deletion while Streams has still references to the deleted topic. The 
update to the subscription of the consumer happens right after the call to 
`onAssignment()` [1]. Both calls should happen inside the call to `poll()`. 
What do I miss?
   
   In every iteration of the poll loop of the `ConsumerCoordinator` we update 
the subscription in `maybeUpdateSubscriptionMetadata`, based on the latest 
metadata known to the consumer - which will reflect the topic deletion and 
update the subscription.
   
   The subscription is used to update the assignment in `onJoinPrepare` and 
`onLeavePrepare`. 
   
   
https://github.com/apache/kafka/blob/2b99d0e45027c88ae2347fa8f7d1ff4b2b919089/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L801
 
   
   
https://github.com/apache/kafka/blob/2b99d0e45027c88ae2347fa8f7d1ff4b2b919089/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L857
   
   The partitions in the `RecordCollector` will only be updated when we have a 
new assignment, so when in state `PREPARE_SHUTDOWN` or `PARTITIONS_REVOKED` the 
`RecordCollector` may have `TopicPartition`s that we do not technically own 
anymore.
   
   
   
   





Re: [PR] KAFKA-16059: Fix thread leak KafkaAPIsTest [kafka]

2024-01-02 Thread via GitHub


showuon commented on code in PR #15093:
URL: https://github.com/apache/kafka/pull/15093#discussion_r1439274094


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -2055,8 +2058,8 @@ class KafkaApisTest {
 responseCallback.capture(),
 ArgumentMatchers.eq(requestLocal)
   )).thenAnswer(_ => 
responseCallback.getValue.apply(InitProducerIdResult(producerId, epoch, 
Errors.PRODUCER_FENCED)))
-
-  createKafkaApis().handleInitProducerIdRequest(request, requestLocal)
+kafkaApis = createKafkaApis()
+kafkaApis.handleInitProducerIdRequest(request, requestLocal)

Review Comment:
   Is the indent correct?






Re: [PR] KAFKA-16059: Fix thread leak KafkaAPIsTest [kafka]

2024-01-02 Thread via GitHub


showuon commented on code in PR #15093:
URL: https://github.com/apache/kafka/pull/15093#discussion_r1439274938


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -2410,8 +2413,8 @@ class KafkaApisTest {
 responseCallback.capture(),
 ArgumentMatchers.eq(requestLocal)
   )).thenAnswer(_ => 
responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
-
-  createKafkaApis().handleEndTxnRequest(request, requestLocal)
+kafkaApis = createKafkaApis()
+kafkaApis.handleEndTxnRequest(request, requestLocal)

Review Comment:
   Same here, and in some places below.



##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -2055,8 +2058,8 @@ class KafkaApisTest {
 responseCallback.capture(),
 ArgumentMatchers.eq(requestLocal)
   )).thenAnswer(_ => 
responseCallback.getValue.apply(InitProducerIdResult(producerId, epoch, 
Errors.PRODUCER_FENCED)))
-
-  createKafkaApis().handleInitProducerIdRequest(request, requestLocal)
+kafkaApis = createKafkaApis()
+kafkaApis.handleInitProducerIdRequest(request, requestLocal)

Review Comment:
   Is the indent correct?



##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -2167,8 +2170,8 @@ class KafkaApisTest {
 responseCallback.capture(),
 ArgumentMatchers.eq(requestLocal)
   )).thenAnswer(_ => 
responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
-
-  createKafkaApis().handleAddPartitionsToTxnRequest(request, requestLocal)
+kafkaApis = createKafkaApis()
+kafkaApis.handleAddPartitionsToTxnRequest(request, requestLocal)

Review Comment:
   ditto



##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -2113,8 +2116,8 @@ class KafkaApisTest {
 responseCallback.capture(),
 ArgumentMatchers.eq(requestLocal)
   )).thenAnswer(_ => 
responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
-
-  createKafkaApis().handleAddOffsetsToTxnRequest(request, requestLocal)
+kafkaApis = createKafkaApis()
+kafkaApis.handleAddOffsetsToTxnRequest(request, requestLocal)

Review Comment:
   ditto.





