[GitHub] [kafka] divijvaidya commented on pull request #13868: MINOR: Close ReplicaManager correctly in ReplicaManagerTest

2023-06-21 Thread via GitHub


divijvaidya commented on PR #13868:
URL: https://github.com/apache/kafka/pull/13868#issuecomment-1600361714

   We have a miracle! Only one flaky test is failing, and it is unrelated to 
this change. Merging this in.
   ```
   Build / JDK 8 and Scala 2.12 / kafka.admin.LeaderElectionCommandTest.[3] Type=ZK, Name=testPreferredReplicaElection, MetadataVersion=3.6-IV0, Security=PLAINTEXT
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya merged pull request #13868: MINOR: Close ReplicaManager correctly in ReplicaManagerTest

2023-06-21 Thread via GitHub


divijvaidya merged PR #13868:
URL: https://github.com/apache/kafka/pull/13868





[GitHub] [kafka] divijvaidya merged pull request #13883: MINOR: Fix typos for doc

2023-06-21 Thread via GitHub


divijvaidya merged PR #13883:
URL: https://github.com/apache/kafka/pull/13883





[GitHub] [kafka] divijvaidya commented on pull request #13883: MINOR: Fix typos for doc

2023-06-21 Thread via GitHub


divijvaidya commented on PR #13883:
URL: https://github.com/apache/kafka/pull/13883#issuecomment-1600374520

   > Besides, how do you ensure that several people are not working on the same issue 
simultaneously in JIRA when the issue is not assigned? It seems like I need a JIRA 
account to assign an issue to myself?
   
   Yes, we use JIRA, and you can create an account using the instructions at 
https://kafka.apache.org/contributing
   
   > I'm using a tool called [typos](https://github.com/crate-ci/typos), and it 
can be integrated with GitHub Actions. But I'm not sure if it's good practice to 
integrate it with the repo.
   
   Yeah, perhaps not with GitHub Actions. I am hoping checkstyle comes up with 
something similar. I honestly haven't looked into automated spell checks before.
   
   





[GitHub] [kafka] cadonna commented on a diff in pull request #13878: MINOR: Move RocksDBTimeOrderedKeyValueBufferTest to use Junit5

2023-06-21 Thread via GitHub


cadonna commented on code in PR #13878:
URL: https://github.com/apache/kafka/pull/13878#discussion_r1236570179


##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java:
##
@@ -32,31 +32,28 @@
 import org.apache.kafka.test.MockInternalNewProcessorContext;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
 import java.util.concurrent.atomic.AtomicInteger;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class RocksDBTimeOrderedKeyValueBufferTest {
 public RocksDBTimeOrderedKeyValueBuffer buffer;
-@Mock
 public SerdeGetter serdeGetter;
 public InternalProcessorContext context;
 public StreamsMetricsImpl streamsMetrics;
-@Mock
 public Sensor sensor;
 public long offset;
 
-@Before
+@BeforeEach
 public void setUp() {
+serdeGetter = mock(SerdeGetter.class);
+sensor = mock(Sensor.class);

Review Comment:
   I could not find any usage of this mock.



##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java:
##
@@ -32,31 +32,28 @@
 import org.apache.kafka.test.MockInternalNewProcessorContext;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
 import java.util.concurrent.atomic.AtomicInteger;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-@RunWith(MockitoJUnitRunner.StrictStubs.class)

Review Comment:
   I tested this and I can confirm what Walker stated. 



##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java:
##
@@ -32,31 +32,28 @@
 import org.apache.kafka.test.MockInternalNewProcessorContext;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
 import java.util.concurrent.atomic.AtomicInteger;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class RocksDBTimeOrderedKeyValueBufferTest {
 public RocksDBTimeOrderedKeyValueBuffer buffer;
-@Mock

Review Comment:
   To get `@Mock` to work with JUnit 5, you need to apply 
`@ExtendWith(MockitoExtension.class)` to the test class.
   That is:
   ```java
   import org.junit.jupiter.api.extension.ExtendWith;
   import org.mockito.junit.jupiter.MockitoExtension;

   @ExtendWith(MockitoExtension.class)
   public class RocksDBTimeOrderedKeyValueBufferTest {
   ...
   ```  






[GitHub] [kafka] divijvaidya commented on pull request #13892: MINOR: change logger used by CoreUtils#swallow()

2023-06-21 Thread via GitHub


divijvaidya commented on PR #13892:
URL: https://github.com/apache/kafka/pull/13892#issuecomment-1600378754

   There are SpotBugs errors in the CI build. Please fix them.





[GitHub] [kafka] divijvaidya commented on pull request #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache

2023-06-21 Thread via GitHub


divijvaidya commented on PR #13850:
URL: https://github.com/apache/kafka/pull/13850#issuecomment-1600387314

   > @divijvaidya, sorry, which commits should I check?
   
   I combined the new changes into a single commit. It's this one: 
https://github.com/apache/kafka/pull/13850/commits/8513b5f76743a97543a9533f1d9c755a0a55091b





[GitHub] [kafka] divijvaidya merged pull request #13187: MINOR: Log lastCaughtUpTime on ISR shrinkage

2023-06-21 Thread via GitHub


divijvaidya merged PR #13187:
URL: https://github.com/apache/kafka/pull/13187





[GitHub] [kafka] showuon commented on a diff in pull request #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache

2023-06-21 Thread via GitHub


showuon commented on code in PR #13850:
URL: https://github.com/apache/kafka/pull/13850#discussion_r1236592041


##
core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala:
##
@@ -167,64 +235,76 @@ class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageM
   init()
 
   // Start cleaner thread that will clean the expired entries
-  val cleanerThread: ShutdownableThread = new ShutdownableThread("remote-log-index-cleaner") {
+  private[remote] var cleanerThread: ShutdownableThread = new ShutdownableThread(remoteLogIndexCacheCleanerThread) {
 setDaemon(true)
 
 override def doWork(): Unit = {
-  while (!closed) {
-try {
+  try {
+while (!isRemoteIndexCacheClosed.get()) {
   val entry = expiredIndexes.take()
-  info(s"Cleaning up index entry $entry")
+  debug(s"Cleaning up index entry $entry")
   entry.cleanup()
-} catch {
-  case ex: InterruptedException => info("Cleaner thread was interrupted", ex)
-  case ex: Exception => error("Error occurred while fetching/cleaning up expired entry", ex)
 }
+  } catch {
+case ex: InterruptedException =>
+  // cleaner thread should only be interrupted when cache is being closed, else it's an error
+  if (!isRemoteIndexCacheClosed.get()) {
+error("Cleaner thread received interruption but remote index cache is not closed", ex)
+throw ex
+  } else {
+debug("Cleaner thread was interrupted on cache shutdown")
+  }
+case ex: Exception => error("Error occurred while fetching/cleaning up expired entry", ex)
   }
 }
   }
+
   cleanerThread.start()
 
   def getIndexEntry(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): Entry = {
-if(closed) throw new IllegalStateException("Instance is already closed.")
-
-def loadIndexFile[T](fileName: String,
- suffix: String,
- fetchRemoteIndex: RemoteLogSegmentMetadata => InputStream,
- readIndex: File => T): T = {
-  val indexFile = new File(cacheDir, fileName + suffix)
-
-  def fetchAndCreateIndex(): T = {
-val tmpIndexFile = new File(cacheDir, fileName + suffix + RemoteIndexCache.TmpFileSuffix)
-
-val inputStream = fetchRemoteIndex(remoteLogSegmentMetadata)
-try {
-  Files.copy(inputStream, tmpIndexFile.toPath)
-} finally {
-  if (inputStream != null) {
-inputStream.close()
-  }
-}
+if (isRemoteIndexCacheClosed.get()) {
+  throw new IllegalStateException(s"Unable to fetch index for " +
+s"segment id=${remoteLogSegmentMetadata.remoteLogSegmentId().id()}. Index instance is already closed.")
+}
 
-Utils.atomicMoveWithFallback(tmpIndexFile.toPath, indexFile.toPath, false)
-readIndex(indexFile)
-  }
+inReadLock(lock) {
+  val cacheKey = remoteLogSegmentMetadata.remoteLogSegmentId().id()
+  internalCache.get(cacheKey, (uuid: Uuid) => {

Review Comment:
   Should we use `writeLock` here? The Javadoc of Caffeine's `Cache.get` says:
   > If the specified key is not already associated with a value, attempts to 
compute its value using the given mapping function and enters it into this 
cache unless {@code null}. The entire method invocation is performed 
atomically, so the function is applied at most once per key. Some attempted 
update operations on this cache by other threads may be blocked while the 
computation is in progress, so the computation should be short and simple, and 
must not attempt to update any other mappings of this cache.
   
   So it is actually updating the value.
   
https://github.com/ben-manes/caffeine/blob/master/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Cache.java#L60-L65
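   The cleaner-thread change in the diff above moves the `try` outside the loop and treats an `InterruptedException` as an error unless the cache has already been closed. Below is a minimal Java sketch of that shutdown pattern, assuming a plain `Thread` and a `LinkedBlockingQueue` in place of Kafka's `ShutdownableThread` and `expiredIndexes`; `CleanerSketch` and its methods are hypothetical names, not Kafka code:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the cleaner-thread shutdown pattern; not Kafka's RemoteIndexCache.
class CleanerSketch {
    private final BlockingQueue<Runnable> expiredEntries = new LinkedBlockingQueue<>();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicInteger cleaned = new AtomicInteger();
    private volatile boolean unexpectedInterrupt = false;
    private final Thread cleaner;

    CleanerSketch() {
        cleaner = new Thread(this::doWork, "cleaner-sketch");
        cleaner.setDaemon(true);
        cleaner.start();
    }

    void enqueueExpired(Runnable cleanup) {
        expiredEntries.add(cleanup);
    }

    private void doWork() {
        try {
            while (!closed.get()) {
                Runnable entry = expiredEntries.take(); // blocks until an entry has expired
                entry.run();
                cleaned.incrementAndGet();
            }
        } catch (InterruptedException e) {
            // An interrupt is expected only while closing; otherwise record it as an error.
            if (!closed.get()) {
                unexpectedInterrupt = true;
            }
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }

    void close() {
        closed.set(true);    // set the flag first so the cleaner recognises a shutdown
        cleaner.interrupt(); // wake the cleaner if it is blocked in take()
        try {
            cleaner.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    int cleanedCount() { return cleaned.get(); }
    boolean sawUnexpectedInterrupt() { return unexpectedInterrupt; }
    boolean isCleanerAlive() { return cleaner.isAlive(); }
}
```

   Setting the closed flag before calling `interrupt()` is what lets the cleaner tell a shutdown apart from a stray interrupt. In the Scala diff the unexpected case is logged and rethrown; the sketch only records it.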






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache

2023-06-21 Thread via GitHub


divijvaidya commented on code in PR #13850:
URL: https://github.com/apache/kafka/pull/13850#discussion_r1236598557


##
core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala:
##
@@ -167,64 +235,76 @@ class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageM
   init()
 
   // Start cleaner thread that will clean the expired entries
-  val cleanerThread: ShutdownableThread = new ShutdownableThread("remote-log-index-cleaner") {
+  private[remote] var cleanerThread: ShutdownableThread = new ShutdownableThread(remoteLogIndexCacheCleanerThread) {
 setDaemon(true)
 
 override def doWork(): Unit = {
-  while (!closed) {
-try {
+  try {
+while (!isRemoteIndexCacheClosed.get()) {
   val entry = expiredIndexes.take()
-  info(s"Cleaning up index entry $entry")
+  debug(s"Cleaning up index entry $entry")
   entry.cleanup()
-} catch {
-  case ex: InterruptedException => info("Cleaner thread was interrupted", ex)
-  case ex: Exception => error("Error occurred while fetching/cleaning up expired entry", ex)
 }
+  } catch {
+case ex: InterruptedException =>
+  // cleaner thread should only be interrupted when cache is being closed, else it's an error
+  if (!isRemoteIndexCacheClosed.get()) {
+error("Cleaner thread received interruption but remote index cache is not closed", ex)
+throw ex
+  } else {
+debug("Cleaner thread was interrupted on cache shutdown")
+  }
+case ex: Exception => error("Error occurred while fetching/cleaning up expired entry", ex)
   }
 }
   }
+
   cleanerThread.start()
 
   def getIndexEntry(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): Entry = {
-if(closed) throw new IllegalStateException("Instance is already closed.")
-
-def loadIndexFile[T](fileName: String,
- suffix: String,
- fetchRemoteIndex: RemoteLogSegmentMetadata => InputStream,
- readIndex: File => T): T = {
-  val indexFile = new File(cacheDir, fileName + suffix)
-
-  def fetchAndCreateIndex(): T = {
-val tmpIndexFile = new File(cacheDir, fileName + suffix + RemoteIndexCache.TmpFileSuffix)
-
-val inputStream = fetchRemoteIndex(remoteLogSegmentMetadata)
-try {
-  Files.copy(inputStream, tmpIndexFile.toPath)
-} finally {
-  if (inputStream != null) {
-inputStream.close()
-  }
-}
+if (isRemoteIndexCacheClosed.get()) {
+  throw new IllegalStateException(s"Unable to fetch index for " +
+s"segment id=${remoteLogSegmentMetadata.remoteLogSegmentId().id()}. Index instance is already closed.")
+}
 
-Utils.atomicMoveWithFallback(tmpIndexFile.toPath, indexFile.toPath, false)
-readIndex(indexFile)
-  }
+inReadLock(lock) {
+  val cacheKey = remoteLogSegmentMetadata.remoteLogSegmentId().id()
+  internalCache.get(cacheKey, (uuid: Uuid) => {

Review Comment:
   Yes, it will update the cache with a new entry. But since `internalCache` 
is thread-safe, there is no need to prevent other threads from reading or writing 
other entries in the cache, so we don't need to acquire a write lock over 
the entire RemoteIndexCache.
   
   Does this answer your question?
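   The reasoning above (a thread-safe map's atomic per-key load makes a cache-wide write lock unnecessary) can be sketched in Java. This assumes `ConcurrentHashMap.computeIfAbsent` as a stand-in for Caffeine's `Cache.get(key, mappingFunction)`, since both apply the mapping function at most once per absent key; `IndexCacheSketch` and its methods are hypothetical. The read lock only guards against a concurrent `close()`, which takes the write lock:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

// Hypothetical sketch; ConcurrentHashMap stands in for the Caffeine cache used in the PR.
class IndexCacheSketch<K, V> {
    private final ConcurrentHashMap<K, V> internalCache = new ConcurrentHashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private volatile boolean closed = false;

    V getEntry(K key, Function<K, V> loader) {
        // Many readers may hold the read lock at once; it only excludes close().
        lock.readLock().lock();
        try {
            if (closed) {
                throw new IllegalStateException("Cache is already closed");
            }
            // Atomic per key: the loader runs at most once for an absent key. Other
            // updates to the map may briefly block (per the Javadoc), but callers are
            // not serialized behind a cache-wide write lock.
            return internalCache.computeIfAbsent(key, loader);
        } finally {
            lock.readLock().unlock();
        }
    }

    void close() {
        // The write lock waits for in-flight getEntry calls and blocks new ones.
        lock.writeLock().lock();
        try {
            closed = true;
            internalCache.clear();
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```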






[GitHub] [kafka] machi1990 commented on pull request #13665: MINOR: Split ConsumerCoordinator#testCommitOffsetMetadata onto two test cases testing commitSync and commitAsync

2023-06-21 Thread via GitHub


machi1990 commented on PR #13665:
URL: https://github.com/apache/kafka/pull/13665#issuecomment-1600404343

   > > offset commits are happening somewhere else, via 
producer.sendOffsetsToTransaction(..), for example
   > 
   > Oh, the EOS case! I didn't consider it, sorry! Hmm... if there is an EOS case to 
consider, the original cache mechanism will not work, since the offset commit does 
not go through the consumer and the consumer has no idea which offsets have been committed. I think 
we should close this PR and the JIRA ticket as "invalid" and add a comment to the 
JIRA ticket. WDYT?
   
   Thanks @showuon, I've marked the JIRA as invalid. 
   As for the PR, I've repurposed it to keep only the parts of this change that 
are still valid. 
   Please have a look when you can. 





[GitHub] [kafka] FireBurn commented on pull request #13374: KAFKA-14765 and KAFKA-14776: Support for SCRAM at bootstrap with integration tests

2023-06-21 Thread via GitHub


FireBurn commented on PR #13374:
URL: https://github.com/apache/kafka/pull/13374#issuecomment-1600428887

   The examples say:
   
   ```
 --add-scram ADD_SCRAM, -S ADD_SCRAM
A SCRAM_CREDENTIAL to add to the __cluster_metadata 
log e.g.
'SCRAM-SHA-256=[user=alice,password=alice-secret]'

'SCRAM-SHA-512=[user=alice,iterations=8192,salt="N3E=",saltedpassword="YCE="]'
   ```
   
   But the code errors out unless name= is passed. Should user be name in the 
example, or is the check wrong?
https://github.com/apache/kafka/blame/49c1697ab08189c9707d04a6078aa9d5b69ed3aa/core/src/main/scala/kafka/tools/StorageTool.scala#L174
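   To make the mismatch concrete, here is a hypothetical Java sketch (not StorageTool's actual parser) that splits a credential argument of the documented form `MECHANISM=[key=value,...]` into its fields. Parsed this way, the help-text example yields a `user` field and no `name` field, so a validation step that requires `name` (as the comment reports) rejects it. `ScramArgSketch` and `hasRequiredName` are illustrative names only:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; StorageTool's real parsing and validation may differ.
final class ScramArgSketch {
    private ScramArgSketch() {}

    /** Returns the mechanism, e.g. "SCRAM-SHA-256", from "SCRAM-SHA-256=[...]". */
    static String mechanism(String arg) {
        int eq = arg.indexOf('=');
        if (eq <= 0) throw new IllegalArgumentException("Missing mechanism: " + arg);
        return arg.substring(0, eq).trim();
    }

    /** Splits the bracketed part of "MECHANISM=[k1=v1,k2=v2]" into an ordered map. */
    static Map<String, String> fields(String arg) {
        int open = arg.indexOf('[');
        int close = arg.lastIndexOf(']');
        if (open < 0 || close < open) {
            throw new IllegalArgumentException("Expected MECHANISM=[...]: " + arg);
        }
        Map<String, String> result = new LinkedHashMap<>();
        for (String pair : arg.substring(open + 1, close).split(",")) {
            // Split on the first '=' only: base64 values such as "N3E=" end in '='.
            int eq = pair.indexOf('=');
            if (eq <= 0) throw new IllegalArgumentException("Malformed field: " + pair);
            result.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
        }
        return result;
    }

    /** A check in the style the comment describes: credentials must carry "name". */
    static boolean hasRequiredName(String arg) {
        return fields(arg).containsKey("name");
    }
}
```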





[jira] [Resolved] (KAFKA-15059) Exactly-once source tasks fail to start during pending rebalances

2023-06-21 Thread Viktor Somogyi-Vass (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi-Vass resolved KAFKA-15059.
-
Resolution: Fixed

[~ChrisEgerton] since the PR has been merged, I'm resolving this ticket.

> Exactly-once source tasks fail to start during pending rebalances
> -
>
> Key: KAFKA-15059
> URL: https://issues.apache.org/jira/browse/KAFKA-15059
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect, mirrormaker
>Affects Versions: 3.6.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Blocker
> Fix For: 3.6.0
>
>
> When asked to perform a round of zombie fencing, the distributed herder will 
> [reject the 
> request|https://github.com/apache/kafka/blob/17fd30e6b457f097f6a524b516eca1a6a74a9144/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1249-L1250]
>  if a rebalance is pending, which can happen if (among other things) a config 
> for a new connector or a new set of task configs has been recently read from 
> the config topic.
> Normally this can be alleviated with a simple task restart, which isn't great 
> but isn't terrible.
> However, when running MirrorMaker 2 in dedicated mode, there is no API to 
> restart failed tasks, and it can be more common to see this kind of failure 
> on a fresh cluster because three connector configurations are written in 
> rapid succession to the config topic.
>  
> In order to provide a better experience for users of both vanilla Kafka 
> Connect and dedicated MirrorMaker 2 clusters, we can retry (likely with the 
> same exponential backoff introduced with KAFKA-14732) zombie fencing attempts 
> that fail due to a pending rebalance.
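The proposed fix, retrying zombie-fencing attempts that fail because a rebalance is pending, with exponential backoff, can be sketched in Java as follows. This is a hypothetical illustration: `RebalancePendingException`, `fenceWithRetry`, and the backoff parameters are assumptions, not the DistributedHerder code or the backoff utility from KAFKA-14732. The sleep is injected so the schedule can be checked without waiting:

```java
import java.util.function.LongConsumer;
import java.util.function.Supplier;

// Hypothetical sketch of retry-with-exponential-backoff for a fencing attempt.
final class FencingRetrySketch {
    private FencingRetrySketch() {}

    /** Stand-in for "the herder rejected the request because a rebalance is pending". */
    static final class RebalancePendingException extends RuntimeException {
        RebalancePendingException(String message) { super(message); }
    }

    static <T> T fenceWithRetry(Supplier<T> attempt, int maxAttempts,
                                long initialBackoffMs, long maxBackoffMs, LongConsumer sleeper) {
        long backoffMs = initialBackoffMs;
        for (int attemptNo = 1; ; attemptNo++) {
            try {
                return attempt.get();
            } catch (RebalancePendingException e) {
                if (attemptNo >= maxAttempts) {
                    throw e; // out of attempts: surface the failure to the caller
                }
                sleeper.accept(backoffMs);                          // wait before retrying
                backoffMs = Math.min(backoffMs * 2, maxBackoffMs); // double, capped at the max
            }
        }
    }
}
```

Only the pending-rebalance failure is retried here; any other exception propagates on the first attempt.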



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jlprat commented on a diff in pull request #13839: MINOR:Fill missing parameter annotations for LogCleaner methods

2023-06-21 Thread via GitHub


jlprat commented on code in PR #13839:
URL: https://github.com/apache/kafka/pull/13839#discussion_r1236520019


##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -173,14 +174,25 @@ class LogCleaner(initialConfig: CleanerConfig,
 }
   }
 
+  /**
+   * Remove metrics when shutdown cleaner threads

Review Comment:
   I find this sentence a bit confusing. I think keeping it short is more than 
enough:
   ```suggestion
  * Remove metrics
   ```



##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -173,14 +174,25 @@ class LogCleaner(initialConfig: CleanerConfig,
 }
   }
 
+  /**
+   * Remove metrics when shutdown cleaner threads
+   */
   def removeMetrics(): Unit = {
 LogCleaner.MetricNames.foreach(metricsGroup.removeMetric)
   }
 
+  /**
+   * @return A set of configs that is reconfigurable in LogCleaner
+   */
   override def reconfigurableConfigs: Set[String] = {
 LogCleaner.ReconfigurableConfigs
   }
 
+  /**
+   * Mainly validate the new cleaner threads num is reasonable

Review Comment:
   I think we should avoid the use of `Mainly` in Javadoc (or Scaladoc). Having 
a "mainly" in there might imply that the method does other things that are not 
documented.



##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -803,6 +864,14 @@ private[log] class Cleaner(val id: Int,
 growBuffers(maxSize)
   }
 
+  /**
+   * Judge a batch should be discard by cleaned transaction state

Review Comment:
   I find "Judge" a bit odd in this situation.
   ```suggestion
  * Determine if a batch should be discarded by cleaned transaction state
   ```



##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -811,6 +880,18 @@ private[log] class Cleaner(val id: Int,
   transactionMetadata.onBatchRead(batch)
   }
 
+  /**
+   * Judge a record should be retained

Review Comment:
   Same as before
   ```suggestion
  * Determine if a record should be retained
   ```



##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -91,6 +91,7 @@ import scala.util.control.ControlThrowable
  * @param initialConfig Initial configuration parameters for the cleaner. Actual config may be dynamically updated.
  * @param logDirs The directories where offset checkpoints reside
  * @param logs The pool of logs
+ * @param logDirFailureChannel The channel used to add offline log dirs that may occur when cleaning the log

Review Comment:
   I think `occur` should be replaced with `be encountered`, but this is just a 
nitpick.



##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -215,27 +230,37 @@ class LogCleaner(initialConfig: CleanerConfig,
   /**
*  Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of
*  the partition is aborted.
+   *
+   *  @param topicPartition The topic and partition to be abort cleaning
*/
   def abortCleaning(topicPartition: TopicPartition): Unit = {
 cleanerManager.abortCleaning(topicPartition)
   }
 
   /**
* Update checkpoint file to remove partitions if necessary.
+   *
+   * @param dataDir The file object to be updated

Review Comment:
   This is consistent with other JavaDocs in the code.



##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -215,27 +230,37 @@ class LogCleaner(initialConfig: CleanerConfig,
   /**
*  Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of
*  the partition is aborted.
+   *
+   *  @param topicPartition The topic and partition to be abort cleaning

Review Comment:
   ```suggestion
  *  @param topicPartition The topic and partition to abort cleaning
   ```



##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -803,6 +864,14 @@ private[log] class Cleaner(val id: Int,
 growBuffers(maxSize)
   }
 
+  /**
+   * Judge a batch should be discard by cleaned transaction state
+   *
+   * @param batch The batch of records to judge

Review Comment:
   ```suggestion
  * @param batch The batch of records to cehck
   ```



##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -811,6 +880,18 @@ private[log] class Cleaner(val id: Int,
   transactionMetadata.onBatchRead(batch)
   }
 
+  /**
+   * Judge a record should be retained
+   *
+   * @param map The offset map(key=>offset) to use for cleaning segments
+   * @param retainDeletesForLegacyRecords Should tombstones (lower than version 2) and markers be retained while cleaning this segment
+   * @param batch The batch of records that the record belongs to
+   * @param record The record to judge

Review Comment:
   ```suggestion
  * @param record The record to check
   ```



##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -215,27 +230,37 @@ class LogCleaner(initialConfig: CleanerConfig,
   /**
*  Abort the cleaning of a particular partitio

[GitHub] [kafka] xiaocairush commented on a diff in pull request #13884: MINOR: fix typos for client

2023-06-21 Thread via GitHub


xiaocairush commented on code in PR #13884:
URL: https://github.com/apache/kafka/pull/13884#discussion_r1236668769


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java:
##
@@ -430,7 +430,7 @@ public void onFailure(RuntimeException e) {
  * leaders available. Topic partitions from `timestampsToSearch` that do not have their leader
  * available are added to `partitionsToRetry`
  *
- * @param timestampsToSearch The mapping from partitions ot the target timestamps
+ * @param timestampsToSearch The mapping from partitions of the target timestamps

Review Comment:
   Corrected to **"to"**






[GitHub] [kafka] hudeqi commented on a diff in pull request #13839: MINOR:Fill missing parameter annotations for LogCleaner methods

2023-06-21 Thread via GitHub


hudeqi commented on code in PR #13839:
URL: https://github.com/apache/kafka/pull/13839#discussion_r1236681057


##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -803,6 +864,14 @@ private[log] class Cleaner(val id: Int,
 growBuffers(maxSize)
   }
 
+  /**
+   * Judge a batch should be discard by cleaned transaction state
+   *
+   * @param batch The batch of records to judge

Review Comment:
   mean 'check'?






[GitHub] [kafka] jlprat commented on a diff in pull request #13839: MINOR:Fill missing parameter annotations for LogCleaner methods

2023-06-21 Thread via GitHub


jlprat commented on code in PR #13839:
URL: https://github.com/apache/kafka/pull/13839#discussion_r1236681971


##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -803,6 +864,14 @@ private[log] class Cleaner(val id: Int,
 growBuffers(maxSize)
   }
 
+  /**
+   * Judge a batch should be discard by cleaned transaction state
+   *
+   * @param batch The batch of records to judge

Review Comment:
   Yes, sorry, I made a typo.






[GitHub] [kafka] jlprat commented on a diff in pull request #13839: MINOR:Fill missing parameter annotations for LogCleaner methods

2023-06-21 Thread via GitHub


jlprat commented on code in PR #13839:
URL: https://github.com/apache/kafka/pull/13839#discussion_r1236682306


##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -803,6 +864,14 @@ private[log] class Cleaner(val id: Int,
 growBuffers(maxSize)
   }
 
+  /**
+   * Judge a batch should be discard by cleaned transaction state

Review Comment:
   You could also use "check" instead of "determine"






[jira] [Commented] (KAFKA-15108) task.timeout.ms does not work when TimeoutException is thrown by streams producer

2023-06-21 Thread Tomonari Yamashita (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735625#comment-17735625
 ] 

Tomonari Yamashita commented on KAFKA-15108:


Hi [~mjsax],
Thank you. I wasn't sure whether this issue was unexpected behavior or not, so your 
advice was very helpful.

> task.timeout.ms does not work when TimeoutException is thrown by streams 
> producer
> -
>
> Key: KAFKA-15108
> URL: https://issues.apache.org/jira/browse/KAFKA-15108
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.0
>Reporter: Tomonari Yamashita
>Priority: Major
>
> [Problem]
>  - task.timeout.ms does not work when TimeoutException is thrown by streams 
> producer
>  -- Kafka Streams upgrade guide says, "Kafka Streams is now handling 
> TimeoutException thrown by the consumer, producer, and admin client."(1) and 
> "To bound how long Kafka Streams retries a task, you can set task.timeout.ms 
> (default is 5 minutes)."(1).
>  -- However, task.timeout.ms does not appear to take effect for the streams 
> producer, so it seems to keep retrying forever.
> [Environment]
>  - Kafka Streams 3.5.0
> [Reproduction procedure]
>  # Create "input-topic" topic
>  # Put several messages on "input-topic"
>  # Do not create the "output-topic" topic, so that a TimeoutException is triggered
>  # Create the following simple Kafka streams program; this program just 
> transfers messages from "input-topic" to "output-topic".
>  -- 
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "java-kafka-streams");
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,"org.apache.kafka.common.serialization.Serdes$StringSerde");
> props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,"org.apache.kafka.common.serialization.Serdes$StringSerde");
> props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,"com.example.CustomProductionExceptionHandler"); // not needed
> StreamsBuilder builder = new StreamsBuilder();
> builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
> .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
> KafkaStreams streams = new KafkaStreams(builder.build(), props);
> {code}
>  # Wait for task.timeout.ms (default is 5 minutes).
>  ## If debug logging is enabled, a large number of 
> UNKNOWN_TOPIC_OR_PARTITION errors will be logged because "output-topic" does not 
> exist.
>  ## In addition, a TimeoutException is generated every minute (2).
>  # ==> However, task.timeout.ms does not appear to take effect for the 
> streams producer, so it seems to keep retrying forever.
>  ## My expected behavior is that task.timeout.ms takes effect and the client 
> shuts down, because the default behavior when an exception is thrown is 
> StreamThreadExceptionResponse.SHUTDOWN_CLIENT.
> [As far as my investigation shows]
>  - TimeoutException thrown by the streams producer is replaced with 
> TaskCorruptedException in RecordCollectorImpl.recordSendError(...) (3)
>  - And after that it does not appear to be executing code that contains logic 
> related to task.timeout.ms.
> (1) Kafka Streams upgrade guide
>  - [https://kafka.apache.org/35/documentation/streams/upgrade-guide]
>  - 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams]
> {code:java}
> Kafka Streams is now handling TimeoutException thrown by the consumer, 
> producer, and admin client. If a timeout occurs on a task, Kafka Streams 
> moves to the next task and retries to make progress on the failed task in the 
> next iteration. To bound how long Kafka Streams retries a task, you can set 
> task.timeout.ms (default is 5 minutes). If a task does not make progress 
> within the specified task timeout, which is tracked on a per-task basis, 
> Kafka Streams throws a TimeoutException (cf. KIP-572).
> {code}
> (2) TimeoutException occurs
> {code:java}
> 2023-06-19 19:51:26 WARN  NetworkClient:1145 - [Producer clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer] Error while fetching metadata with correlation id 1065 : {output-topic=UNKNOWN_TOPIC_OR_PARTITION}
> 2023-06-19 19:51:26 DEBUG Metadata:363 - [Producer clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer] Requesting metadata update for topic output-topic due to error UNKNOWN_TOPIC_OR_PARTITION
> 2023-06-19 19:51:26 DEBUG Metadata:291 - [Producer clientId=java-kafka-streams-8dd73f1e-f9b6-4058-895f-448143c4e5cf-StreamThread-1-producer] Updated cluster metadata updateVersion 1064 to MetadataCache{clusterId='ulBlb0C3Qdau
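For readers tracing this report, the per-task timeout semantics quoted from the upgrade guide (KIP-572) can be modeled roughly as below. This is a simplified, hypothetical sketch, not Kafka Streams' implementation: the method names `maybeInitTaskTimeoutOrThrow` and `clearTaskTimeout` mirror those that appear on `Task` in `TaskManagerTest`, while `TaskTimeoutTracker` and `TaskTimeoutExceededException` are illustrative stand-ins (the upgrade guide says Kafka Streams throws a `TimeoutException`).

```java
/**
 * Simplified, hypothetical model of the per-task timeout from KIP-572.
 * Not Kafka Streams' implementation; names are illustrative.
 */
final class TaskTimeoutTracker {

    /** Illustrative stand-in for the TimeoutException Kafka Streams throws. */
    static final class TaskTimeoutExceededException extends RuntimeException {
        TaskTimeoutExceededException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    private static final long NO_DEADLINE = -1L;

    private final long taskTimeoutMs;      // plays the role of task.timeout.ms
    private long deadlineMs = NO_DEADLINE; // set on the first timeout, cleared on progress

    TaskTimeoutTracker(long taskTimeoutMs) {
        this.taskTimeoutMs = taskTimeoutMs;
    }

    /**
     * Called when a retriable timeout hits this task. The first call starts the
     * clock; later calls throw once the deadline has passed without progress.
     */
    void maybeInitTaskTimeoutOrThrow(long nowMs, Exception cause) {
        if (deadlineMs == NO_DEADLINE) {
            deadlineMs = nowMs + taskTimeoutMs;
        } else if (nowMs >= deadlineMs) {
            throw new TaskTimeoutExceededException(
                "Task did not make progress within " + taskTimeoutMs + " ms", cause);
        }
    }

    /** Called once the task makes progress again; resets the per-task clock. */
    void clearTaskTimeout() {
        deadlineMs = NO_DEADLINE;
    }
}
```

Under this model, the reported behavior is what you would expect if the producer's `TimeoutException` is converted into a `TaskCorruptedException` before anything like `maybeInitTaskTimeoutOrThrow` runs: the deadline is never started, so it can never expire.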

[GitHub] [kafka] cadonna commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

2023-06-21 Thread via GitHub


cadonna commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1236586513


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -886,11 +885,9 @@ public void shouldSuspendRevokedTaskRemovedFromStateUpdater() {
 when(stateUpdater.hasRemovedTasks()).thenReturn(true);
 when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(statefulTask));
 final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-replay(consumer);
 
 taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-verify(consumer);

Review Comment:
   Here you need to add `Mockito.verifyNoInteractions(consumer)` because that 
was the intent of replaying a consumer without expectations and verifying it.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -886,11 +885,9 @@ public void shouldSuspendRevokedTaskRemovedFromStateUpdater() {
 when(stateUpdater.hasRemovedTasks()).thenReturn(true);
 when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(statefulTask));
 final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-replay(consumer);
 
 taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-verify(consumer);
 Mockito.verify(statefulTask).suspend();
 Mockito.verify(tasks).addTask(statefulTask);
 }

Review Comment:
   nit (and probably my fault 🙂) 
   ```suggestion
   }
   
   ```



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -1066,14 +1054,12 @@ public void shouldHandleTimeoutExceptionInTransitRestoredTaskToRunning() {
 final TaskManager taskManager = setUpTransitionToRunningOfRestoredTask(task, tasks);
 final TimeoutException timeoutException = new TimeoutException();
 doThrow(timeoutException).when(task).completeRestoration(noOpResetter);
-replay(consumer);
 
 taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
 Mockito.verify(task).maybeInitTaskTimeoutOrThrow(anyLong(), Mockito.eq(timeoutException));
 Mockito.verify(tasks, never()).addTask(task);
 Mockito.verify(task, never()).clearTaskTimeout();
-verify(consumer);

Review Comment:
   Also here, please add `Mockito.verifyNoInteractions(consumer)`.
   
   This verification is also missing in other places.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -1046,15 +1036,13 @@ public void shouldTransitRestoredTaskToRunning() {
 .withInputPartitions(taskId00Partitions).build();
 final TasksRegistry tasks = mock(TasksRegistry.class);
 final TaskManager taskManager = setUpTransitionToRunningOfRestoredTask(task, tasks);
-consumer.resume(task.inputPartitions());
-replay(consumer);
 
 taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
 Mockito.verify(task).completeRestoration(noOpResetter);
 Mockito.verify(task).clearTaskTimeout();
 Mockito.verify(tasks).addTask(task);
-verify(consumer);
+Mockito.verify(consumer).resume(task.inputPartitions());

Review Comment:
   I cannot completely follow your reasoning here. Why would you add 
`verifyNoMoreInteractions`? The important thing here is that the consumer 
resumes polling from the input partitions. However, I also see that with 
EasyMock this test verifies that `consumer.resume(task.inputPartitions())` is 
the only method called on the consumer mock. I am fine either way.
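The difference between the EasyMock `replay`/`verify` pattern and the Mockito checks discussed in this review can be illustrated without either library. The sketch below uses a `java.lang.reflect.Proxy` that records every call; `MiniConsumer` and `RecordingMock` are hypothetical stand-ins, and `verifyOnly` only approximates `Mockito.verify(...)` followed by `verifyNoMoreInteractions(...)`.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for the slice of the consumer API the tests touch. */
interface MiniConsumer {
    Set<String> assignment();

    void resume(Collection<String> partitions);
}

/** Records each call made through the proxy, like a mock's invocation log. */
final class RecordingMock {
    final List<String> calls = new ArrayList<>();
    final MiniConsumer proxy;

    RecordingMock() {
        proxy = (MiniConsumer) Proxy.newProxyInstance(
            MiniConsumer.class.getClassLoader(),
            new Class<?>[] {MiniConsumer.class},
            (self, method, args) -> {
                if (method.getDeclaringClass() == Object.class) {
                    // toString/equals/hashCode are not counted as interactions.
                    return method.invoke(this, args);
                }
                calls.add(method.getName());
                // Harmless defaults: an empty assignment, null for void methods.
                return method.getReturnType() == Set.class ? Set.of() : null;
            });
    }

    /** Roughly what Mockito.verifyNoInteractions(mock) asserts. */
    void verifyNoInteractions() {
        if (!calls.isEmpty()) {
            throw new AssertionError("Unexpected interactions: " + calls);
        }
    }

    /** Roughly verify(mock).method(...) followed by verifyNoMoreInteractions(mock). */
    void verifyOnly(String methodName) {
        if (!calls.equals(List.of(methodName))) {
            throw new AssertionError("Expected only " + methodName + " but saw " + calls);
        }
    }
}
```

An EasyMock mock that is replayed with no expectations fails on any call, which corresponds to `verifyNoInteractions()` here; replaying it with a single `resume` expectation corresponds to `verifyOnly("resume")`.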



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -918,9 +915,6 @@ public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
 .thenReturn(convertedTask1);
 when(standbyTaskCreator.createStandbyTaskFromActive(taskToRecycle0, taskId00Partitions))
 .thenReturn(convertedTask0);
-expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
-consumer.resume(anyObject());
-expectLastCall().anyTimes();

Review Comment:
   It seems I was a bit sloppy here. When a task is removed from the state 
updater, there should be no interactions with the consumer. Please remove the 
expectations and verify that there are no interactions with the consumer mock.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -918,9 +915,6 @@ public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
 .thenReturn(convertedTask1);
 when(standbyTaskCreator.createStandbyTaskFromActive(taskToRecycle0, taskId00Partitions))
 .thenReturn(convertedTask0);
-expect(consumer.assig

[jira] [Created] (KAFKA-15110) Wrong version may be run, which will cause it to fail to run when there are multiple version jars under core/build/libs

2023-06-21 Thread hudeqi (Jira)
hudeqi created KAFKA-15110:
--

 Summary: Wrong version may be run, which will cause it to fail to 
run when there are multiple version jars under core/build/libs
 Key: KAFKA-15110
 URL: https://issues.apache.org/jira/browse/KAFKA-15110
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 3.4.1
Reporter: hudeqi
Assignee: hudeqi






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15110) Wrong version may be run, which will cause to fail to run when there are multiple version jars under core/build/libs

2023-06-21 Thread hudeqi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hudeqi updated KAFKA-15110:
---
Summary: Wrong version may be run, which will cause to fail to run when 
there are multiple version jars under core/build/libs  (was: Wrong version may 
be run, which will cause it to fail to run when there are multiple version jars 
under core/build/libs)

> Wrong version may be run, which will cause to fail to run when there are 
> multiple version jars under core/build/libs
> 
>
> Key: KAFKA-15110
> URL: https://issues.apache.org/jira/browse/KAFKA-15110
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 3.4.1
>Reporter: hudeqi
>Assignee: hudeqi
>Priority: Major
>






[jira] [Updated] (KAFKA-15110) Wrong version may be run, which will cause to fail to run when there are multiple version jars under core/build/libs

2023-06-21 Thread hudeqi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hudeqi updated KAFKA-15110:
---
Description: 
For example, when I build a jar with './gradlew jar' on a 3.5.0 branch, 
and then switch to a 3.6.0 branch and build a jar there as well, two versions 
of the packages end up in "core/build/libs", because that directory is ignored 
by git. If I then start the Kafka process from the local bin dir, it fails to 
start because the wrong version of the code is loaded.
The reason is that /kafka-run-class.sh adds all jar packages to the CLASSPATH 
by default, which is unreasonable behavior.

For details, see the attached screenshots below:

Figure 1 shows the abnormal exit caused by the missing 
ProducerStateManagerConfig(int xx) constructor, which is defined in version 
3.5.0, whereas the ProducerStateManagerConfig constructor in version 3.6.0 takes 
two parameters.

Figure 2 is the printed value of CLASSPATH.

> Wrong version may be run, which will cause to fail to run when there are 
> multiple version jars under core/build/libs
> 
>
> Key: KAFKA-15110
> URL: https://issues.apache.org/jira/browse/KAFKA-15110
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 3.4.1
>Reporter: hudeqi
>Assignee: hudeqi
>Priority: Major
>





[GitHub] [kafka] hudeqi opened a new pull request, #13893: KAFKA-15110:Fix the failure to start due to the existence of multiple version packages under libs

2023-06-21 Thread via GitHub


hudeqi opened a new pull request, #13893:
URL: https://github.com/apache/kafka/pull/13893

   ### Motivation
   For example, when I build a jar with './gradlew jar' on a 3.5.0 
branch, and then switch to a 3.6.0 branch and build a jar there as well, two 
versions of the packages end up in "core/build/libs", because that directory is 
ignored by git. If I then start the Kafka process from the local bin dir, it 
fails to start because the wrong version of the code is loaded.
   The reason is that /kafka-run-class.sh adds all jar packages to the 
CLASSPATH by default, which is unreasonable behavior.
   The issue scenario is described in [jira](https://issues.apache.org/jira/browse/KAFKA-15110)
   
   ### Solution
   In kafka-run-class.sh, select only the package that matches the current code version. 
   In my testing, this solves the problem. It remains to be checked whether this 
change affects the CI tests.
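The proposed fix filters which jars kafka-run-class.sh puts on the CLASSPATH. The selection rule can be sketched in Java as below; this is only an illustration, not the shell change in the PR. It assumes core jars are named `kafka_<scala-binary-version>-<version>[-classifier].jar` and that the current version is known (for example from `gradle.properties`); the list of classifiers to skip is also an assumption.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Hypothetical illustration of choosing only the jars for the version being run. */
final class CoreJarSelector {
    // Classifier jars that should not end up on a runtime CLASSPATH (assumed list).
    private static final Pattern CLASSIFIER =
        Pattern.compile(".*-(test|test-sources|sources|src|scaladoc|javadoc)\\.jar");

    static List<String> select(List<String> jarNames, String scalaBinaryVersion, String kafkaVersion) {
        String expectedPrefix = "kafka_" + scalaBinaryVersion + "-" + kafkaVersion;
        return jarNames.stream()
            .filter(name -> name.endsWith(".jar"))
            .filter(name -> name.startsWith(expectedPrefix))   // drops jars from other versions
            .filter(name -> !CLASSIFIER.matcher(name).matches()) // drops test/doc/source jars
            .collect(Collectors.toList());
    }
}
```

Matching on the full version string, rather than taking every `kafka_*.jar`, is what keeps a stale 3.5.0 jar left over in `core/build/libs` off the CLASSPATH when running 3.6.0.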





[jira] [Updated] (KAFKA-15110) Wrong version may be run, which will cause to fail to run when there are multiple version jars under core/build/libs

2023-06-21 Thread hudeqi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hudeqi updated KAFKA-15110:
---
Attachment: WechatIMG28.jpeg

> Wrong version may be run, which will cause to fail to run when there are 
> multiple version jars under core/build/libs
> 
>
> Key: KAFKA-15110
> URL: https://issues.apache.org/jira/browse/KAFKA-15110
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 3.4.1
>Reporter: hudeqi
>Assignee: hudeqi
>Priority: Major
> Attachments: WechatIMG28.jpeg, WechatIMG29.jpeg
>
>





[jira] [Updated] (KAFKA-15110) Wrong version may be run, which will cause to fail to run when there are multiple version jars under core/build/libs

2023-06-21 Thread hudeqi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hudeqi updated KAFKA-15110:
---
Attachment: WechatIMG29.jpeg

> Wrong version may be run, which will cause to fail to run when there are 
> multiple version jars under core/build/libs
> 
>
> Key: KAFKA-15110
> URL: https://issues.apache.org/jira/browse/KAFKA-15110
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 3.4.1
>Reporter: hudeqi
>Assignee: hudeqi
>Priority: Major
> Attachments: WechatIMG28.jpeg, WechatIMG29.jpeg
>
>





[GitHub] [kafka] tombentley commented on pull request #13862: KAFKA-15050: format the prompts in the quickstart

2023-06-21 Thread via GitHub


tombentley commented on PR #13862:
URL: https://github.com/apache/kafka/pull/13862#issuecomment-1600548439

   @mimaison mentioned in a separate conversation a couple of other points:
   
   1. it looks like we never need to refer to any of the example lines of 
console input/output using the line numbers, so perhaps we can get rid of the 
line numbers for shell command blocks (so it's less visually busy). 
   2. the console producer example is also missing the `>` prompts, maybe we 
can add them as part of this PR too
   
   





[jira] [Commented] (KAFKA-15110) Wrong version may be run, which will cause to fail to run when there are multiple version jars under core/build/libs

2023-06-21 Thread hudeqi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735644#comment-17735644
 ] 

hudeqi commented on KAFKA-15110:


Leaving the exception log here so that others can find this problem:

 

[2023-06-21 18:00:59,186] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)
[2023-06-21 18:00:59,188] INFO Broker and topic stats closed (kafka.server.BrokerTopicStats)
[2023-06-21 18:00:59,190] INFO App info kafka.server for 0 unregistered (org.apache.kafka.common.utils.AppInfoParser)
[2023-06-21 18:00:59,190] INFO [KafkaServer id=0] shut down completed (kafka.server.KafkaServer)
[2023-06-21 18:00:59,190] ERROR Exiting Kafka due to fatal exception during startup. (kafka.Kafka$)
java.lang.NoSuchMethodError: 'void org.apache.kafka.storage.internals.log.ProducerStateManagerConfig.<init>(int)'
        at kafka.log.LogManager$.apply(LogManager.scala:1394)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:279)
        at kafka.Kafka$.main(Kafka.scala:113)
        at kafka.Kafka.main(Kafka.scala)
[2023-06-21 18:00:59,190] INFO [KafkaServer id=0] shutting down (kafka.server.KafkaServer)

> Wrong version may be run, which will cause to fail to run when there are 
> multiple version jars under core/build/libs
> 
>
> Key: KAFKA-15110
> URL: https://issues.apache.org/jira/browse/KAFKA-15110
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 3.4.1
>Reporter: hudeqi
>Assignee: hudeqi
>Priority: Major
> Attachments: WechatIMG28.jpeg, WechatIMG29.jpeg
>
>





[GitHub] [kafka] fxbing closed pull request #13892: MINOR: change logger used by CoreUtils#swallow()

2023-06-21 Thread via GitHub


fxbing closed pull request #13892: MINOR: change logger used by 
CoreUtils#swallow()
URL: https://github.com/apache/kafka/pull/13892





[GitHub] [kafka] tkuramoto33 commented on pull request #13867: MINOR: Fix help message for kafka-metadata-shell.

2023-06-21 Thread via GitHub


tkuramoto33 commented on PR #13867:
URL: https://github.com/apache/kafka/pull/13867#issuecomment-1600621034

   @cmccabe 
   I'm sorry to bother you, but could you please review this PR?





[GitHub] [kafka] hudeqi commented on a diff in pull request #13877: MINOR:Fix wrong semantics hint in TROGDOR.md

2023-06-21 Thread via GitHub


hudeqi commented on code in PR #13877:
URL: https://github.com/apache/kafka/pull/13877#discussion_r1236826573


##
TROGDOR.md:
##
@@ -143,7 +143,7 @@ RoundTripWorkload tests both production and consumption.  The workload starts a
 
 ### ConsumeBench
 ConsumeBench starts one or more Kafka consumers on a single agent node. Depending on the passed in configuration (see ConsumeBenchSpec), the consumers either subscribe to a set of topics (leveraging consumer group functionality and dynamic partition assignment) or manually assign partitions to themselves.
-The workload measures the average produce latency, as well as the median, 95th percentile, and 99th percentile latency.
+The workload measures the average consume latency, as well as the median, 95th percentile, and 99th percentile latency.

Review Comment:
   
![WechatIMG30](https://github.com/apache/kafka/assets/16536770/5e0cdd96-8c86-4016-a452-700975c1317f)
   
   @divijvaidya Hi, I have run the test; it does measure consume latency.
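ConsumeBench is described as reporting the average consume latency plus the median, 95th and 99th percentile. As a rough illustration of what those four numbers mean, here is a nearest-rank computation over raw samples. Trogdor itself may compute percentiles differently (for example from a histogram), so treat `LatencySummary` as a hypothetical helper.

```java
import java.util.Arrays;

/** Hypothetical helper: average plus nearest-rank median, p95 and p99. */
final class LatencySummary {
    final double averageMs;
    final long p50Ms;
    final long p95Ms;
    final long p99Ms;

    LatencySummary(long[] latenciesMs) {
        if (latenciesMs.length == 0) {
            throw new IllegalArgumentException("no latency samples");
        }
        long[] sorted = latenciesMs.clone();
        Arrays.sort(sorted);
        averageMs = Arrays.stream(sorted).average().getAsDouble();
        p50Ms = percentile(sorted, 50);
        p95Ms = percentile(sorted, 95);
        p99Ms = percentile(sorted, 99);
    }

    /** Nearest-rank percentile: smallest sample with at least p% of samples at or below it. */
    static long percentile(long[] sorted, int p) {
        long rank = ((long) p * sorted.length + 99) / 100; // ceil(p * n / 100) in integer math
        return sorted[(int) Math.max(rank, 1) - 1];
    }
}
```

For 100 samples of 1 to 100 ms this yields an average of 50.5 ms and a median, p95 and p99 of 50, 95 and 99 ms.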






[GitHub] [kafka] lucasbru commented on a diff in pull request #13876: KAFKA-10733: Clean up producer exceptions

2023-06-21 Thread via GitHub


lucasbru commented on code in PR #13876:
URL: https://github.com/apache/kafka/pull/13876#discussion_r1236863600


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1013,16 +1013,8 @@ private void maybeFailWithError() {
 if (!hasError()) {
 return;
 }
-// for ProducerFencedException, do not wrap it as a KafkaException
-// but create a new instance without the call trace since it was not thrown because of the current call
-if (lastError instanceof ProducerFencedException) {
-throw new ProducerFencedException("Producer with transactionalId '" + transactionalId
-+ "' and " + producerIdAndEpoch + " has been fenced by another producer " +
-"with the same transactionalId");
-}
-if (lastError instanceof InvalidProducerEpochException) {

Review Comment:
   `INVALID_PRODUCER_EPOCH` is always converted to a `ProducerFencedException`, 
so we should not see `InvalidProducerEpochException` here anymore, right? There 
are tests mocking an `INVALID_PRODUCER_EPOCH` response, though.






[GitHub] [kafka] lucasbru commented on a diff in pull request #13876: KAFKA-10733: Clean up producer exceptions

2023-06-21 Thread via GitHub


lucasbru commented on code in PR #13876:
URL: https://github.com/apache/kafka/pull/13876#discussion_r1236873661


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1013,16 +1013,8 @@ private void maybeFailWithError() {
 if (!hasError()) {
 return;
 }
-// for ProducerFencedException, do not wrap it as a KafkaException

Review Comment:
   That's as much as I got from the comment, but I am not sure it makes 
sense. Yes, producers are generally fenced not because of a specific operation 
but because something happened on the server side. Still, it is "correct" to 
include a call trace to the operation where the broker first returned a fencing 
error. At least, hiding the call trace doesn't seem worth complicating the 
exception handling with these inconsistencies.
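The trade-off discussed here, rethrowing the stored error versus creating a fresh instance, comes down to when Java captures a stack trace: at construction, not at the `throw`. A minimal sketch with a hypothetical `FencedError` standing in for `ProducerFencedException` (this is not the producer's code, only the mechanism):

```java
import java.util.Arrays;

/** Hypothetical illustration of stored vs. freshly created exceptions. */
final class FencingErrorDemo {
    /** Stand-in for ProducerFencedException; not the Kafka class. */
    static final class FencedError extends RuntimeException {
        FencedError(String message) {
            super(message);
        }
    }

    private RuntimeException lastError;

    /** Called where the broker's fencing response is first handled. */
    void recordError() {
        lastError = new FencedError("fenced by another producer"); // trace captured here
    }

    /** Option A: rethrow the stored instance; its trace still points at recordError. */
    void maybeFailWithStoredError() {
        if (lastError != null) {
            throw lastError;
        }
    }

    /** Option B: throw a new instance; its trace points at this method instead. */
    void maybeFailWithNewError() {
        if (lastError != null) {
            throw new FencedError(lastError.getMessage());
        }
    }

    static boolean traceMentions(Throwable t, String methodName) {
        return Arrays.stream(t.getStackTrace())
                     .anyMatch(frame -> frame.getMethodName().equals(methodName));
    }
}
```

Option A corresponds to keeping the trace to the operation where the broker first returned the fencing error; Option B corresponds to the removed code path that built a new `ProducerFencedException` inside `maybeFailWithError`.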






[GitHub] [kafka] szalapski commented on pull request #5876: KAFKA-7509: Avoid passing most non-applicable properties to producer, consumer, and admin client

2023-06-21 Thread via GitHub


szalapski commented on PR #5876:
URL: https://github.com/apache/kafka/pull/5876#issuecomment-1600744350

   @rhauch Can we either close or merge this?





[GitHub] [kafka] divijvaidya merged pull request #13877: MINOR:Fix wrong semantics hint in TROGDOR.md

2023-06-21 Thread via GitHub


divijvaidya merged PR #13877:
URL: https://github.com/apache/kafka/pull/13877





[jira] [Commented] (KAFKA-15059) Exactly-once source tasks fail to start during pending rebalances

2023-06-21 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735711#comment-17735711
 ] 

Chris Egerton commented on KAFKA-15059:
---

Ah, thanks Viktor!

> Exactly-once source tasks fail to start during pending rebalances
> -
>
> Key: KAFKA-15059
> URL: https://issues.apache.org/jira/browse/KAFKA-15059
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect, mirrormaker
>Affects Versions: 3.6.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Blocker
> Fix For: 3.6.0
>
>
> When asked to perform a round of zombie fencing, the distributed herder will 
> [reject the 
> request|https://github.com/apache/kafka/blob/17fd30e6b457f097f6a524b516eca1a6a74a9144/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1249-L1250]
>  if a rebalance is pending, which can happen if (among other things) a config 
> for a new connector or a new set of task configs has been recently read from 
> the config topic.
> Normally this can be alleviated with a simple task restart, which isn't great 
> but isn't terrible.
> However, when running MirrorMaker 2 in dedicated mode, there is no API to 
> restart failed tasks, and it can be more common to see this kind of failure 
> on a fresh cluster because three connector configurations are written in 
> rapid succession to the config topic.
>  
> In order to provide a better experience for users of both vanilla Kafka 
> Connect and dedicated MirrorMaker 2 clusters, we can retry (likely with the 
> same exponential backoff introduced with KAFKA-14732) zombie fencing attempts 
> that fail due to a pending rebalance.
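The retry proposed above can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the `DistributedHerder` change. `RebalancePendingException`, `FencingRetry` and `retryOnPendingRebalance` are hypothetical names. The backoff simply doubles up to a maximum, which may differ from the exponential backoff introduced with KAFKA-14732.

```java
import java.util.function.Supplier;

// Hypothetical signal that a fencing request was rejected because a rebalance is pending.
class RebalancePendingException extends RuntimeException {
    RebalancePendingException(String message) {
        super(message);
    }
}

final class FencingRetry {
    private FencingRetry() {
    }

    /**
     * Runs attempt, retrying only on RebalancePendingException with exponential backoff
     * (initialMs, 2 * initialMs, ... capped at maxMs). Any other exception propagates
     * immediately; after maxAttempts failed attempts the last RebalancePendingException is rethrown.
     */
    static <T> T retryOnPendingRebalance(Supplier<T> attempt, int maxAttempts,
                                         long initialMs, long maxMs) {
        long backoffMs = initialMs;
        for (int i = 1; ; i++) {
            try {
                return attempt.get();
            } catch (RebalancePendingException e) {
                if (i >= maxAttempts) {
                    throw e;
                }
                sleep(backoffMs);
                backoffMs = Math.min(backoffMs * 2, maxMs);
            }
        }
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException ie) {
            // Restore the interrupt flag before abandoning the retry loop.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while backing off", ie);
        }
    }
}
```

The sketch retries only on the pending-rebalance signal, so other fencing failures still surface immediately. The attempt cap means a task still fails eventually if the rebalance never settles.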



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] divijvaidya merged pull request #13889: MINOR: Fix typos for metadata

2023-06-21 Thread via GitHub


divijvaidya merged PR #13889:
URL: https://github.com/apache/kafka/pull/13889





[jira] [Assigned] (KAFKA-15109) ISR shrink/expand issues on ZK brokers during migration

2023-06-21 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur reassigned KAFKA-15109:


Assignee: David Arthur

> ISR shrink/expand issues on ZK brokers during migration
> ---
>
> Key: KAFKA-15109
> URL: https://issues.apache.org/jira/browse/KAFKA-15109
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft, replication
>Affects Versions: 3.6.0
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Critical
>
> KAFKA-15021 introduced a new controller behavior that avoids increasing the 
> leader epoch during the controlled shutdown scenario. This prevents some 
> unnecessary thrashing of metadata and threads on the brokers and clients. 
> While a cluster is in a KIP-866 migration and has a KRaft controller with ZK 
> brokers, we cannot employ this leader epoch bump avoidance. The ZK brokers 
> must have the leader epoch bump in order for ReplicaManager to react to the 
> LeaderAndIsrRequest.





[GitHub] [kafka] C0urante commented on a diff in pull request #13881: MINOR: fix typo of ProducerConfig and KafkaProducer

2023-06-21 Thread via GitHub


C0urante commented on code in PR #13881:
URL: https://github.com/apache/kafka/pull/13881#discussion_r1236988175


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -94,8 +94,8 @@ public class ProducerConfig extends AbstractConfig {
  + "batch size is under this batch.size setting.";
 
 /** partitioner.adaptive.partitioning.enable */
-public static final String PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG = "partitioner.adaptive.partitioning.enable";

Review Comment:
   If we want to remove this field in a future version, we can deprecate it now 
and introduce a new field with the correct spelling.
   
   That might fall into KIP territory, though. @divijvaidya thoughts?






[GitHub] [kafka] C0urante commented on a diff in pull request #13881: MINOR: fix typo of ProducerConfig and KafkaProducer

2023-06-21 Thread via GitHub


C0urante commented on code in PR #13881:
URL: https://github.com/apache/kafka/pull/13881#discussion_r1236988175


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -94,8 +94,8 @@ public class ProducerConfig extends AbstractConfig {
  + "batch size is under this batch.size setting.";
 
 /** partitioner.adaptive.partitioning.enable */
-public static final String PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG = "partitioner.adaptive.partitioning.enable";

Review Comment:
   If we want to remove (rename) this field in a future version, we can 
deprecate it now and introduce a new field with the correct spelling.
   
   That might fall into KIP territory, though. @divijvaidya thoughts?
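The deprecate-then-rename approach suggested above could look roughly like this. It is a hedged sketch with two assumptions. `ProducerConfigSketch` stands in for `ProducerConfig`. `PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG` is an assumed name for the correctly spelled field. Both constants resolve to the same property key, so only the Java identifier changes and the configuration users set stays the same.

```java
// Hypothetical stand-in for ProducerConfig showing only the renamed constant.
class ProducerConfigSketch {
    /** partitioner.adaptive.partitioning.enable (correctly spelled constant). */
    public static final String PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG =
            "partitioner.adaptive.partitioning.enable";

    /**
     * @deprecated Misspelled; use {@link #PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG} instead.
     */
    @Deprecated
    public static final String PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG =
            PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG;
}
```

Code that still references the misspelled constant keeps compiling, with a deprecation warning. Removing the old constant later remains a separate, KIP-governed step.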






[jira] [Commented] (KAFKA-14112) Expose replication-offset-lag Mirror metric

2023-06-21 Thread Viktor Somogyi-Vass (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735718#comment-17735718
 ] 

Viktor Somogyi-Vass commented on KAFKA-14112:
-

Yeah, it's LEO-LRO. By the way, it hasn't been contributed back yet, so if you 
feel like implementing it, I'm happy to review your PR.

> Expose replication-offset-lag Mirror metric
> ---
>
> Key: KAFKA-14112
> URL: https://issues.apache.org/jira/browse/KAFKA-14112
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Elkhan Eminov
>Assignee: Elkhan Eminov
>Priority: Minor
>
> The offset lag is the difference between the last replicated record's source 
> offset (LRO) and the end offset of the source (LEO).
> The offset lag is a difference (LRO-LEO), but its constituents are calculated 
> at different points in time and in different places:
>  * LEO shall be calculated during the source task's poll loop (ready to get it 
> from the consumer)
>  * LRO shall be kept in an in-memory "cache" that is updated during the 
> task's producer callback
> LRO is initialized from the offset store when the task is started. The 
> difference shall be calculated when the freshest LEO is acquired
> in the poll loop. The calculated amount shall be defined as a MirrorMaker 
> metric.
> This would describe the number of records still "to be replicated" for a 
> certain topic-partition.





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13881: MINOR: fix typo of ProducerConfig and KafkaProducer

2023-06-21 Thread via GitHub


divijvaidya commented on code in PR #13881:
URL: https://github.com/apache/kafka/pull/13881#discussion_r1236994770


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -94,8 +94,8 @@ public class ProducerConfig extends AbstractConfig {
  + "batch size is under this batch.size setting.";
 
 /** partitioner.adaptive.partitioning.enable */
-public static final String PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG = "partitioner.adaptive.partitioning.enable";

Review Comment:
   Thanks for your suggestion, that is the right way to go about it @C0urante. 
I understand that a KIP for this change might be overkill, but let's follow 
the rules decided by the community (and create a KIP for the deprecation).






[jira] [Commented] (KAFKA-14112) Expose replication-offset-lag Mirror metric

2023-06-21 Thread Viktor Somogyi-Vass (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735719#comment-17735719
 ] 

Viktor Somogyi-Vass commented on KAFKA-14112:
-

Also I think we would need a small KIP for new metrics, so please create that 
too.

> Expose replication-offset-lag Mirror metric
> ---
>
> Key: KAFKA-14112
> URL: https://issues.apache.org/jira/browse/KAFKA-14112
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Elkhan Eminov
>Assignee: Elkhan Eminov
>Priority: Minor
>
> The offset lag is the difference between the last replicated record's source 
> offset (LRO) and the end offset of the source (LEO).
> The offset lag is a difference (LRO-LEO), but its constituents are calculated 
> at different points in time and in different places:
>  * LEO shall be calculated during the source task's poll loop (ready to get it 
> from the consumer)
>  * LRO shall be kept in an in-memory "cache" that is updated during the 
> task's producer callback
> LRO is initialized from the offset store when the task is started. The 
> difference shall be calculated when the freshest LEO is acquired
> in the poll loop. The calculated amount shall be defined as a MirrorMaker 
> metric.
> This would describe the number of records still "to be replicated" for a 
> certain topic-partition.





[GitHub] [kafka] divijvaidya commented on pull request #13881: MINOR: fix typo of ProducerConfig and KafkaProducer

2023-06-21 Thread via GitHub


divijvaidya commented on PR #13881:
URL: https://github.com/apache/kafka/pull/13881#issuecomment-1600829354

   @leo0842 since this is your first contribution (thank you for your 
participation!), you may not be familiar with the concept of KIP. You can read 
more about it here: 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
   





[GitHub] [kafka] ijuma commented on pull request #13582: MINOR: Fix lossy conversions flagged by Java 20

2023-06-21 Thread via GitHub


ijuma commented on PR #13582:
URL: https://github.com/apache/kafka/pull/13582#issuecomment-1600838939

   Jobs run in parallel and hence don't add time. Daily jobs are basically 
ignored and are sort of useless. But we can discuss that in the relevant PR.





[GitHub] [kafka] mumrah opened a new pull request, #13894: MINOR: Increase Github API operations for stale PR check

2023-06-21 Thread via GitHub


mumrah opened a new pull request, #13894:
URL: https://github.com/apache/kafka/pull/13894

   The default of 30 operations isn't enough to iterate through all of our old 
PRs. Increasing to 100 to capture more of the stale PRs. 





[GitHub] [kafka] mumrah merged pull request #13894: MINOR: Increase Github API operations for stale PR check

2023-06-21 Thread via GitHub


mumrah merged PR #13894:
URL: https://github.com/apache/kafka/pull/13894





[GitHub] [kafka] jlprat merged pull request #13839: MINOR:Fill missing parameter annotations for LogCleaner methods

2023-06-21 Thread via GitHub


jlprat merged PR #13839:
URL: https://github.com/apache/kafka/pull/13839





[GitHub] [kafka] ijuma commented on a diff in pull request #12948: MINOR: Add JDK 20 CI build

2023-06-21 Thread via GitHub


ijuma commented on code in PR #12948:
URL: https://github.com/apache/kafka/pull/12948#discussion_r1237055862


##
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##
@@ -291,7 +291,7 @@ public long skip(long toSkip) throws IOException {
 // Skip bytes stored in intermediate buffer first
 int avail = count - pos;
 long bytesSkipped = (avail < remaining) ? avail : remaining;
-pos += bytesSkipped;
+pos += (int) bytesSkipped;

Review Comment:
   @divijvaidya Looks like one of your changes introduced this issue where we 
are adding a `long` to an `int` in a couple of places. Can you please take a 
look and suggest what's the right fix?






[GitHub] [kafka] ijuma commented on a diff in pull request #12948: MINOR: Add JDK 20 CI build

2023-06-21 Thread via GitHub


ijuma commented on code in PR #12948:
URL: https://github.com/apache/kafka/pull/12948#discussion_r1237055862


##
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##
@@ -291,7 +291,7 @@ public long skip(long toSkip) throws IOException {
 // Skip bytes stored in intermediate buffer first
 int avail = count - pos;
 long bytesSkipped = (avail < remaining) ? avail : remaining;
-pos += bytesSkipped;
+pos += (int) bytesSkipped;

Review Comment:
   @divijvaidya Looks like one of your changes introduced this issue where we 
are adding a `long` to an `int` in a couple of places in this file. Can you 
please take a look and suggest what's the right fix?






[GitHub] [kafka] ijuma commented on a diff in pull request #12948: MINOR: Add JDK 20 CI build

2023-06-21 Thread via GitHub


ijuma commented on code in PR #12948:
URL: https://github.com/apache/kafka/pull/12948#discussion_r1237055862


##
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##
@@ -291,7 +291,7 @@ public long skip(long toSkip) throws IOException {
 // Skip bytes stored in intermediate buffer first
 int avail = count - pos;
 long bytesSkipped = (avail < remaining) ? avail : remaining;
-pos += bytesSkipped;
+pos += (int) bytesSkipped;

Review Comment:
   @divijvaidya Looks like one of your changes introduced this issue where we 
are adding a `long` to an `int` in a couple of places in this file. Can you 
please take a look and suggest what's the right fix (I just did the simplest 
thing to make the compiler not complain - it's not safe as it stands)?
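One possible shape for the fix is sketched below. It assumes, as in the quoted hunk, that `count` and `pos` are `int` fields with `pos <= count`. `Math.min(avail, remaining)` is bounded above by `avail`, so for positive `remaining` the number of bytes skipped from the intermediate buffer always fits in an `int` and can be typed as one. `Math.toIntExact` turns that narrowing into a checked conversion instead of a silent cast. `BufferSkipSketch` is a hypothetical stand-in, not the real `ChunkedBytesStream`. It models only the buffered part of `skip` and does not address the other affected places in the file.

```java
// Hypothetical stand-in for the buffered part of ChunkedBytesStream#skip.
class BufferSkipSketch {
    private final int count; // number of valid bytes in the intermediate buffer
    private int pos;         // current read position in the intermediate buffer

    BufferSkipSketch(int count, int pos) {
        this.count = count;
        this.pos = pos;
    }

    /** Skips up to {@code remaining} bytes from the buffer and returns how many were skipped. */
    int skipFromBuffer(long remaining) {
        if (remaining <= 0) {
            return 0;
        }
        int avail = count - pos;
        // min(avail, remaining) never exceeds avail (an int), so the value always fits;
        // toIntExact keeps the narrowing checked should this code change later.
        int bytesSkipped = Math.toIntExact(Math.min(avail, remaining));
        pos += bytesSkipped; // int += int: no implicit lossy conversion
        return bytesSkipped;
    }

    int position() {
        return pos;
    }
}
```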






[GitHub] [kafka] ijuma commented on a diff in pull request #12948: MINOR: Add JDK 20 CI build and remove some branch builds

2023-06-21 Thread via GitHub


ijuma commented on code in PR #12948:
URL: https://github.com/apache/kafka/pull/12948#discussion_r1237064294


##
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##
@@ -291,7 +291,7 @@ public long skip(long toSkip) throws IOException {
 // Skip bytes stored in intermediate buffer first
 int avail = count - pos;
 long bytesSkipped = (avail < remaining) ? avail : remaining;
-pos += bytesSkipped;
+pos += (int) bytesSkipped;

Review Comment:
   This is the main remaining blocker before this PR can be moved out of draft.






[jira] [Commented] (KAFKA-14112) Expose replication-offset-lag Mirror metric

2023-06-21 Thread Elkhan Eminov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735744#comment-17735744
 ] 

Elkhan Eminov commented on KAFKA-14112:
---

[~viktorsomogyi] Yep, I'm on it and it's almost done; I was just super confused 
by the description. Thanks for clarifying. I'll create the KIP too.

> Expose replication-offset-lag Mirror metric
> ---
>
> Key: KAFKA-14112
> URL: https://issues.apache.org/jira/browse/KAFKA-14112
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Elkhan Eminov
>Assignee: Elkhan Eminov
>Priority: Minor
>
> The offset lag is the difference between the last replicated record's source 
> offset (LRO) and the end offset of the source (LEO).
> The offset lag is a difference (LRO-LEO), but its constituents are calculated 
> at different points in time and in different places:
>  * LEO shall be calculated during the source task's poll loop (ready to get it 
> from the consumer)
>  * LRO shall be kept in an in-memory "cache" that is updated during the 
> task's producer callback
> LRO is initialized from the offset store when the task is started. The 
> difference shall be calculated when the freshest LEO is acquired
> in the poll loop. The calculated amount shall be defined as a MirrorMaker 
> metric.
> This would describe the number of records still "to be replicated" for a 
> certain topic-partition.





[jira] [Updated] (KAFKA-14112) Expose replication-offset-lag Mirror metric

2023-06-21 Thread Elkhan Eminov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elkhan Eminov updated KAFKA-14112:
--
Description: 
The replication offset lag is the difference between the {*}l{*}ast {*}e{*}nd 
{*}o{*}ffset of the source partition (LEO) and the {*}l{*}ast {*}r{*}eplicated 
_source_ {*}o{*}ffset (LRO).
The offset lag is a difference (LEO-LRO), and its constituents are calculated 
at different points in time and in different places:
 * LEO shall be calculated during the source task's poll loop (ready to get it 
from the consumer)
 * LRO shall be kept in an in-memory "cache" that is updated during the task's 
producer callback

The difference shall be calculated when the freshest LEO is acquired in the poll 
loop. The calculated amount shall be defined as a MirrorMaker metric.

This would describe the number of records still "to be replicated" for a 
certain topic-partition.

  was:
The offset lag is the difference between the last replicated record's source 
offset (LRO) and the end offset of the source (LEO).
The offset lag is a difference (LRO-LEO), but its constituents are calculated 
at different points in time and in different places:
 * LEO shall be calculated during the source task's poll loop (ready to get it 
from the consumer)
 * LRO shall be kept in an in-memory "cache" that is updated during the task's 
producer callback

LRO is initialized from the offset store when the task is started. The difference 
shall be calculated when the freshest LEO is acquired
in the poll loop. The calculated amount shall be defined as a MirrorMaker 
metric.

This would describe the number of records still "to be replicated" for a 
certain topic-partition.


> Expose replication-offset-lag Mirror metric
> ---
>
> Key: KAFKA-14112
> URL: https://issues.apache.org/jira/browse/KAFKA-14112
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Elkhan Eminov
>Assignee: Elkhan Eminov
>Priority: Minor
>
> The replication offset lag is the difference between the {*}l{*}ast {*}e{*}nd 
> {*}o{*}ffset of the source partition (LEO) and the {*}l{*}ast {*}r{*}eplicated 
> _source_ {*}o{*}ffset (LRO).
> The offset lag is a difference (LEO-LRO), and its constituents are calculated 
> at different points in time and in different places:
>  * LEO shall be calculated during the source task's poll loop (ready to get it 
> from the consumer)
>  * LRO shall be kept in an in-memory "cache" that is updated during the 
> task's producer callback
> The difference shall be calculated when the freshest LEO is acquired in the poll 
> loop. The calculated amount shall be defined as a MirrorMaker metric.
> This would describe the number of records still "to be replicated" for a 
> certain topic-partition.
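The bookkeeping described above can be sketched as a small per-partition tracker. The LRO is updated from the producer callback, and the lag is computed when a fresh LEO arrives in the poll loop. This is a hypothetical illustration, not MirrorMaker 2 code. `ReplicationLagTracker`, its methods and the `String` partition keys are assumptions, and the sketch does not register an actual metric.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical per-partition tracker for the replication offset lag (LEO - LRO).
class ReplicationLagTracker {
    // LRO per source partition. It is written from the producer callback thread and
    // read from the task's poll loop, hence a concurrent map.
    private final Map<String, Long> lastReplicatedOffsets = new ConcurrentHashMap<>();
    // Most recently computed lag per source partition (what a metric gauge could expose).
    private final Map<String, Long> lags = new ConcurrentHashMap<>();

    /** Called from the producer callback once a record with this source offset was written. */
    void recordReplicated(String sourcePartition, long sourceOffset) {
        // Keep the highest offset seen, so a late callback never moves the LRO backwards.
        lastReplicatedOffsets.merge(sourcePartition, sourceOffset, Long::max);
    }

    /** Called from the poll loop with the freshest LEO; returns the lag if an LRO is known. */
    OptionalLong updateLag(String sourcePartition, long logEndOffset) {
        Long lro = lastReplicatedOffsets.get(sourcePartition);
        if (lro == null) {
            return OptionalLong.empty();
        }
        // Clamp at 0 in case the observed LEO is stale relative to the LRO.
        long lag = Math.max(0L, logEndOffset - lro);
        lags.put(sourcePartition, lag);
        return OptionalLong.of(lag);
    }

    /** Latest computed lag, or -1 if none has been computed yet. */
    long currentLag(String sourcePartition) {
        return lags.getOrDefault(sourcePartition, -1L);
    }
}
```

Kafka end offsets point one past the last record. Whether a caught-up partition reports 0 or 1 therefore depends on whether LRO stores the last replicated offset or the next one. The sketch follows the LEO-LRO definition literally.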





[GitHub] [kafka] lucasbru commented on pull request #13876: KAFKA-10733: Clean up producer exceptions

2023-06-21 Thread via GitHub


lucasbru commented on PR #13876:
URL: https://github.com/apache/kafka/pull/13876#issuecomment-1600924404

   > 1. We might end up breaking customer's application code for exception 
handling with this change. What is our plan to prevent the inadvertent impact 
to customer's code on upgrade?
   > 2. This should probably be accompanied with changes in docs and notable 
change section for 3.6.0 - 
[kafka.apache.org/documentation.html#upgrade_350_notable](https://kafka.apache.org/documentation.html#upgrade_350_notable)
   
@divijvaidya 
   
   - To quote the accepted KIP, "This is a pure client side change which only 
affects the resiliency of new Producer client and Streams. For customized EOS 
use case, user needs to change their exception catching logic to take actions 
against their exception handling ... However, all the thrown exceptions' base 
type would still be KafkaException, so the effect should be minimal."
   
   However, I think you have a point that in the case of 
`ProducerFencedException` / `InvalidProducerEpochException` we should be 
careful, as these are exceptions that users may catch frequently. The KIP 
itself also seems to hold two conflicting views on this, authored by two 
different developers, @abbccdda and @guozhangwang. In its original form, the KIP 
proposed never wrapping fatal errors, while the updated version proposed wrapping 
`ProducerFencedException` and `InvalidProducerEpochException`.
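Some application code must work whether or not a fatal error such as `ProducerFencedException` arrives wrapped in a `KafkaException`. One defensive option is to search the cause chain rather than rely on the top-level type. The helper below is a hypothetical, JDK-only sketch and is not part of any Kafka API.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Optional;
import java.util.Set;

final class Causes {
    private Causes() {
    }

    /**
     * Returns the first throwable in t's cause chain (starting with t itself)
     * that is an instance of type, or empty if there is none.
     */
    static <T extends Throwable> Optional<T> findCause(Throwable t, Class<T> type) {
        // Identity-based set so a cyclic cause chain terminates instead of looping forever.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable current = t; current != null && seen.add(current); current = current.getCause()) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
        }
        return Optional.empty();
    }
}
```

Inside a `catch (KafkaException e)` block, a check such as `Causes.findCause(e, ProducerFencedException.class).isPresent()` would behave the same whether or not the client wraps the exception.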





[GitHub] [kafka] C0urante commented on a diff in pull request #13818: KAFKA-14784: Connect offset reset REST API

2023-06-21 Thread via GitHub


C0urante commented on code in PR #13818:
URL: https://github.com/apache/kafka/pull/13818#discussion_r1237125779


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, 
Connector connector, Map> adminFutures = new ArrayList<>();
-
-Map offsetsToAlter = 
parsedOffsets.entrySet()
-.stream()
-.filter(entry -> entry.getValue() != null)
-.collect(Collectors.toMap(Map.Entry::getKey, e -> 
new OffsetAndMetadata(e.getValue(;
-
-if (!offsetsToAlter.isEmpty()) {
-log.debug("Committing the following consumer group 
offsets using an admin client for sink connector {}: {}.",
-connName, offsetsToAlter);
-AlterConsumerGroupOffsetsOptions 
alterConsumerGroupOffsetsOptions = new 
AlterConsumerGroupOffsetsOptions().timeoutMs(
+Map offsetsToWrite;
+if (isReset) {
+offsetsToWrite = new HashMap<>();
+ListConsumerGroupOffsetsOptions 
listConsumerGroupOffsetsOptions = new 
ListConsumerGroupOffsetsOptions().timeoutMs(
 (int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-AlterConsumerGroupOffsetsResult 
alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, 
offsetsToAlter,
-alterConsumerGroupOffsetsOptions);
-
-
adminFutures.add(alterConsumerGroupOffsetsResult.all());
+try {
+admin.listConsumerGroupOffsets(groupId, 
listConsumerGroupOffsetsOptions)
+.partitionsToOffsetAndMetadata()
+
.get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+.forEach((topicPartition, 
offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
+
+log.debug("Found the following topic partitions 
(to reset offsets) for sink connector {} and consumer group ID {}: {}",
+connName, groupId, 
offsetsToWrite.keySet());
+} catch (Exception e) {
+Utils.closeQuietly(admin, "Offset reset admin for 
sink connector " + connName);
+log.error("Failed to list offsets prior to 
resetting sink connector offsets", e);
+cb.onCompletion(new ConnectException("Failed to 
list offsets prior to resetting sink connector offsets", e), null);
+return;
+}
+} else {
+offsetsToWrite = 
SinkUtils.parseSinkConnectorOffsets(offsets);
 }
 
-Set partitionsToReset = 
parsedOffsets.entrySet()
-.stream()
-.filter(entry -> entry.getValue() == null)
-.map(Map.Entry::getKey)
-.collect(Collectors.toSet());
-
-if (!partitionsToReset.isEmpty()) {
-log.debug("Deleting the consumer group offsets for the 
following topic partitions using an admin client for sink connector {}: {}.",
-connName, partitionsToReset);
-DeleteConsumerGroupOffsetsOptions 
deleteConsumerGroupOffsetsOptions = new 
DeleteConsumerGroupOffsetsOptions().timeoutMs(
-(int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-DeleteConsumerGroupOffsetsResult 
deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, 
partitionsToReset,
-deleteConsumerGroupOffsetsOptions);
+boolean alterOffsetsResult;
+try {
+alterOffsetsResult = ((SinkConnector) 
connector).alterOffsets(connectorConfig, offsetsToWrite);
+} catch (UnsupportedOperationException e) {
+throw new ConnectException("Failed to modify offsets 
for connector " + connName + " because it doesn't support external " +
+"modification of offsets", e);
+}
 
-
adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+// This should only occur for an offset reset request when:
+// 1. There was a prior attempt to reset offsets
+// OR
+// 2. No offsets have been committed yet
+if (offsetsToWrite.isEmpty()) {

Review Comment:
   @yashmayya this PR is already fairly c

[GitHub] [kafka] dajac commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService

2023-06-21 Thread via GitHub


dajac commented on code in PR #13812:
URL: https://github.com/apache/kafka/pull/13812#discussion_r1237133352


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.InvalidFetchSizeException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.RecordBatchTooLargeException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
+import org.apache.kafka.common.message.HeartbeatRequestData;
+import org.apache.kafka.common.message.HeartbeatResponseData;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsRequestData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetDeleteRequestData;
+import org.apache.kafka.common.message.OffsetDeleteResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.SyncGroupRequestData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilderSupplier;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.util.FutureUtils;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.IntSupplier;
+
+/**
+ * The group coordinator service.
+ */
+public class GroupCoordinatorService implements GroupCoordinator {
+
+public static class Builder {
+private final int nodeId;
+private final GroupCoordinatorConfig config;
+private PartitionWriter writer;
+private CoordinatorLoader loader;
+
+public Builder(
+int nodeId,
+GroupCoordinatorConfig config
+) {
+this.nodeId = nodeId;
+this.config = config;
+}
+
+public Builder withWriter(PartitionWriter writer) {
+this.writer = write

[GitHub] [kafka] mimaison opened a new pull request, #13896: MINOR: Fix generated client ids for Connect

2023-06-21 Thread via GitHub


mimaison opened a new pull request, #13896:
URL: https://github.com/apache/kafka/pull/13896

   `CLIENT_ID_CONFIG` defaults to empty string: 
https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java#L332-L336
   
   So when it's not set explicitly, we end up with internal clients with 
client-ids like `connect-cluster--shared-admin` or `connect-cluster--offsets`.
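The double dash in those ids comes from joining an empty client id between two separators. Below is a minimal sketch of the problem and one way to avoid it. It assumes the internal client ids are built as `<group id>-<client id>-<suffix>`, which is inferred from the example ids above. `naiveClientId` and `fixedClientId` are hypothetical helpers, not Connect's actual code or this PR's fix.

```java
final class ClientIdSketch {

    // Naive joining (hypothetical): an empty client id leaves two adjacent
    // separators, e.g. "connect-cluster--offsets".
    static String naiveClientId(String groupId, String clientId, String suffix) {
        return groupId + "-" + clientId + "-" + suffix;
    }

    // Hypothetical fix: only emit the client id segment (and its separator)
    // when the client id is non-empty.
    static String fixedClientId(String groupId, String clientId, String suffix) {
        StringBuilder sb = new StringBuilder(groupId).append('-');
        if (clientId != null && !clientId.isEmpty()) {
            sb.append(clientId).append('-');
        }
        return sb.append(suffix).toString();
    }

    public static void main(String[] args) {
        System.out.println(naiveClientId("connect-cluster", "", "shared-admin"));   // connect-cluster--shared-admin
        System.out.println(fixedClientId("connect-cluster", "", "shared-admin"));   // connect-cluster-shared-admin
        System.out.println(fixedClientId("connect-cluster", "worker-1", "offsets")); // connect-cluster-worker-1-offsets
    }
}
```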
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13818: KAFKA-14784: Connect offset reset REST API

2023-06-21 Thread via GitHub


C0urante commented on code in PR #13818:
URL: https://github.com/apache/kafka/pull/13818#discussion_r1237140142


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1268,39 +1270,55 @@ public void alterConnectorOffsets(String connName, Map connector
 connector = plugins.newConnector(connectorClassOrAlias);
 if (ConnectUtils.isSinkConnector(connector)) {
 log.debug("Altering consumer group offsets for sink connector: {}", connName);
-alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+modifySinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
 } else {
 log.debug("Altering offsets for source connector: {}", connName);
-alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+modifySourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+}
+}
+}
+
+/**
+ * Reset a connector's offsets.
+ *
+ * @param connName the name of the connector whose offsets are to be reset
+ * @param connectorConfig the connector's configurations
+ * @param cb callback to invoke upon completion
+ */
+public void resetConnectorOffsets(String connName, Map connectorConfig, Callback cb) {

Review Comment:
   LGTM 👍






[GitHub] [kafka] divijvaidya commented on a diff in pull request #12948: MINOR: Add JDK 20 CI build and remove some branch builds

2023-06-21 Thread via GitHub


divijvaidya commented on code in PR #12948:
URL: https://github.com/apache/kafka/pull/12948#discussion_r1237149655


##
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##
@@ -291,7 +291,7 @@ public long skip(long toSkip) throws IOException {
 // Skip bytes stored in intermediate buffer first
 int avail = count - pos;
 long bytesSkipped = (avail < remaining) ? avail : remaining;
-pos += bytesSkipped;
+pos += (int) bytesSkipped;

Review Comment:
   I am on it






[jira] [Assigned] (KAFKA-15075) MM2 internal checkpoints topic should support multiple partitions

2023-06-21 Thread Elkhan Eminov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elkhan Eminov reassigned KAFKA-15075:
-

Assignee: Elkhan Eminov

> MM2 internal checkpoints topic should support multiple partitions
> -
>
> Key: KAFKA-15075
> URL: https://issues.apache.org/jira/browse/KAFKA-15075
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Daniel Urban
>Assignee: Elkhan Eminov
>Priority: Major
>
> Currently, the internal checkpoints topic of MM2 uses a single partition.
> This is an unnecessary limitation, and instead, it should support more 
> partitions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] TaklaGerguis opened a new pull request, #13897: KAFKA-14133: Migrate StateDirectory mock in TaskManagerTest to Mockito

2023-06-21 Thread via GitHub


TaklaGerguis opened a new pull request, #13897:
URL: https://github.com/apache/kafka/pull/13897

   This pull request migrates the StateDirectory mock in TaskManagerTest from
EasyMock to Mockito.
   The change is restricted to a single mock to minimise the scope and make it 
easier for review.
   
   The reasoning as to why we would like to migrate a single mock rather than 
all mocks in the file at the same time has been discussed in 
https://github.com/apache/kafka/pull/12607#issuecomment-1500829973
   
   It takes the same approach as in:
   https://github.com/apache/kafka/pull/13529
   https://github.com/apache/kafka/pull/13621
   https://github.com/apache/kafka/pull/13681
   https://github.com/apache/kafka/pull/13711
   et al.
   
   I had to change the names of some of the test methods because of a problem 
similar to the one discussed in 
https://github.com/apache/kafka/pull/13711#issuecomment-1587103195





[GitHub] [kafka] TaklaGerguis commented on a diff in pull request #13897: KAFKA-14133: Migrate StateDirectory mock in TaskManagerTest to Mockito

2023-06-21 Thread via GitHub


TaklaGerguis commented on code in PR #13897:
URL: https://github.com/apache/kafka/pull/13897#discussion_r1237176915


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -3474,14 +3453,13 @@ public void shouldNotCommitActiveAndStandbyTasksWhileRebalanceInProgress() throw
 final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false);
 
 makeTaskFolders(taskId00.toString(), task01.toString());
-expectLockObtainedFor(taskId00, taskId01);

Review Comment:
   This was reported by Mockito as an unnecessary stubbing. I checked the code 
and it appears we never call this as part of the test. Let me know in case I am 
wrong.






[GitHub] [kafka] clolov commented on pull request #13721: KAFKA-14782: Implementation Details Different from Documentation (del…

2023-06-21 Thread via GitHub


clolov commented on PR #13721:
URL: https://github.com/apache/kafka/pull/13721#issuecomment-1601039637

   Okay, for the sake of moving this forward, which do you prefer, @jolshan:
should I circle back and update the KIP, or do we accept this change?





[GitHub] [kafka] clolov commented on a diff in pull request #13847: KAFKA-15082: The log retention policy doesn't take effect after altering log dir

2023-06-21 Thread via GitHub


clolov commented on code in PR #13847:
URL: https://github.com/apache/kafka/pull/13847#discussion_r1237187764


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1808,7 +1809,10 @@ class ReplicaManager(val config: KafkaConfig,
 
   // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
   // replica from source dir to destination dir
-  logManager.abortAndPauseCleaning(topicPartition)
+  replicaAlterLogDirsManager.getFetcher(topicPartition) match {

Review Comment:
   Okay, this makes sense to me! Thank you! Could you just add/modify a unit 
test for both of these additions and I think this will be ready to be merged?






[GitHub] [kafka] C0urante commented on a diff in pull request #13818: KAFKA-14784: Connect offset reset REST API

2023-06-21 Thread via GitHub


C0urante commented on code in PR #13818:
URL: https://github.com/apache/kafka/pull/13818#discussion_r1237173042


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java:
##
@@ -1819,16 +1825,25 @@ public void testGetSinkConnectorOffsetsAdminClientAsynchronousError() {
 verifyKafkaClusterId();
 }
 
-@SuppressWarnings("unchecked")
 private void mockAdminListConsumerGroupOffsets(Admin admin, Map consumerGroupOffsets, Exception e) {
+mockAdminListConsumerGroupOffsets(admin, consumerGroupOffsets, e, null, 0);
+}
+
+private void mockAdminListConsumerGroupOffsets(Admin admin, Map consumerGroupOffsets, Exception e, Time time, long delayMs) {
 ListConsumerGroupOffsetsResult result = mock(ListConsumerGroupOffsetsResult.class);
 when(admin.listConsumerGroupOffsets(anyString(), any(ListConsumerGroupOffsetsOptions.class))).thenReturn(result);
-KafkaFuture> adminFuture = mock(KafkaFuture.class);
-when(result.partitionsToOffsetAndMetadata()).thenReturn(adminFuture);
-when(adminFuture.whenComplete(any())).thenAnswer(invocation -> {
-((KafkaFuture.BiConsumer, Throwable>) invocation.getArgument(0))
-.accept(consumerGroupOffsets, e);
-return null;
+KafkaFutureImpl> adminFuture = new KafkaFutureImpl<>();
+if (e != null) {
+adminFuture.completeExceptionally(e);
+} else {
+adminFuture.complete(consumerGroupOffsets);
+}
+when(result.partitionsToOffsetAndMetadata()).thenAnswer(invocation -> {
+if (time == null) {
+return adminFuture;
+}
+time.sleep(delayMs);
+return adminFuture;

Review Comment:
   Nit: can simplify this
   ```suggestion
   if (time != null)
   time.sleep(delayMs);
   ```



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java:
##
@@ -574,6 +574,274 @@ public void alterAndVerifySourceConnectorOffsets(EmbeddedConnectCluster connect,
 "Source connector offsets should reflect the expected number of records produced");
 }
 
+@Test
+public void testAlterSourceConnectorOffsetsInvalidRequestBody() throws Exception {
+// Create a source connector and stop it
+connect.configureConnector(CONNECTOR_NAME, baseSourceConnectorConfigs());
+connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+"Connector tasks did not start in time.");
+connect.stopConnector(CONNECTOR_NAME);
+connect.assertions().assertConnectorIsStopped(
+CONNECTOR_NAME,
+"Connector did not stop in time"
+);
+String url = connect.endpointForResource(String.format("connectors/%s/offsets", CONNECTOR_NAME));
+
+String content = "[]";
+try (Response response = connect.requestPatch(url, content)) {
+assertEquals(500, response.getStatus());
+assertThat(response.getEntity().toString(), containsString("Cannot deserialize value"));
+}
+
+content = "{}";
+try (Response response = connect.requestPatch(url, content)) {
+assertEquals(400, response.getStatus());
+assertThat(response.getEntity().toString(), containsString("Partitions / offsets need to be provided for an alter offsets request"));
+}
+
+content = "{\"key\": []}";
+try (Response response = connect.requestPatch(url, content)) {
+assertEquals(500, response.getStatus());
+assertThat(response.getEntity().toString(), containsString("Unrecognized field"));
+}
+
+content = "{\"offsets\": []}";
+try (Response response = connect.requestPatch(url, content)) {
+assertEquals(400, response.getStatus());
+assertThat(response.getEntity().toString(), containsString("Partitions / offsets need to be provided for an alter offsets request"));
+}
+
+content = "{\"offsets\": {}}";
+try (Response response = connect.requestPatch(url, content)) {
+assertEquals(500, response.getStatus());
+assertThat(response.getEntity().toString(), containsString("Cannot deserialize value"));
+}
+
+content = "{\"offsets\": [123]}";
+try (Response response = connect.requestPatch(url, content)) {
+assertEquals(500, response.getStatus());
+assertThat(response.getEntity().toString(), containsString("Cannot construct instance"));
+}
+
+content = "{\"offsets\": [{\"key\": \"val\"}]}";
+try (Response response = connect.requestPatch(url, content)) {
+assertEquals(500, response.getStatus());
+assertThat(response.getEntity().toString(), containsString("Unrecognized fiel

[jira] [Commented] (KAFKA-15106) AbstractStickyAssignor may stuck in 3.5

2023-06-21 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735790#comment-17735790
 ] 

Kirk True commented on KAFKA-15106:
---

[~flashmouse] Thank you for the test case. I was able to reproduce the hanging 
behavior. I would have assumed that the {{@Timeout}} would have stopped the 
test after 90 seconds, but it didn't appear to when I ran it 🤔
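This thread does not confirm why the `@Timeout` failed to stop the test. One plausible reason is an assumption about JUnit Jupiter's default behaviour. By default, it runs a `@Timeout`-annotated test in the calling thread and only interrupts that thread once the timeout expires. If that is right, a loop that never checks its interrupt status keeps running. An assignor that retries while `isBalanced` stays false would be such a loop. The sketch below uses only `java.lang.Thread` to show the difference between a loop that ignores interrupts and one that polls for them. `InterruptSketch` and `busyLoop` are hypothetical names.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class InterruptSketch {

    // Starts a daemon thread that spins forever, simulating a loop whose exit
    // condition never becomes true. If respectInterrupt is true, the loop polls
    // its interrupt flag and exits cooperatively; otherwise it ignores it.
    static Thread busyLoop(AtomicLong iterations, boolean respectInterrupt) {
        Thread t = new Thread(() -> {
            while (true) {
                if (respectInterrupt && Thread.currentThread().isInterrupted()) {
                    return;
                }
                iterations.incrementAndGet();
            }
        });
        t.setDaemon(true); // do not keep the JVM alive if the loop never stops
        t.start();
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread ignoring = busyLoop(new AtomicLong(), false);
        Thread respecting = busyLoop(new AtomicLong(), true);
        TimeUnit.MILLISECONDS.sleep(100);

        // Roughly what a same-thread timeout does: interrupt and rely on the code to react.
        ignoring.interrupt();
        respecting.interrupt();
        ignoring.join(500);
        respecting.join(500);

        System.out.println("ignoring loop still running: " + ignoring.isAlive());
        System.out.println("respecting loop still running: " + respecting.isAlive());
    }
}
```

If that is the cause, the first loop should still be reported as running and the second as stopped. Recent JUnit Jupiter versions (5.9+) also accept `@Timeout(value = 90, threadMode = Timeout.ThreadMode.SEPARATE_THREAD)`. That setting lets the runner report the failure even if the looping thread never exits.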

I'm not familiar with this area of the code, so I'm not sure if the stated 
values make for a valid test or not 🤷‍♂️

Were you planning to provide a patch to fix the issue?

Thanks!

> AbstractStickyAssignor may stuck in 3.5
> ---
>
> Key: KAFKA-15106
> URL: https://issues.apache.org/jira/browse/KAFKA-15106
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.5.0
>Reporter: li xiangyuan
>Priority: Major
>
> This can easily be reproduced in a unit test: in
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignorTest#testLargeAssignmentAndGroupWithNonEqualSubscription,
> set partitionCount=200 and consumerCount=20, and you will see that
> isBalanced returns false forever.
>  
>  





[GitHub] [kafka] clolov commented on a diff in pull request #13859: KAFKA-15093: Add 3.4 and 3.5 to core upgrade and compatibility tests

2023-06-21 Thread via GitHub


clolov commented on code in PR #13859:
URL: https://github.com/apache/kafka/pull/13859#discussion_r1237243060


##
tests/kafkatest/tests/core/upgrade_test.py:
##
@@ -94,6 +94,12 @@ def perform_upgrade(self, from_kafka_version, to_message_format_version=None):
 self.wait_until_rejoin()
 
 @cluster(num_nodes=6)
+@parametrize(from_kafka_version=str(LATEST_3_5), to_message_format_version=None, compression_types=["none"])
+@parametrize(from_kafka_version=str(LATEST_3_5), to_message_format_version=None, compression_types=["lz4"])
+@parametrize(from_kafka_version=str(LATEST_3_5), to_message_format_version=None, compression_types=["snappy"])

Review Comment:
   A general question because I have been going into this part of the codebase 
- is there a reason why the other two compression types are not tested (i.e. 
gzip, zstd)?






[GitHub] [kafka] lianetm opened a new pull request, #13898: KAFKA-14966; [2/N] Extract OffsetFetcher reusable logic

2023-06-21 Thread via GitHub


lianetm opened a new pull request, #13898:
URL: https://github.com/apache/kafka/pull/13898

   This is a follow-up to the initial OffsetFetcher refactoring, extracting
reusable logic needed for the new consumer implementation (the initial refactoring
was merged with [PR-13815](https://github.com/apache/kafka/pull/13815)).
   
   As with the initial refactoring, this PR makes no changes to the existing
logic; it only extracts functions or pieces of logic.
   
   There were no individual tests for the extracted functions, so no tests were 
migrated.





[GitHub] [kafka] clolov commented on pull request #13859: KAFKA-15093: Add 3.4 and 3.5 to core upgrade and compatibility tests

2023-06-21 Thread via GitHub


clolov commented on PR #13859:
URL: https://github.com/apache/kafka/pull/13859#issuecomment-1601121985

   I believe this 
(https://github.com/apache/kafka/blob/trunk/tests/kafkatest/version.py#L246-L248)
 should be changed to
   
   ```
   ...
   V_3_4_1 = KafkaVersion("3.4.1")
   LATEST_3_4 = V_3_4_1
   ```
   as part of this pull request, no?





[GitHub] [kafka] divijvaidya commented on pull request #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache

2023-06-21 Thread via GitHub


divijvaidya commented on PR #13850:
URL: https://github.com/apache/kafka/pull/13850#issuecomment-1601136417

   Unrelated test failures:
   ```
   [Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testSyncTopicConfigs()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13850/16/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationSSLTest/Build___JDK_17_and_Scala_2_13___testSyncTopicConfigs__/)
   [Build / JDK 17 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13850/16/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_17_and_Scala_2_13___testBalancePartitionLeaders__/)
   [Build / JDK 17 and Scala 2.13 / org.apache.kafka.streams.integration.LagFetchIntegrationTest.shouldFetchLagsDuringRestoration()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13850/16/testReport/junit/org.apache.kafka.streams.integration/LagFetchIntegrationTest/Build___JDK_17_and_Scala_2_13___shouldFetchLagsDuringRestoration__/)
   [Build / JDK 17 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13850/16/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/Build___JDK_17_and_Scala_2_13___testTaskRequestWithOldStartMsGetsUpdated__/)
   [Build / JDK 11 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testNewAndChangedTopicsInDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13850/16/testReport/junit/kafka.zk/ZkMigrationIntegrationTest/Build___JDK_11_and_Scala_2_131__Type_ZK__Name_testNewAndChangedTopicsInDualWrite__MetadataVersion_3_4_IV0__Security_PLAINTEXT/)
   [Build / JDK 11 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13850/16/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_11_and_Scala_2_13___testBalancePartitionLeaders__/)
   [Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testSyncTopicConfigs()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13850/16/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest/Build___JDK_8_and_Scala_2_12___testSyncTopicConfigs__/)
   [Build / JDK 8 and Scala 2.12 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testNewAndChangedTopicsInDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13850/16/testReport/junit/kafka.zk/ZkMigrationIntegrationTest/Build___JDK_8_and_Scala_2_121__Type_ZK__Name_testNewAndChangedTopicsInDualWrite__MetadataVersion_3_4_IV0__Security_PLAINTEXT/)
   [Build / JDK 8 and Scala 2.12 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testNewAndChangedTopicsInDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13850/16/testReport/junit/kafka.zk/ZkMigrationIntegrationTest/Build___JDK_8_and_Scala_2_121__Type_ZK__Name_testNewAndChangedTopicsInDualWrite__MetadataVersion_3_4_IV0__Security_PLAINTEXT_2/)
   [Build / JDK 8 and Scala 2.12 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13850/16/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_8_and_Scala_2_12___testBalancePartitionLeaders__/)
   ```





[GitHub] [kafka] pprovenzano commented on pull request #13374: KAFKA-14765 and KAFKA-14776: Support for SCRAM at bootstrap with integration tests

2023-06-21 Thread via GitHub


pprovenzano commented on PR #13374:
URL: https://github.com/apache/kafka/pull/13374#issuecomment-1601140928

   The code is correct, and the example should be updated to use 'name='.
   Sorry for the error in the documentation.
   --Proven
   
   On Wed, Jun 21, 2023 at 4:38 AM Mike Lothian ***@***.***>
   wrote:
   
   > The examples say:
   >
   >   --add-scram ADD_SCRAM, -S ADD_SCRAM
   >  A SCRAM_CREDENTIAL to add to the 
__cluster_metadata log e.g.
   >  'SCRAM-SHA-256=[user=alice,password=alice-secret]'
   >  
'SCRAM-SHA-512=[user=alice,iterations=8192,salt="N3E=",saltedpassword="YCE="]'
   >
   > But the code errors unless name= is passed. Should user be name in the
   > example, or is the check wrong?
   > 
https://github.com/apache/kafka/blame/49c1697ab08189c9707d04a6078aa9d5b69ed3aa/core/src/main/scala/kafka/tools/StorageTool.scala#L174
   >
   





[GitHub] [kafka] divijvaidya merged pull request #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache

2023-06-21 Thread via GitHub


divijvaidya merged PR #13850:
URL: https://github.com/apache/kafka/pull/13850





[GitHub] [kafka] jolshan commented on a diff in pull request #13447: MINOR: Change ordering of checks to prevent log spam on metadata updates

2023-06-21 Thread via GitHub


jolshan commented on code in PR #13447:
URL: https://github.com/apache/kafka/pull/13447#discussion_r1237297380


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -399,8 +399,13 @@ private Optional updateLatestMetadata(
 // Between the time that a topic is deleted and re-created, the client may lose track of the
 // corresponding topicId (i.e. `oldTopicId` will be null). In this case, when we discover the new
 // topicId, we allow the corresponding leader epoch to override the last seen value.
-log.info("Resetting the last seen epoch of partition {} to {} since the associated topicId changed from {} to {}",
- tp, newEpoch, oldTopicId, topicId);
+if (oldTopicId != null) {
+log.info("Resetting the last seen epoch of partition {} to {} since the associated topicId changed from {} to {}",
+tp, newEpoch, oldTopicId, topicId);
+} else {
+log.debug("Resetting the last seen epoch of partition {} to {} since the associated topicId was undefined but is now set to {}",

Review Comment:
   Hmmm. This would not be in the 3.4 release right? Let's make sure it's not 
3.5






[GitHub] [kafka] jolshan commented on a diff in pull request #13447: MINOR: Change ordering of checks to prevent log spam on metadata updates

2023-06-21 Thread via GitHub


jolshan commented on code in PR #13447:
URL: https://github.com/apache/kafka/pull/13447#discussion_r1237298897


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -399,8 +399,13 @@ private Optional updateLatestMetadata(
 // Between the time that a topic is deleted and re-created, the client may lose track of the
 // corresponding topicId (i.e. `oldTopicId` will be null). In this case, when we discover the new
 // topicId, we allow the corresponding leader epoch to override the last seen value.
-log.info("Resetting the last seen epoch of partition {} to {} since the associated topicId changed from {} to {}",
- tp, newEpoch, oldTopicId, topicId);
+if (oldTopicId != null) {
+log.info("Resetting the last seen epoch of partition {} to {} since the associated topicId changed from {} to {}",
+tp, newEpoch, oldTopicId, topicId);
+} else {
+log.debug("Resetting the last seen epoch of partition {} to {} since the associated topicId was undefined but is now set to {}",

Review Comment:
   As for seeing it every 5 minutes -- that's a bit strange. Once the client 
receives the topic ID in the cache, it should stay. Are you creating new 
topics, restarting the clients, or moving partitions?






[GitHub] [kafka] vamossagar12 opened a new pull request, #13899: KAFKA-3821: Allowing source offsets to be updated without sending SourceRecords

2023-06-21 Thread via GitHub


vamossagar12 opened a new pull request, #13899:
URL: https://github.com/apache/kafka/pull/13899

   This PR is an implementation of 
[KIP-910](https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records)
 which allows updating source offsets for partitions without necessarily 
needing to send SourceRecords. 





[GitHub] [kafka] jolshan commented on pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService

2023-06-21 Thread via GitHub


jolshan commented on PR #13812:
URL: https://github.com/apache/kafka/pull/13812#issuecomment-1601217809

   Lots of connect failures when trying to shut down the brokers, let's see if 
this new run is cleaner.





[GitHub] [kafka] mumrah merged pull request #13890: KAFKA-15109 Don't skip leader epoch bump while in migration mode

2023-06-21 Thread via GitHub


mumrah merged PR #13890:
URL: https://github.com/apache/kafka/pull/13890





[GitHub] [kafka] divijvaidya commented on a diff in pull request #12948: MINOR: Add JDK 20 CI build and remove some branch builds

2023-06-21 Thread via GitHub


divijvaidya commented on code in PR #12948:
URL: https://github.com/apache/kafka/pull/12948#discussion_r1237341025


##
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##
@@ -291,7 +291,7 @@ public long skip(long toSkip) throws IOException {
 // Skip bytes stored in intermediate buffer first
 int avail = count - pos;
 long bytesSkipped = (avail < remaining) ? avail : remaining;
-pos += bytesSkipped;
+pos += (int) bytesSkipped;

Review Comment:
   @ijuma casting to int here is safe. Can you please add the following comment 
here:
   ```
   // cast to int is fine here since the value of bytesSkipped is upper bounded
   // by the value of avail, which is an int
   ```
   
   It is safe because, as per the line above it, `long bytesSkipped = (avail <
remaining) ? avail : remaining;`, bytesSkipped is the minimum of avail and remaining,
which means its maximum value is avail, which we know is an int.
   
   Same for the other place.
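The bound can be checked in isolation. Below is a minimal sketch that assumes `avail >= 0` and `remaining >= 0`, as in the quoted snippet. `bytesFromBuffer` is a hypothetical helper. Note that `pos += bytesSkipped` compiles even without the cast, because Java compound assignment performs an implicit narrowing conversion. JDK 20's javac added a `lossy-conversions` lint warning for exactly this pattern. That is presumably why the explicit cast shows up in a PR that adds a JDK 20 build.

```java
final class SkipBoundSketch {

    // Mirrors the quoted logic: take the smaller of the buffered bytes (an int)
    // and the bytes still to skip (a long). The ternary promotes both to long.
    static int bytesFromBuffer(int avail, long remaining) {
        long bytesSkipped = (avail < remaining) ? avail : remaining;
        // bytesSkipped <= avail <= Integer.MAX_VALUE (and >= 0 given the
        // assumptions above), so this narrowing cast cannot lose information.
        return (int) bytesSkipped;
    }

    public static void main(String[] args) {
        System.out.println(bytesFromBuffer(10, 3L));                            // 3
        System.out.println(bytesFromBuffer(10, Long.MAX_VALUE));                // 10
        System.out.println(bytesFromBuffer(Integer.MAX_VALUE, Long.MAX_VALUE)); // 2147483647

        int pos = 0;
        long skipped = 7L;
        pos += skipped;       // compiles: compound assignment narrows implicitly
        pos += (int) skipped; // same effect, with the narrowing made explicit
        System.out.println(pos); // 14
    }
}
```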






[GitHub] [kafka] ijuma commented on a diff in pull request #12948: MINOR: Add JDK 20 CI build and remove some branch builds

2023-06-21 Thread via GitHub


ijuma commented on code in PR #12948:
URL: https://github.com/apache/kafka/pull/12948#discussion_r1237383663


##
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##
@@ -291,7 +291,7 @@ public long skip(long toSkip) throws IOException {
 // Skip bytes stored in intermediate buffer first
 int avail = count - pos;
 long bytesSkipped = (avail < remaining) ? avail : remaining;
-pos += bytesSkipped;
+pos += (int) bytesSkipped;

Review Comment:
   Hmm, in that case we can perhaps change `bytesSkipped` to be an `int`?






[GitHub] [kafka] kirktrue commented on pull request #13891: Breaks down SocketServer classes into ConnectionQuotaEntity.scala and ConnectionQuotas.scala

2023-06-21 Thread via GitHub


kirktrue commented on PR #13891:
URL: https://github.com/apache/kafka/pull/13891#issuecomment-1601334521

   Thanks for the PR @jaewie.
   
   Is there a Jira that tracks this?





[GitHub] [kafka] divijvaidya commented on a diff in pull request #12948: MINOR: Add JDK 20 CI build and remove some branch builds

2023-06-21 Thread via GitHub


divijvaidya commented on code in PR #12948:
URL: https://github.com/apache/kafka/pull/12948#discussion_r1237418125


##
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##
@@ -291,7 +291,7 @@ public long skip(long toSkip) throws IOException {
 // Skip bytes stored in intermediate buffer first
 int avail = count - pos;
 long bytesSkipped = (avail < remaining) ? avail : remaining;
-pos += bytesSkipped;
+pos += (int) bytesSkipped;

Review Comment:
   I am afraid that won't be possible.
   
   This is because there are two branches where `bytesSkipped` is updated. In one branch, when we call `bytesSkipped = getInIfOpen().skip(remaining)`, `bytesSkipped` holds a long value since `getInIfOpen().skip(remaining)` returns a long. In the other branch, when we call `bytesSkipped = (avail < remaining) ? avail : remaining`, the value of `bytesSkipped` always fits in an int, since this min expression bounds it to at most avail, which is an int.
   
   I can attempt to do a bunch of refactoring if that would make things clearer, but that shouldn't block this PR.
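One way such a refactoring could look, sketched under assumptions: `TwoBranchSkipper` is a hypothetical, simplified class with its own buffer fields and an underlying `InputStream`; it is not Kafka's `ChunkedBytesStream`. Giving each branch its own local keeps the buffered count an `int` and the stream count a `long` (the return type of `InputStream.skip`), and the only narrowing cast happens when `remaining < avail`, where it is evidently safe:

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical stream that skips buffered bytes first, then delegates to the underlying stream.
final class TwoBranchSkipper {
    private final byte[] buf;
    private final int count;
    private int pos;
    private final InputStream in;

    TwoBranchSkipper(byte[] buffered, InputStream in) {
        this.buf = buffered;
        this.count = buffered.length;
        this.pos = 0;
        this.in = in;
    }

    long skip(long toSkip) throws IOException {
        if (toSkip <= 0) {
            return 0;
        }
        long remaining = toSkip;
        // Branch 1: skip bytes stored in the intermediate buffer first.
        int avail = count - pos;
        if (avail > 0) {
            // The cast only runs when remaining < avail, so remaining fits in an int.
            int skippedFromBuffer = (remaining < avail) ? (int) remaining : avail;
            pos += skippedFromBuffer;
            remaining -= skippedFromBuffer;
        }
        // Branch 2: delegate the rest to the underlying stream; InputStream.skip returns a long.
        while (remaining > 0) {
            long skippedFromStream = in.skip(remaining);
            if (skippedFromStream <= 0) {
                break; // no progress; a simplification, since skip may return 0 before EOF
            }
            remaining -= skippedFromStream;
        }
        return toSkip - remaining;
    }

    // Reads the next byte, from the buffer if any remain, else from the underlying stream.
    int read() throws IOException {
        return pos < count ? (buf[pos++] & 0xFF) : in.read();
    }
}
```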






[GitHub] [kafka] C0urante commented on pull request #13816: MINOR: Optimize runtime of MM2 integration tests by batching transactions

2023-06-21 Thread via GitHub


C0urante commented on PR #13816:
URL: https://github.com/apache/kafka/pull/13816#issuecomment-1601405761

   The test failures that occurred during CI are related to topic config syncing and appear to be both unrelated to this change and happening on trunk as well. Merging...





[GitHub] [kafka] C0urante merged pull request #13816: MINOR: Optimize runtime of MM2 integration tests by batching transactions

2023-06-21 Thread via GitHub


C0urante merged PR #13816:
URL: https://github.com/apache/kafka/pull/13816





[jira] [Commented] (KAFKA-7739) Kafka Tiered Storage

2023-06-21 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735863#comment-17735863
 ] 

Satish Duggana commented on KAFKA-7739:
---

[~jeqo]

The Jira ticket is https://issues.apache.org/jira/browse/KAFKA-14915, and it is 
being considered for the 3.6.0 release. 

> Kafka Tiered Storage
> 
>
> Key: KAFKA-7739
> URL: https://issues.apache.org/jira/browse/KAFKA-7739
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: Harsha
>Assignee: Satish Duggana
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.6.0
>
>
> KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mimaison merged pull request #13896: MINOR: Fix generated client ids for Connect

2023-06-21 Thread via GitHub


mimaison merged PR #13896:
URL: https://github.com/apache/kafka/pull/13896





[GitHub] [kafka] C0urante commented on a diff in pull request #12290: MINOR: Stop leaking threads in BlockingConnectorTest

2023-06-21 Thread via GitHub


C0urante commented on code in PR #12290:
URL: https://github.com/apache/kafka/pull/12290#discussion_r1237612378


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##
@@ -401,23 +430,29 @@ public Block(Map props) {
 
 public Block(String block) {
 this.block = block;
-synchronized (Block.class) {
-if (blockLatch != null) {
-blockLatch.countDown();
+if (block != null) {
+synchronized (Block.class) {
+resetAwaitBlockLatch();
+awaitBlockLatch = new CountDownLatch(1);
 }
-blockLatch = new CountDownLatch(1);
 }
 }
 
 public void maybeBlockOn(String block) {
 if (block.equals(this.block)) {
 log.info("Will block on {}", block);
-blockLatch.countDown();
+CountDownLatch blockLatch;
+synchronized (Block.class) {
+awaitBlockLatch.countDown();
+blockLatch = newBlockLatch();
+}
 while (true) {
 try {
-Thread.sleep(Long.MAX_VALUE);
+blockLatch.await();
+log.debug("Instructed to stop blocking; will resume 
normal execution");
+return;
 } catch (InterruptedException e) {
-// No-op. Just keep blocking.
+log.debug("Interrupted while blocking; will continue 
blocking until instructed to stop");
 }
 }

Review Comment:
   Sort of--this prevents the `Worker` class's `executor` field from shutting 
down gracefully (i.e., when we invoke `awaitTermination` on it in 
`ThreadUtils::shutdownExecutorServiceQuietly`), but it doesn't prevent the 
Connect worker from shutting down, since we put a bound on how long we wait for 
the executor to shut down before moving on.
   
   This is why the tests on trunk (which also have this kind of `while (true)` 
loop to simulate connector/task blocks) don't hang.
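A minimal, self-contained sketch of the behavior described above, using only `java.util.concurrent`. `awaitBoundedShutdown` is a hypothetical stand-in for a quiet shutdown helper such as `ThreadUtils::shutdownExecutorServiceQuietly`, whose real signature may differ. A task that swallows interrupts while waiting on a `CountDownLatch` keeps the executor from terminating within the bound, yet the caller still moves on; once the latch is released, the task returns and the executor can terminate gracefully:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class BoundedShutdownDemo {
    private BoundedShutdownDemo() {}

    /** Blocks until the latch is released, ignoring interrupts (like the test's blocking connectors). */
    static Runnable blockUntilReleased(CountDownLatch started, CountDownLatch release) {
        return () -> {
            started.countDown(); // signal that the task is now running
            while (true) {
                try {
                    release.await();
                    return; // instructed to stop blocking
                } catch (InterruptedException e) {
                    // Swallow the interrupt and keep blocking until the latch is released.
                }
            }
        };
    }

    /** Hypothetical quiet shutdown: interrupt workers, wait at most timeoutMs, then move on. */
    static boolean awaitBoundedShutdown(ExecutorService executor, long timeoutMs) {
        executor.shutdownNow();
        try {
            return executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    static ExecutorService daemonExecutor() {
        // Daemon threads, so a task that never returns cannot keep the JVM alive.
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "blocking-task");
            t.setDaemon(true);
            return t;
        });
    }
}
```

The first bounded wait is expected to return `false` (the caller moves on anyway); after the latch is counted down, a second wait should return `true`.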






[GitHub] [kafka] C0urante commented on a diff in pull request #12290: MINOR: Stop leaking threads in BlockingConnectorTest

2023-06-21 Thread via GitHub


C0urante commented on code in PR #12290:
URL: https://github.com/apache/kafka/pull/12290#discussion_r1237620493


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##
@@ -350,13 +353,16 @@ private void assertRequestTimesOut(String 
requestDescription, ThrowingRunnable r
 }
 
 private static class Block {
-private static CountDownLatch blockLatch;
+// All latches that blocking connectors/tasks are or will be waiting 
on during a test case
+private static final Set BLOCK_LATCHES = new 
HashSet<>();
+// The latch that can be used to wait for a connector/task to reach 
the most-recently-registered blocking point
+private static CountDownLatch awaitBlockLatch;
 
 private final String block;
 
 public static final String BLOCK_CONFIG = "block";
 
-private static ConfigDef config() {
+public static ConfigDef config() {

Review Comment:
   It signals that the method is used outside of the class in which it's 
defined (even though in this case, as you noted, it's not required since the 
outer class can still access `private` methods), and makes it easier to pull 
the inner `Block` class out into a separate top-level class if necessary.
   
   I was hoping this would help with readability, but if it's more confusing 
than useful, we should definitely revert. Thoughts?
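For reference, a small sketch of the Java access rule mentioned above: an enclosing class may call a `private` member of its nested class, so widening `config()` to `public` documents intent rather than enabling access. `Outer`, `Block`, and `config()` here are simplified hypothetical stand-ins (returning a `String` instead of a `ConfigDef`):

```java
// Hypothetical stand-ins for BlockingConnectorTest and its nested Block class.
final class Outer {
    private static final class Block {
        // Private, yet still accessible from the enclosing class Outer.
        private static String config() {
            return "block";
        }
    }

    static String readNestedPrivate() {
        return Block.config();
    }
}
```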






[jira] [Created] (KAFKA-15111) Correction kafka examples

2023-06-21 Thread Dmitry (Jira)
Dmitry created KAFKA-15111:
--

 Summary: Correction kafka examples
 Key: KAFKA-15111
 URL: https://issues.apache.org/jira/browse/KAFKA-15111
 Project: Kafka
  Issue Type: Task
Reporter: Dmitry
 Fix For: 3.6.0


Minor correction in javadoc for KafkaConsumerProducerDemo class and remove 
unused TOPIC field from KafkaProperties.





[jira] [Updated] (KAFKA-15111) Correction kafka examples

2023-06-21 Thread Dmitry (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitry updated KAFKA-15111:
---
Description: Need to change TOPIC_NAME = topic1 in  KafkaConsumerProducerDemo 
class and remove unused TOPIC field from KafkaProperties.  (was: Minor 
correction in javadoc for KafkaConsumerProducerDemo class and remove unused 
TOPIC field from KafkaProperties.)

> Correction kafka examples
> -
>
> Key: KAFKA-15111
> URL: https://issues.apache.org/jira/browse/KAFKA-15111
> Project: Kafka
>  Issue Type: Task
>Reporter: Dmitry
>Priority: Minor
> Fix For: 3.6.0
>
>
> Need to change TOPIC_NAME = topic1 in  KafkaConsumerProducerDemo class and 
> remove unused TOPIC field from KafkaProperties.





[GitHub] [kafka] tinaselenge commented on a diff in pull request #13760: KAFKA-8982: Add retry of fetching metadata to Admin.deleteRecords

2023-06-21 Thread via GitHub


tinaselenge commented on code in PR #13760:
URL: https://github.com/apache/kafka/pull/13760#discussion_r1237642066


##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -2359,31 +2334,22 @@ public void testDeleteRecords() throws Exception {
 assertTrue(e0.getCause() instanceof OffsetOutOfRangeException);
 }
 
-// "leader not available" failure on metadata request for 
partition 2
+// not authorized to delete records for partition 2
 KafkaFuture myTopicPartition2Result = 
values.get(myTopicPartition2);
 try {
 myTopicPartition2Result.get();
 fail("get() should throw ExecutionException");
 } catch (ExecutionException e1) {
-assertTrue(e1.getCause() instanceof 
LeaderNotAvailableException);
+assertTrue(e1.getCause() instanceof 
TopicAuthorizationException);
 }

Review Comment:
   We are actually catching ExecutionException and checking its cause. This would not work because TopicAuthorizationException is not thrown directly; it is the cause supplied to the ExecutionException.
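A minimal sketch of why asserting directly on the authorization exception would not work, using `CompletableFuture` as a stand-in for `KafkaFuture` and a hypothetical `FakeTopicAuthorizationException` in place of Kafka's `TopicAuthorizationException`: `get()` wraps the failure in an `ExecutionException`, so the test has to catch that and inspect `getCause()`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class CauseCheckDemo {
    private CauseCheckDemo() {}

    /** Hypothetical stand-in for org.apache.kafka.common.errors.TopicAuthorizationException. */
    static final class FakeTopicAuthorizationException extends RuntimeException {
        FakeTopicAuthorizationException(String message) {
            super(message);
        }
    }

    /** Returns the cause that get() reports for a future failed with the given exception. */
    static Throwable causeOfFailedGet(RuntimeException failure) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        future.completeExceptionally(failure);
        try {
            future.get();
            throw new AssertionError("get() should throw ExecutionException");
        } catch (ExecutionException e) {
            // The original exception is not thrown directly; it is the cause of the ExecutionException.
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError(e);
        }
    }
}
```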






[GitHub] [kafka] C0urante commented on a diff in pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

2023-06-21 Thread via GitHub


C0urante commented on code in PR #13284:
URL: https://github.com/apache/kafka/pull/13284#discussion_r1237643809


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##
@@ -273,6 +288,30 @@ private void writeToTopic(EmbeddedKafkaCluster cluster, 
String topic, int numMes
 }
 }
 
+private void awaitMirrorMakerStart(final SourceAndTarget sourceAndTarget) 
throws InterruptedException {
+waitForCondition(() -> {
+try {
+return mirrorMakers.values().stream().allMatch(
+mm -> CONNECTOR_CLASSES.stream().allMatch(
+connectorClazz -> 
isConnectorRunningForMirrorMaker(connectorClazz, mm, sourceAndTarget)));
+} catch (Exception ex) {
+log.error("Something unexpected occurred. Unable to check for 
startup status for mirror maker for {}", sourceAndTarget, ex);
+throw ex;
+}
+}, MM_START_UP_TIMEOUT_MS, "MirrorMaker instances did not transition 
to running in time");
+}
+
+private  void awaitConnectorTasksStart(final 
Class clazz, final String source, String target) throws InterruptedException 
{
+waitForCondition(() -> {
+try {
+return mirrorMakers.values().stream().allMatch(mm -> 
isTaskRunningForMirrorMakerConnector(clazz, mm, source, target));

Review Comment:
   Same thought RE only performing assertions with a single `MirrorMaker` 
instance instead of all of them.



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##
@@ -288,4 +327,28 @@ private void awaitTopicContent(EmbeddedKafkaCluster 
cluster, String clusterName,
 }
 }
 
+/**
+ * Validates that the underlying connector is running for the given MirrorMaker.
+ */
+private boolean isConnectorRunningForMirrorMaker(final Class 
connectorClazz, final MirrorMaker mm, final SourceAndTarget sourceAndTarget) {
+final String connName = connectorClazz.getSimpleName();
+final ConnectorStateInfo connectorStatus = 
mm.connectorStatus(sourceAndTarget, connName);
+return connectorStatus != null
+// verify that connector state is set to running
+&& 
connectorStatus.connector().state().equals(AbstractStatus.State.RUNNING.toString());
+}
+
+/**
+ * Validates that the tasks are associated with the connector and they are 
running for the given MirrorMaker.
+ */
+private  boolean 
isTaskRunningForMirrorMakerConnector(final Class connectorClazz, final 
MirrorMaker mm, final String source, final String target) {
+final SourceAndTarget sourceAndTarget = new SourceAndTarget(source, 
target);
+final String connName = connectorClazz.getSimpleName();
+final ConnectorStateInfo connectorStatus = 
mm.connectorStatus(sourceAndTarget, connName);
+return isConnectorRunningForMirrorMaker(connectorClazz, mm, 
sourceAndTarget)
+// verify that at least one task exists
+&& !connectorStatus.tasks().isEmpty()
+// verify that tasks are set to running
+&& connectorStatus.tasks().stream().allMatch(s -> 
s.state().equals(AbstractStatus.State.RUNNING.toString()));

Review Comment:
   Same thought RE throwing a `NoRetryException` if a connector (or, here, 
task) is `FAILED`.
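A hedged sketch of the fail-fast idea: a polling wait that retries a condition until a timeout, but gives up immediately when the condition signals a terminal state (for example, a connector or task reported as `FAILED`). `NoRetry`, `waitFor`, `isRunningOrFail`, and the string states are hypothetical simplifications; Kafka's own test utilities (`waitForCondition`, `NoRetryException`) may differ in signature and behavior:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

final class FailFastWait {
    private FailFastWait() {}

    /** Hypothetical marker exception: thrown by a condition to abort waiting without further retries. */
    static final class NoRetry extends RuntimeException {
        NoRetry(String message) {
            super(message);
        }
    }

    /** Polls the condition until it returns true or the timeout elapses; NoRetry propagates immediately. */
    static void waitFor(Supplier<Boolean> condition, long timeoutMs, long pollMs)
            throws TimeoutException, InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            if (condition.get()) { // a NoRetry thrown here escapes the loop at once
                return;
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("Condition not met within " + timeoutMs + " ms");
            }
            Thread.sleep(pollMs);
        }
    }

    /** Example condition: RUNNING means done, FAILED is terminal, anything else means keep polling. */
    static boolean isRunningOrFail(String state) {
        if ("FAILED".equals(state)) {
            throw new NoRetry("Connector or task failed; no point in retrying");
        }
        return "RUNNING".equals(state);
    }
}
```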



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##
@@ -94,6 +100,11 @@ private MirrorMaker startMirrorMaker(String name, 
Map mmProps) {
 
 result.start();
 
+// wait for connectors to start
+String[] clusterNames = mmProps.get("clusters").split(",");
+SourceAndTarget sourceAndTarget = new 
SourceAndTarget(clusterNames[0].trim(), clusterNames[1].trim());
+awaitMirrorMakerStart(sourceAndTarget);

Review Comment:
   This assumes we define a one-way flow with two clusters, from the first 
cluster to the second.
   
   This assumption may not hold in future updates, and people may think we did this intentionally (instead of out of convenience).
   
   I think it'd be better to move the calls to `awaitMirrorMakerStart` into the 
test cases themselves, since that's where the clusters for each run are 
currently defined.



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##
@@ -288,4 +327,28 @@ private void awaitTopicContent(EmbeddedKafkaCluster 
cluster, String clusterName,
 }
 }
 
+/**
+ * Validates that the underlying connector is running for the given MirrorMaker.
+ */
+private boolean isConnectorRunningForMirrorMaker(final Class 
connectorClazz, final MirrorMaker mm, final SourceAndTarget sourceAndTarget) {
+final String connName = connectorClazz.getSimpleName();
+ 

[GitHub] [kafka] tinaselenge commented on a diff in pull request #13760: KAFKA-8982: Add retry of fetching metadata to Admin.deleteRecords

2023-06-21 Thread via GitHub


tinaselenge commented on code in PR #13760:
URL: https://github.com/apache/kafka/pull/13760#discussion_r1237673297


##
clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java:
##
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.clients.admin.DeletedRecords;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import 
org.apache.kafka.clients.admin.internals.AdminApiFuture.SimpleAdminApiFuture;
+import org.apache.kafka.clients.admin.internals.AdminApiHandler.Batched;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidMetadataException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.DeleteRecordsRequestData;
+import org.apache.kafka.common.message.DeleteRecordsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DeleteRecordsRequest;
+import org.apache.kafka.common.requests.DeleteRecordsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+public final class DeleteRecordsHandler extends Batched {
+
+private final Map recordsToDelete;
+private final Logger log;
+private final AdminApiLookupStrategy lookupStrategy;
+
+public DeleteRecordsHandler(
+Map recordsToDelete,
+LogContext logContext
+) {
+this.recordsToDelete = recordsToDelete;
+this.log = logContext.logger(DeleteRecordsHandler.class);
+this.lookupStrategy = new PartitionLeaderStrategy(logContext);
+}
+
+@Override
+public String apiName() {
+return "deleteRecords";
+}
+
+@Override
+public AdminApiLookupStrategy lookupStrategy() {
+return this.lookupStrategy;
+}
+
+public static SimpleAdminApiFuture 
newFuture(
+Collection topicPartitions
+) {
+return AdminApiFuture.forKeys(new HashSet<>(topicPartitions));
+}
+
+@Override
+public DeleteRecordsRequest.Builder buildBatchedRequest(int brokerId, 
Set keys) {
+Map 
deletionsForTopic = new HashMap<>();
+for (Map.Entry entry: 
recordsToDelete.entrySet()) {
+TopicPartition topicPartition = entry.getKey();
+DeleteRecordsRequestData.DeleteRecordsTopic deleteRecords = 
deletionsForTopic.computeIfAbsent(
+topicPartition.topic(),
+key -> new 
DeleteRecordsRequestData.DeleteRecordsTopic().setName(topicPartition.topic())
+);
+deleteRecords.partitions().add(new 
DeleteRecordsRequestData.DeleteRecordsPartition()
+.setPartitionIndex(topicPartition.partition())
+.setOffset(entry.getValue().beforeOffset()));
+}
+
+DeleteRecordsRequestData data = new DeleteRecordsRequestData()
+.setTopics(new ArrayList<>(deletionsForTopic.values()));

Review Comment:
   I could be wrong, but I think the timeout specified by the user gets passed to AdminApiDriver (via invokeDriver()), which handles the timeout for the requests.






[jira] [Updated] (KAFKA-15111) Correction kafka examples

2023-06-21 Thread Dmitry (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitry updated KAFKA-15111:
---
Description: Need to set TOPIC_NAME = topic1 in  KafkaConsumerProducerDemo 
class and remove unused TOPIC field from KafkaProperties.  (was: Need to change 
TOPIC_NAME = topic1 in  KafkaConsumerProducerDemo class and remove unused TOPIC 
field from KafkaProperties.)

> Correction kafka examples
> -
>
> Key: KAFKA-15111
> URL: https://issues.apache.org/jira/browse/KAFKA-15111
> Project: Kafka
>  Issue Type: Task
>Reporter: Dmitry
>Priority: Minor
> Fix For: 3.6.0
>
>
> Need to set TOPIC_NAME = topic1 in  KafkaConsumerProducerDemo class and remove 
> unused TOPIC field from KafkaProperties.





[jira] [Updated] (KAFKA-15111) Correction kafka examples

2023-06-21 Thread Dmitry (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitry updated KAFKA-15111:
---
Description: Need to set TOPIC_NAME = topic1 in KafkaConsumerProducerDemo 
class and remove unused TOPIC field from KafkaProperties.  (was: Need to set 
TOPIC_NAME = topic1 in  KafkaConsumerProducerDemo class and remove unused TOPIC 
field from KafkaProperties.)

> Correction kafka examples
> -
>
> Key: KAFKA-15111
> URL: https://issues.apache.org/jira/browse/KAFKA-15111
> Project: Kafka
>  Issue Type: Task
>Reporter: Dmitry
>Priority: Minor
> Fix For: 3.6.0
>
>
> Need to set TOPIC_NAME = topic1 in KafkaConsumerProducerDemo class and remove 
> unused TOPIC field from KafkaProperties.





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-21 Thread via GitHub


jeffkbkim commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1237755278


##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.server.ReplicaManager
+import kafka.utils.Logging
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import org.apache.kafka.common.record.{FileRecords, MemoryRecords}
+import 
org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.{Deserializer, 
UnknownRecordTypeException}
+import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, 
CoordinatorPlayback}
+import org.apache.kafka.server.util.KafkaScheduler
+import org.apache.kafka.storage.internals.log.FetchIsolation
+
+import java.nio.ByteBuffer
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.atomic.AtomicBoolean
+import scala.jdk.CollectionConverters._
+
+/**
+ * Coordinator loader which reads records from a partition and replays them
+ * to a group coordinator.
+ *
+ * @param replicaManager  The replica manager.
+ * @param deserializerThe deserializer to use.
+ * @param loadBufferSize  The load buffer size.
+ * @tparam T The record type.
+ */
+class CoordinatorLoaderImpl[T](
+  replicaManager: ReplicaManager,
+  deserializer: Deserializer[T],
+  loadBufferSize: Int
+) extends CoordinatorLoader[T] with Logging {
+  private val isRunning = new AtomicBoolean(true)
+  private val scheduler = new KafkaScheduler(1)
+  scheduler.startup()
+
+  /**
+   * Loads the coordinator by reading all the records from the TopicPartition
+   * and applying them to the Replayable object.
+   *
+   * @param tp  The TopicPartition to read from.
+   * @param coordinator The object to apply records to.
+   */
+  override def load(
+tp: TopicPartition,
+coordinator: CoordinatorPlayback[T]
+): CompletableFuture[Void] = {
+val future = new CompletableFuture[Void]()
+val result = scheduler.scheduleOnce(s"Load coordinator from $tp",
+  () => doLoad(tp, coordinator, future))

Review Comment:
   Before this is executed by the runtime, what happens if requests for a non-loaded partition come in? How do we check whether a partition is available or not?






[jira] [Created] (KAFKA-15112) Allow ignoring advertised listeners

2023-06-21 Thread Marcin Wisnicki (Jira)
Marcin Wisnicki created KAFKA-15112:
---

 Summary: Allow ignoring advertised listeners
 Key: KAFKA-15112
 URL: https://issues.apache.org/jira/browse/KAFKA-15112
 Project: Kafka
  Issue Type: Wish
  Components: clients
Reporter: Marcin Wisnicki


Add a property to ignore the advertised listeners returned by the broker and 
connect directly to the bootstrap server, and/or a property to fall back to the 
bootstrap server if the advertised host name is wrong.

This would greatly simplify local usage of dockerized Kafka for 
testing/PoC/development.

Currently, one has to pass ADVERTISED_HOST_NAME with the current IP when 
starting the container, and that's not always easy or possible (for example, 
if I start Docker Compose from a GUI).





[jira] [Commented] (KAFKA-15112) Allow ignoring advertised listeners

2023-06-21 Thread Mehari Beyene (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735930#comment-17735930
 ] 

Mehari Beyene commented on KAFKA-15112:
---

+1. This would also have a use case on the broker side.

If dynamically set advertised listeners are present, the Kafka server will use 
them instead of the default listeners defined in server.properties.

However, if end users unintentionally set incorrect advertised listeners for 
replication, it can result in the broker being bootstrapped with invalid 
listeners. This situation renders the cluster unusable and even causes 
replication to fail if the user has overridden the replication listeners as 
well.

Having a fallback mechanism to utilize the default listeners defined in the 
server.properties file would make the broker more resilient.

> Allow ignoring advertised listeners
> ---
>
> Key: KAFKA-15112
> URL: https://issues.apache.org/jira/browse/KAFKA-15112
> Project: Kafka
>  Issue Type: Wish
>  Components: clients
>Reporter: Marcin Wisnicki
>Priority: Major
>
> Add a property to ignore the advertised listeners returned by the broker and 
> connect directly to the bootstrap server, and/or a property to fall back to 
> the bootstrap server if the advertised host name is wrong.
> This would greatly simplify local usage of dockerized Kafka for 
> testing/PoC/development.
> Currently, one has to pass ADVERTISED_HOST_NAME with the current IP when 
> starting the container, and that's not always easy or possible (for example, 
> if I start Docker Compose from a GUI).





[jira] [Commented] (KAFKA-15053) Regression for security.protocol validation starting from 3.3.0

2023-06-21 Thread Bo Gao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735933#comment-17735933
 ] 

Bo Gao commented on KAFKA-15053:


Hi [~ChrisEgerton], quick question about the fix for this ticket: can we include 
it in a 3.3.x bug-fix release as well, or have we already stopped doing bug-fix 
releases for 3.3.x?

> Regression for security.protocol validation starting from 3.3.0
> ---
>
> Key: KAFKA-15053
> URL: https://issues.apache.org/jira/browse/KAFKA-15053
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.3.0
>Reporter: Bo Gao
>Assignee: Bo Gao
>Priority: Major
>
> [This|https://issues.apache.org/jira/browse/KAFKA-13793] Jira issue 
> introduced validations on multiple configs. As a consequence, the config 
> {{security.protocol}} now only allows upper-case values: PLAINTEXT, 
> SSL, SASL_PLAINTEXT, SASL_SSL. Before this change, lower-case values like 
> sasl_ssl and ssl were also supported; there is even case-insensitive logic 
> inside 
> [SecurityProtocol|https://github.com/apache/kafka/blob/146a6976aed0d9f90c70b6f21dca8b887cc34e71/clients/src/main/java/org/apache/kafka/common/security/auth/SecurityProtocol.java#L70-L73]
>  to handle lower-case values.
> I think we should treat this as a regression bug, since lower-case values are 
> no longer supported starting from 3.3.0. For 3.3.0 and later, we get an 
> error like this when using the lower-case value sasl_ssl:
> {{Invalid value sasl_ssl for configuration security.protocol: String must be 
> one of: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL}}
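A minimal sketch of the lenient behavior the report asks to restore, assuming a standalone validator rather than Kafka's `ConfigDef` machinery: the value is normalized with `toUpperCase(Locale.ROOT)` (locale-independent, so e.g. a Turkish default locale cannot change the result) before being compared with the allowed names, so `sasl_ssl` and `SASL_SSL` are both accepted. `Protocol` and `parseSecurityProtocol` are hypothetical names, not Kafka's `SecurityProtocol` API:

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.stream.Collectors;

final class SecurityProtocolValidation {
    private SecurityProtocolValidation() {}

    /** Hypothetical mirror of the four allowed security.protocol values. */
    enum Protocol { PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL }

    /** Accepts any casing; rejects unknown values with a message similar to the one in the report. */
    static Protocol parseSecurityProtocol(String value) {
        String normalized = value.toUpperCase(Locale.ROOT);
        for (Protocol p : Protocol.values()) {
            if (p.name().equals(normalized)) {
                return p;
            }
        }
        String allowed = Arrays.stream(Protocol.values())
                .map(Enum::name)
                .collect(Collectors.joining(", "));
        throw new IllegalArgumentException("Invalid value " + value
                + " for configuration security.protocol: String must be one of: " + allowed);
    }
}
```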




