Re: [PR] MINOR: Add 3.5.2 and 3.6.1 to system tests [kafka]

2024-01-12 Thread via GitHub


showuon commented on PR #14932:
URL: https://github.com/apache/kafka/pull/14932#issuecomment-1890349665

   I've backported this to the 3.7 branch and created a PR, 
https://github.com/apache/kafka/pull/15187, to add v3.5.2 to the 3.6 branch. 
Please help review. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: add v3.5.2 in 3.6 branch [kafka]

2024-01-12 Thread via GitHub


showuon opened a new pull request, #15187:
URL: https://github.com/apache/kafka/pull/15187

   add v3.5.2 in 3.6 branch
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2024-01-12 Thread via GitHub


iit2009060 commented on PR #15060:
URL: https://github.com/apache/kafka/pull/15060#issuecomment-1890348438

   @satishd  @kamalcph  @divijvaidya  I was able to reproduce the issue that 
@satishd mentioned. To do so, I had to introduce a delay in moving segments 
to remote storage through a hack in the code. 
   
   1. Created topic `test10`.
   2. Set `segment.bytes` to 100 so that each segment contains only one offset.
   3. Set the cleanup policy to compact.
   4. Produced some messages (1:1,1:2,1:3,1:4,1:5,1:6,1:7).
   5. Log compaction is in progress.
   6. Deleted the log compaction policy from the topic.
   8. Enabled remote storage (`remote.storage.enable=true`).
   
   I introduced code in the RemoteLogManager that copies only the segments whose 
base offset is <= 1 to remote storage, so that we get a situation where data is 
not available in the remote segments but is available in the local/active segment. 
   https://github.com/apache/kafka/assets/59436466/cc018078-801e-4850-9f1f-917ae6326202
   
   9. Segments 0 and 1 have been removed from the local log and moved to remote 
log storage, where their size is zero bytes because log compaction removed the 
data. Observe that the local segments contain data only for offsets >= 2. 
   **Local log segments view**
   https://github.com/apache/kafka/assets/59436466/6062b6f4-33ba-43dd-8495-2fb9bacff69e
   **Remote log segments view** 
   https://github.com/apache/kafka/assets/59436466/e91e0f5b-1dc4-4569-98e1-a32eebdbaf5c
   The remote log contains only two segments, 0 and 1, and both are empty because 
of log compaction. 
   10. Ran the console consumer:
   `bin/kafka-console-consumer.sh  --bootstrap-server localhost:9092 --topic 
test10  --offset 0 --partition 0 --property print.offset=true --property 
print.key=true --property print.value=true --consumer-property 
auto.offset.reset=earliest
   `
   This command never proceeds because RemoteLogManager returns empty records, and 
we do not yet have a mechanism to handle the case where RemoteLogManager returns 
empty records. 
   https://github.com/apache/kafka/assets/59436466/4051d50f-906e-4ee2-8475-a855f51909aa
   
   https://github.com/apache/kafka/assets/59436466/424015c0-c4d3-4b03-ab3c-2940c02a9ad9
   https://github.com/apache/kafka/assets/59436466/334d807a-3f4c-4b00-8990-fde48912b14e
   
   @satishd  @divijvaidya  @kamalcph  Let me know if you have a suggestion to 
fix this use case.
   
   





Re: [PR] MINOR: Add isRemoteLogEnabled parameter to the Log Loader Javadoc [kafka]

2024-01-12 Thread via GitHub


showuon merged PR #15179:
URL: https://github.com/apache/kafka/pull/15179





Re: [PR] MINOR: Add isRemoteLogEnabled parameter to the Log Loader Javadoc [kafka]

2024-01-12 Thread via GitHub


showuon commented on PR #15179:
URL: https://github.com/apache/kafka/pull/15179#issuecomment-1890343953

   Failed tests are unrelated.





Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]

2024-01-12 Thread via GitHub


kamalcph commented on code in PR #14766:
URL: https://github.com/apache/kafka/pull/14766#discussion_r1451300245


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,25 +1425,70 @@ class UnifiedLog(@volatile var logStartOffset: Long,
*/
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => 
Boolean,
 reason: SegmentDeletionReason): Int = {
-def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
-  val upperBoundOffset = 
nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset)
-
-  // Check not to delete segments which are not yet copied to tiered 
storage if remote log is enabled.
-  (!remoteLogEnabled() || (upperBoundOffset > 0 && upperBoundOffset - 1 <= 
highestOffsetInRemoteStorage)) &&
-// We don't delete segments with offsets at or beyond the high 
watermark to ensure that the log start
-// offset can never exceed it.
-highWatermark >= upperBoundOffset &&
-predicate(segment, nextSegmentOpt)
-}
 lock synchronized {
-  val deletable = localLog.deletableSegments(shouldDelete)
+  val deletable = deletableSegments(predicate)
   if (deletable.nonEmpty)
 deleteSegments(deletable, reason)
   else
 0
 }
   }
 
+  /**
+   * Find segments starting from the oldest until the user-supplied predicate 
is false.
+   * A final segment that is empty will never be returned.
+   *
+   * @param predicate A function that takes in a candidate log segment, the 
next higher segment
+   *  (if there is one). It returns true iff the segment is 
deletable.
+   * @return the segments ready to be deleted
+   */
+  private[log] def deletableSegments(predicate: (LogSegment, 
Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
+def isSegmentEligibleForDeletion(upperBoundOffset: Long): Boolean = {
+  // Segments are eligible for deletion when:
+  //1. they are uploaded to the remote storage
+  if (remoteLogEnabled()) {
+upperBoundOffset > 0 && upperBoundOffset - 1 <= 
highestOffsetInRemoteStorage
+  } else {
+true
+  }
+}
+
+if (localLog.segments.isEmpty) {
+  Seq.empty
+} else {
+  val deletable = ArrayBuffer.empty[LogSegment]
+  val segmentsIterator = localLog.segments.values.iterator
+  var segmentOpt = nextOption(segmentsIterator)
+  var shouldRoll = false
+  while (segmentOpt.isDefined) {
+val segment = segmentOpt.get
+val nextSegmentOpt = nextOption(segmentsIterator)
+val isLastSegmentAndEmpty = nextSegmentOpt.isEmpty && segment.size == 0
+val upperBoundOffset = if (nextSegmentOpt.nonEmpty) 
nextSegmentOpt.get.baseOffset() else logEndOffset
+// We don't delete segments with offsets at or beyond the high 
watermark to ensure that the log start
+// offset can never exceed it.
+val predicateResult = highWatermark >= upperBoundOffset && 
predicate(segment, nextSegmentOpt)
+
+// Roll the active segment when it breaches the configured retention 
policy. The rolled segment will be
+// eligible for deletion and gets removed in the next iteration.
+if (predicateResult && remoteLogEnabled() && nextSegmentOpt.isEmpty && 
segment.size > 0) {
+  shouldRoll = true

Review Comment:
   For a normal topic, we consider all the segments (including the active one) 
for deletion when retention is breached by size/time. When the number of 
deletable segments equals the number of segments that exist on the local disk, 
we `roll` a new segment and delete all the eligible segments in the same iteration. 
   
   
[UnifiedLog#deleteSegments](https://sourcegraph.com/github.com/apache/kafka@21227bda61e75e3a8f1401ff94b27e9161cd3f1b/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L1505-L1511)
   
   With remote storage enabled on the topic, once the active segment gets 
rolled, we have to upload it to remote storage successfully before 
considering it eligible for deletion, so we do this in two iterations.
   
   When the `kafka-log-retention` thread runs:
   
   Iteration-1: If the active segment has breached the local retention time/size, 
it rolls the segment to passive.
   Iteration-2: If the rolled passive segment has been uploaded to remote storage, 
it removes that segment from the local disk.
   
   The remote-retention-cleaner thread will remove the uploaded segment from 
remote storage when it breaches the complete retention-size/retention-time.
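
   The two-iteration behavior described above can be sketched as a small, 
self-contained Java model. This is only an illustration under simplifying 
assumptions: `TieredRetentionSketch`, `Segment`, and `retentionPass` are 
hypothetical names, not Kafka classes, and the high-watermark and 
segment-size checks from the diff are omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of local retention with remote storage enabled.
// It is not Kafka's UnifiedLog; it only mirrors the roll-then-delete ordering.
final class TieredRetentionSketch {
    static final class Segment {
        final long baseOffset;
        boolean breachesRetention; // local retention time/size has been breached
        boolean uploadedToRemote;  // segment has been copied to remote storage

        Segment(long baseOffset) {
            this.baseOffset = baseOffset;
        }
    }

    // Oldest first; the last element is always the active segment.
    final List<Segment> segments = new ArrayList<>();

    TieredRetentionSketch() {
        segments.add(new Segment(0L));
    }

    Segment active() {
        return segments.get(segments.size() - 1);
    }

    /** One pass of the retention check. Returns the number of local segments deleted. */
    int retentionPass() {
        int deleted = 0;
        // Passive segments are deleted oldest-first, but only once they are uploaded.
        while (segments.size() > 1
                && segments.get(0).breachesRetention
                && segments.get(0).uploadedToRemote) {
            segments.remove(0);
            deleted++;
        }
        // The active segment is never deleted directly: it is rolled so that it
        // can be uploaded and then removed by a later pass.
        if (active().breachesRetention) {
            segments.add(new Segment(active().baseOffset + 1));
        }
        return deleted;
    }

    public static void main(String[] args) {
        TieredRetentionSketch log = new TieredRetentionSketch();
        log.active().breachesRetention = true;
        int first = log.retentionPass();             // iteration 1: rolls, deletes nothing
        log.segments.get(0).uploadedToRemote = true; // simulate the remote upload
        int second = log.retentionPass();            // iteration 2: deletes the rolled segment
        System.out.println(first + " " + second + " " + log.segments.size());
    }
}
```

   In this model the first pass only rolls the breached active segment, and the 
deletion happens in the second pass, after the upload flag is set.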
   







Re: [PR] KAFKA-15942: Implement ConsumerInterceptor [kafka]

2024-01-12 Thread via GitHub


Joker-5 commented on PR #14963:
URL: https://github.com/apache/kafka/pull/14963#issuecomment-1890340469

   Hey @lucasbru, I saw that [this PR](https://github.com/apache/kafka/pull/15000) 
is ready to merge, but I don't see the `Co-authored-by` tag in the commit log.
   I'm a bit worried that it was forgotten, or maybe there's some reason for it. I 
would be extremely grateful if you could tell me about it when you have time.
   So sorry to trouble you.





Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-12 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1451284748


##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -1245,8 +1246,8 @@ private[log] class CleanedTransactionMetadata {
*
* @param abortedTransactions The new found aborted transactions to add
*/
-  def addAbortedTransactions(abortedTransactions: List[AbortedTxn]): Unit = {
-this.abortedTransactions ++= abortedTransactions
+  def addAbortedTransactions(abortedTransactions: util.List[AbortedTxn]): Unit 
= {
+abortedTransactions.forEach(abortedTxn => this.abortedTransactions += 
abortedTxn)

Review Comment:
   I wanted to avoid the Scala collection conversion.






Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-12 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1451284647


##
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##
@@ -3951,18 +3953,18 @@ class UnifiedLogTest {
 assertEquals(10, log.logSegments.size())
 
 {
-  val deletable = log.deletableSegments(
-(segment: LogSegment, _: Option[LogSegment]) => segment.baseOffset <= 
5)
-  val expected = log.nonActiveLogSegmentsFrom(0L).asScala.filter(segment 
=> segment.baseOffset <= 5).toList
-  assertEquals(6, expected.length)
-  assertEquals(expected, deletable.toList)
+  val deletable = new util.ArrayList(

Review Comment:
   This is not strictly needed, since the method returns an `ArrayList` in this 
test's success scenario, but the same method can also return an `EmptyList` 
instance in other cases. I wanted the test to fail with details about the 
elements rather than with a type mismatch.






Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-12 Thread via GitHub


satishd commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1451284145


##
core/src/main/scala/kafka/log/LogLoader.scala:
##
@@ -489,16 +488,16 @@ class LogLoader(
*
* @param segmentsToDelete The log segments to schedule for deletion
*/
-  private def removeAndDeleteSegmentsAsync(segmentsToDelete: 
Iterable[LogSegment]): Unit = {
-if (segmentsToDelete.nonEmpty) {
+  private def removeAndDeleteSegmentsAsync(segmentsToDelete: 
java.util.Collection[LogSegment]): Unit = {
+if (!segmentsToDelete.isEmpty) {
   // Most callers hold an iterator into the `params.segments` collection 
and
   // `removeAndDeleteSegmentAsync` mutates it by removing the deleted 
segment. Therefore,
   // we should force materialization of the iterator here, so that results 
of the iteration
   // remain valid and deterministic. We should also pass only the 
materialized view of the
   // iterator to the logic that deletes the segments.
-  val toDelete = segmentsToDelete.toList
-  info(s"Deleting segments as part of log recovery: 
${toDelete.mkString(",")}")
-  toDelete.foreach { segment =>
+  val toDelete = new util.ArrayList[LogSegment](segmentsToDelete)
+  info(s"Deleting segments as part of log recovery: 
${LocalLog.mkString(toDelete.iterator(), ",")}")

Review Comment:
   Thanks for letting me know about `Utils.join`, I was not aware of it.



##
core/src/test/scala/unit/kafka/log/LocalLogTest.scala:
##
@@ -362,8 +363,8 @@ class LocalLogTest {
 }
 assertEquals(5, log.segments.numberOfSegments)
 assertNotEquals(10L, log.segments.activeSegment.baseOffset)
-val expected = log.segments.values.asScala.toVector
-val deleted = log.truncateFullyAndStartAt(10L)
+val expected = new util.ArrayList(log.segments.values)
+val deleted = new util.ArrayList(log.truncateFullyAndStartAt(10L))

Review Comment:
   They return different types, such as Java's `ConcurrentSkipListMap.Values` and 
`ArrayList`, so the equality check would fail. I wrapped both in the same 
`ArrayList` type for their equality checks. 
   Similar checks are used elsewhere in LocalLogTest and UnifiedLogTest.
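
   The type mismatch mentioned above can be reproduced with plain JDK 
collections. The sketch below is illustrative only: the class and method names 
(`CollectionEqualitySketch`, `viewEqualsList`, `copiesEqual`) are hypothetical, 
and strings stand in for log segments. The values view of a 
`ConcurrentSkipListMap` does not override `equals`, so it is compared by 
identity, while two `ArrayList` copies are compared element by element.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical demo class; strings stand in for LogSegment instances.
final class CollectionEqualitySketch {
    static ConcurrentSkipListMap<Long, String> sampleSegments() {
        ConcurrentSkipListMap<Long, String> segments = new ConcurrentSkipListMap<>();
        segments.put(0L, "segment-0");
        segments.put(10L, "segment-10");
        return segments;
    }

    // Compares a collection view directly against a list.
    static boolean viewEqualsList(Collection<String> view, List<String> list) {
        return view.equals(list);
    }

    // Copies both sides into ArrayList first, so List.equals compares the elements.
    static boolean copiesEqual(Collection<String> a, Collection<String> b) {
        return new ArrayList<>(a).equals(new ArrayList<>(b));
    }

    public static void main(String[] args) {
        ConcurrentSkipListMap<Long, String> segments = sampleSegments();
        List<String> expected = new ArrayList<>(segments.values());
        System.out.println(viewEqualsList(segments.values(), expected));
        System.out.println(copiesEqual(segments.values(), expected));
    }
}
```

   The direct comparison is false even though the elements match, which is why 
wrapping both sides in the same list type makes the assertion meaningful.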






Re: [PR] KAFKA-16029: Fix "Unable to find FetchSessionHandler for node X" bug [kafka]

2024-01-12 Thread via GitHub


kirktrue closed pull request #15186: KAFKA-16029: Fix "Unable to find 
FetchSessionHandler for node X" bug
URL: https://github.com/apache/kafka/pull/15186





Re: [PR] KAFKA-14648: Moving bootstrap to NetworkClient Poll. [kafka]

2024-01-12 Thread via GitHub


github-actions[bot] commented on PR #14054:
URL: https://github.com/apache/kafka/pull/14054#issuecomment-1890281361

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] POC - DO NOT MERGE: Deprecation cleanup [kafka]

2024-01-12 Thread via GitHub


github-actions[bot] commented on PR #14551:
URL: https://github.com/apache/kafka/pull/14551#issuecomment-1890281139

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] KAFKA-14505; [6/N] Avoid recheduling callback in request thread [kafka]

2024-01-12 Thread via GitHub


artemlivshits commented on code in PR #15176:
URL: https://github.com/apache/kafka/pull/15176#discussion_r1451074358


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1090,38 +1090,29 @@ class ReplicaManager(val config: KafkaConfig,
* @param producerId  the producer id for the producer writing to the 
transaction
* @param producerEpoch   the epoch of the producer writing to the 
transaction
* @param baseSequencethe base sequence of the first record in the batch 
we are trying to append
-   * @param requestLocalcontainer for the stateful instances scoped to 
this request -- this must correspond to the
-   *thread calling this method
* @param callbackthe method to execute once the verification is 
either completed or returns an error
*
* When the verification returns, the callback will be supplied the error if 
it exists or Errors.NONE.
* If the verification guard exists, it will also be supplied. Otherwise the 
SENTINEL verification guard will be returned.
-   * This guard can not be used for verification and any appends that attenpt 
to use it will fail.
+   * This guard can not be used for verification and any appends that attempt 
to use it will fail.
*/
   def maybeStartTransactionVerificationForPartition(
 topicPartition: TopicPartition,
 transactionalId: String,
 producerId: Long,
 producerEpoch: Short,
 baseSequence: Int,
-requestLocal: RequestLocal,
-callback: (Errors, RequestLocal, VerificationGuard) => Unit
+callback: Either[Errors, VerificationGuard] => Unit
   ): Unit = {
-def generalizedCallback(preAppendErrors: Map[TopicPartition, Errors],
-newRequestLocal: RequestLocal,
-verificationGuards: Map[TopicPartition, 
VerificationGuard]): Unit = {
-  callback(
-preAppendErrors.getOrElse(topicPartition, Errors.NONE),
-newRequestLocal,
-verificationGuards.getOrElse(topicPartition, 
VerificationGuard.SENTINEL))
+def generalizedCallback(results: Map[TopicPartition, Either[Errors, 
VerificationGuard]]): Unit = {
+  callback(results.getOrElse(topicPartition, 
Right(VerificationGuard.SENTINEL)))

Review Comment:
   This logic is just a translation from the current implementation (so it's 
not introducing anything new), but is it expected that we don't get the results 
for the requested topicPartition?  Should we log a warning, so that we know 
that we're hitting some unexpected code path?
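
   One way to picture the suggestion above is the following Java sketch. It is 
an assumption-laden illustration, not the ReplicaManager code: 
`MissingResultSketch`, `resultFor`, and the string-valued `SENTINEL` are 
hypothetical stand-ins for the real types, and `java.util.logging` is used only 
to keep the example self-contained.

```java
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical sketch: strings stand in for TopicPartition and VerificationGuard.
final class MissingResultSketch {
    private static final Logger LOG = Logger.getLogger(MissingResultSketch.class.getName());
    static final String SENTINEL = "SENTINEL";

    // Returns the result for the partition, or the sentinel (with a warning)
    // when the results map unexpectedly has no entry for it.
    static String resultFor(Map<String, String> results, String topicPartition) {
        String result = results.get(topicPartition);
        if (result == null) {
            LOG.warning("No verification result for " + topicPartition
                    + "; falling back to " + SENTINEL);
            return SENTINEL;
        }
        return result;
    }
}
```

   With a warning on the fallback branch, hitting the unexpected path would at 
least be visible in the logs.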



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1090,38 +1090,29 @@ class ReplicaManager(val config: KafkaConfig,
* @param producerId  the producer id for the producer writing to the 
transaction
* @param producerEpoch   the epoch of the producer writing to the 
transaction
* @param baseSequencethe base sequence of the first record in the batch 
we are trying to append
-   * @param requestLocalcontainer for the stateful instances scoped to 
this request -- this must correspond to the
-   *thread calling this method
* @param callbackthe method to execute once the verification is 
either completed or returns an error
*
* When the verification returns, the callback will be supplied the error if 
it exists or Errors.NONE.
* If the verification guard exists, it will also be supplied. Otherwise the 
SENTINEL verification guard will be returned.
-   * This guard can not be used for verification and any appends that attenpt 
to use it will fail.
+   * This guard can not be used for verification and any appends that attempt 
to use it will fail.
*/
   def maybeStartTransactionVerificationForPartition(
 topicPartition: TopicPartition,
 transactionalId: String,
 producerId: Long,
 producerEpoch: Short,
 baseSequence: Int,
-requestLocal: RequestLocal,
-callback: (Errors, RequestLocal, VerificationGuard) => Unit
+callback: Either[Errors, VerificationGuard] => Unit
   ): Unit = {
-def generalizedCallback(preAppendErrors: Map[TopicPartition, Errors],
-newRequestLocal: RequestLocal,
-verificationGuards: Map[TopicPartition, 
VerificationGuard]): Unit = {
-  callback(
-preAppendErrors.getOrElse(topicPartition, Errors.NONE),
-newRequestLocal,
-verificationGuards.getOrElse(topicPartition, 
VerificationGuard.SENTINEL))
+def generalizedCallback(results: Map[TopicPartition, Either[Errors, 
VerificationGuard]]): Unit = {

Review Comment:
   I think we could do the translation from preAppendErrors, newRequestLocal, 
and verificationGuards here; then we'd avoid propagating the changes all the way 
to the replication layer.



##
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala:
##
@@ -935,8 +935,9 @@ private[group] class GroupCoordinator(
   

Re: [PR] KAFKA-15468 [1/2]: Prevent transaction coordinator reloads on already loaded leaders [kafka]

2024-01-12 Thread via GitHub


jsancio commented on code in PR #15139:
URL: https://github.com/apache/kafka/pull/15139#discussion_r1451075038


##
checkstyle/suppressions.xml:
##
@@ -315,7 +315,7 @@
 
 
+  
files="(PartitionRegistration|PartitionChangeBuilder|TopicDelta).java"/>

Review Comment:
   Yes. Let's use an annotation so that it only applies to the methods that 
violate our coding style.






Re: [PR] KAFKA-14957: Update-Description-String [kafka]

2024-01-12 Thread via GitHub


mjsax commented on code in PR #13909:
URL: https://github.com/apache/kafka/pull/13909#discussion_r1451069044


##
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:
##
@@ -1263,6 +1327,8 @@ protected String getConfigValue(ConfigKey key, String 
headerName) {
 return key.type.toString().toLowerCase(Locale.ROOT);
 case "Default":
 if (key.hasDefault()) {
+if (key.alternativeString != null)

Review Comment:
   Can we really add this here? `getConfigValue` might be used somewhere else, 
and callers might expect to get the actual config value back, never the 
html-string.
   
   I think we should ensure that the html-string is only returned on the call 
path of `toHtml`?



##
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:
##
@@ -1238,6 +1277,31 @@ public ConfigKey(String name, Type type, Object 
defaultValue, Validator validato
 this.displayName = displayName;
 this.recommender = recommender;
 this.internalConfig = internalConfig;
+this.overwrittenValue = null;
+}
+
+public ConfigKey(String name, Type type, Object defaultValue, 
Validator validator,

Review Comment:
   What I meant was to add a parameter `String alternativeString` to the 
existing constructor and pass in `null` from all existing callers that don't set 
it. This way, we can still keep the member variable final but avoid this second 
constructor, which is a lot of boilerplate code.
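
   The two review comments above can be sketched together in a few lines of 
Java. This is a hypothetical, stripped-down class, not the real 
`ConfigDef.ConfigKey`: `ConfigKeySketch`, `configValue`, and `defaultForDocs` 
are names invented for illustration, and only three of the real constructor's 
parameters are modeled.

```java
// Hypothetical, reduced stand-in for a config key; not Kafka's ConfigKey.
final class ConfigKeySketch {
    final String name;
    final Object defaultValue;
    // Documentation-only text; null when the caller does not set it.
    final String alternativeString;

    // A single constructor: existing callers would pass null for the new parameter,
    // so the field can stay final without a second constructor.
    ConfigKeySketch(String name, Object defaultValue, String alternativeString) {
        this.name = name;
        this.defaultValue = defaultValue;
        this.alternativeString = alternativeString;
    }

    // Callers that need the actual value always get the actual value.
    Object configValue() {
        return defaultValue;
    }

    // Only the documentation path substitutes the alternative text.
    String defaultForDocs() {
        return alternativeString != null ? alternativeString : String.valueOf(defaultValue);
    }
}
```

   Keeping the substitution in a documentation-only method means other callers 
never see the alternative string in place of the real default.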






[jira] [Updated] (KAFKA-16029) Investigate cause of "Unable to find FetchSessionHandler for node X" in logs

2024-01-12 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16029:
--
Description: 
From [~mjsax]:
{quote}Looking into AK unit/integration test logs for KS, I regularly see an 
ERROR log line that is triggered here:

[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java#L150-L151]

Given that it seems not to impact the test (it's not failing because of this), 
I am wondering why we log this at ERROR level or if it might be better to 
reduce to WARN? It seems to happen fairly frequently, but it also seems that 
it's nothing one would need to be concerned about, and thus using ERROR might 
be more alerting to end users than it needs to be? Thoughts?
{quote}
According to Matthias, running the {{EosIntegrationTest}} locally 
reproduces the log line.

This is also reproducible by running the Apache Kafka quickstart.

  was:
From [~mjsax]:
{quote}Looking into AK unit/integration test logs for KS, I regularly see an 
ERROR log line that is triggered here:

[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java#L150-L151]

Given that it seems not to impact the test (it's not failing because of this), 
I am wondering why we log this at ERROR level or if it might be better to 
reduce to WARN? It seems to happen fairly frequently, but it also seems that 
it's nothing one would need to be concerned about, and thus using ERROR might 
be more alerting to end users than it needs to be? Thoughts?
{quote}
According to Matthias, running the {{EosIntegrationTest}} locally 
reproduces the log line.


> Investigate cause of "Unable to find FetchSessionHandler for node X" in logs
> 
>
> Key: KAFKA-16029
> URL: https://issues.apache.org/jira/browse/KAFKA-16029
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> From [~mjsax]:
> {quote}Looking into AK unit/integration test logs for KS, I regularly see an 
> ERROR log line that is triggered here:
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java#L150-L151]
> Given that it seems not to impact the test (it's not failing because of 
> this), I am wondering why we log this at ERROR level or if it might be better 
> to reduce to WARN? It seems to happen fairly frequently, but it also seems 
> that it's nothing one would need to be concerned about, and thus using ERROR 
> might be more alerting to end users than it needs to be? Thoughts?
> {quote}
> According to Matthias, running the {{EosIntegrationTest}} locally 
> reproduces the log line.
> This is also reproducible by running the Apache Kafka quickstart.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14957: Update-Description-String [kafka]

2024-01-12 Thread via GitHub


mjsax commented on code in PR #13909:
URL: https://github.com/apache/kafka/pull/13909#discussion_r1381140128


##
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:
##
@@ -155,6 +155,29 @@ public ConfigDef define(String name, Type type, Object 
defaultValue, Validator v
 return define(new ConfigKey(name, type, defaultValue, validator, 
importance, documentation, group, orderInGroup, width, displayName, dependents, 
recommender, false));
 }
 
+/**
+ * Define a new configuration
+ * @param name  the name of the config parameter
+ * @param type  the type of the config
+ * @param defaultValue  the default value to use if this config isn't 
present
+ * @param validator the validator to use in checking the 
correctness of the config
+ * @param importancethe importance of this config
+ * @param documentation the documentation string for the config
+ * @param group the group this config belongs to
+ * @param orderInGroup  the order of this config in the group
+ * @param width the width of the config
+ * @param displayName   the name suitable for display
+ * @param dependentsthe configurations that are dependents of this 
configuration
+ * @param recommender   the recommender provides valid values given 
the parent configuration values
+ * @param overwrittenValue  the overwritten value of this configuration

Review Comment:
   > the overwritten value
   
   Not very descriptive IMHO. Can we explain it better?






Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-12 Thread via GitHub


apoorvmittal10 commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1451062702


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##
@@ -715,15 +716,22 @@ private Optional> 
createPushRequest(ClientTelemetrySubscription local
 }
 
 CompressionType compressionType = 
ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
-ByteBuffer buffer = ClientTelemetryUtils.compress(payload, 
compressionType);
+byte[] compressedPayload;
+try {
+compressedPayload = ClientTelemetryUtils.compress(payload, 
compressionType);
+} catch (IOException e) {
+log.info("Failed to compress telemetry payload for 
compression: {}, sending uncompressed data", compressionType);

Review Comment:
   My understanding is that clients generally log such issues at INFO; 
otherwise customers open escalation issues when they see logs at WARN or 
ERROR. Maybe @philipnee @kirktrue can better address this.
   
   I don't think there are other scenarios where we fall back to uncompressed 
data, but with client metrics the compression type is not defined by the user; 
it is sent by the server and picked automatically by the client.
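
The fallback discussed in this thread can be sketched roughly as follows. This is a minimal illustration using only JDK classes, not the code from the PR: the `Compressor` interface, the `Encoded` holder, and the class name are hypothetical, GZIP stands in for whichever compression type the broker prefers, and `System.out.println` stands in for the INFO-level log call.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

final class TelemetryCompressionFallback {

    /** Hypothetical compressor abstraction; not part of the Kafka API. */
    @FunctionalInterface
    interface Compressor {
        byte[] compress(byte[] raw) throws IOException;
    }

    /** The bytes to send plus a flag saying whether compression was applied. */
    static final class Encoded {
        final byte[] bytes;
        final boolean compressed;

        Encoded(byte[] bytes, boolean compressed) {
            this.bytes = bytes;
            this.compressed = compressed;
        }
    }

    /** GZIP via the JDK; closing the stream writes the GZIP trailer. */
    static final Compressor GZIP = raw -> {
        ByteArrayOutputStream sink = new ByteArrayOutputStream(512);
        try (OutputStream out = new GZIPOutputStream(sink)) {
            out.write(raw);
        }
        return sink.toByteArray();
    };

    /** Tries to compress; on IOException reports at INFO and returns the raw payload. */
    static Encoded encode(byte[] payload, Compressor compressor) {
        try {
            return new Encoded(compressor.compress(payload), true);
        } catch (IOException e) {
            // Stand-in for log.info(...): the payload is still sent, just uncompressed.
            System.out.println("INFO Failed to compress telemetry payload, "
                + "sending uncompressed data: " + e.getMessage());
            return new Encoded(payload, false);
        }
    }

    public static void main(String[] args) {
        Encoded ok = encode("metrics".getBytes(), GZIP);
        Encoded fallback = encode("metrics".getBytes(), raw -> {
            throw new IOException("simulated failure");
        });
        System.out.println(ok.compressed + " " + fallback.compressed);
    }
}
```

The caller needs to know which path was taken so that the request can advertise the compression type that was actually applied, which is why the sketch returns a flag alongside the bytes.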





Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-12 Thread via GitHub


mjsax commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1451046445


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##
@@ -715,15 +716,22 @@ private Optional> 
createPushRequest(ClientTelemetrySubscription local
 }
 
 CompressionType compressionType = 
ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
-ByteBuffer buffer = ClientTelemetryUtils.compress(payload, 
compressionType);
+byte[] compressedPayload;
+try {
+compressedPayload = ClientTelemetryUtils.compress(payload, 
compressionType);
+} catch (IOException e) {
+log.info("Failed to compress telemetry payload for 
compression: {}, sending uncompressed data", compressionType);

Review Comment:
   Should this be WARN-level (or maybe even ERROR)? Do we have existing 
similar cases elsewhere in the client where we fall back to uncompressed?





Re: [PR] MINOR: Add 3.5.2 and 3.6.1 to system tests [kafka]

2024-01-12 Thread via GitHub


mjsax commented on PR #14932:
URL: https://github.com/apache/kafka/pull/14932#issuecomment-1890192134

   @mimaison, should this be cherry-picked to the 3.7 branch, with the bump to 
`3.5.2` split out for the `3.6` branch?




Re: [PR] KAFKA-15594: Add version 3.6 to Kafka Streams system tests [kafka]

2024-01-12 Thread via GitHub


mjsax commented on PR #15151:
URL: https://github.com/apache/kafka/pull/15151#issuecomment-1890190571

   A few tests failed: `streams_standby_replica_test` (both runs) and multiple 
`streams_cooperative_rebalance_upgrade_test` runs (0.11, 1.0, 1.1, 2.2, 
2.3), plus:
   
   ```
   Module: kafkatest.tests.streams.streams_upgrade_test
   Class:  StreamsUpgradeTest
   Method: test_rolling_upgrade_with_2_bounces
   Arguments:
   {
     "from_version": "3.5.1",
     "to_version": "3.8.0-SNAPSHOT"
   }
   ```
   and
   ```
   Module: kafkatest.tests.streams.streams_upgrade_test
   Class:  StreamsUpgradeTest
   Method: test_rolling_upgrade_with_2_bounces
   Arguments:
   {
     "from_version": "3.6.0",
     "to_version": "3.8.0-SNAPSHOT"
   }
   ```
   
   Re-trigger to check for flakiness: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/6030/
   
   




[jira] [Comment Edited] (KAFKA-16029) Investigate cause of "Unable to find FetchSessionHandler for node X" in logs

2024-01-12 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806236#comment-17806236
 ] 

Kirk True edited comment on KAFKA-16029 at 1/13/24 12:41 AM:
-

Based on input from [~ChrisEgerton], I was also able to reproduce this by 
running the Kafka [quickstart|http://example.com] against [3.7.0 RC 
2|https://home.apache.org/~stanislavkozlovski/kafka-3.7.0-rc2/kafka_2.13-3.7.0.tgz].
 When I run the quickstart using my branch with the proposed fix, I am not able 
to reproduce it.


was (Author: kirktrue):
Based on input from [~ChrisEgerton], I was also able to reproduce this by 
running the Kafka 
[quickstart|[http://example.com|https://kafka.apache.org/quickstart]] against 
[3.7.0 RC 
2|[https://home.apache.org/~stanislavkozlovski/kafka-3.7.0-rc2/kafka_2.13-3.7.0.tgz].]
 When I run the quickstart using my branch with the proposed fix, I am not able 
to reproduce it.

> Investigate cause of "Unable to find FetchSessionHandler for node X" in logs
> 
>
> Key: KAFKA-16029
> URL: https://issues.apache.org/jira/browse/KAFKA-16029
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> From [~mjsax]:
> {quote}Looking into AK unit/integration test logs for KS, I regularly see an 
> ERROR log line that is triggered here:
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java#L150-L151]
> Given that it seems not to impact the test (it's not failing because of 
> this), I am wondering why we log this at ERROR level or if it might be better 
> to reduce to WARN? It seems to happen fairly frequently, but it also seems 
> that it's nothing one would need to be concerned about, and thus using ERROR 
> might be more alerting to end users than it needs to be? Thoughts?
> {quote}
> According to Matthias, running the {{EosIntegrationTest}} locally 
> reproduces the log line.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16029) Investigate cause of "Unable to find FetchSessionHandler for node X" in logs

2024-01-12 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806236#comment-17806236
 ] 

Kirk True commented on KAFKA-16029:
---

Based on input from [~ChrisEgerton], I was also able to reproduce this by 
running the Kafka 
[quickstart|[http://example.com|https://kafka.apache.org/quickstart]] against 
[3.7.0 RC 
2|[https://home.apache.org/~stanislavkozlovski/kafka-3.7.0-rc2/kafka_2.13-3.7.0.tgz].]
 When I run the quickstart using my branch with the proposed fix, I am not able 
to reproduce it.



[jira] [Updated] (KAFKA-16029) Investigate cause of "Unable to find FetchSessionHandler for node X" in logs

2024-01-12 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16029:
--
Issue Type: Bug  (was: Task)



Re: [PR] KAFKA-15468 [1/2]: Prevent transaction coordinator reloads on already loaded leaders [kafka]

2024-01-12 Thread via GitHub


jolshan commented on code in PR #15139:
URL: https://github.com/apache/kafka/pull/15139#discussion_r1451051792


##
checkstyle/suppressions.xml:
##
@@ -315,7 +315,7 @@
 
 
+  
files="(PartitionRegistration|PartitionChangeBuilder|TopicDelta).java"/>

Review Comment:
   Between when I originally wrote this and now, a lot of JBOD code was 
added. I don't think I have the skills to make it simpler, and I don't think 
there is a way to change my code. I can use the annotation if you think that 
keeps the suppression more contained.





Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-12 Thread via GitHub


mjsax commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1446041813


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -175,16 +182,42 @@ public static boolean 
validateRequiredResourceLabels(Map metadat
 }
 
 public static CompressionType 
preferredCompressionType(List acceptedCompressionTypes) {
-// TODO: Support compression in client telemetry.
+if (acceptedCompressionTypes != null && 
!acceptedCompressionTypes.isEmpty()) {
+// Broker is providing the compression types in order of 
preference. Grab the
+// first one.
+return acceptedCompressionTypes.get(0);
+}
 return CompressionType.NONE;
 }
 
 public static ByteBuffer compress(byte[] raw, CompressionType 
compressionType) {
-// TODO: Support compression in client telemetry.
-if (compressionType == CompressionType.NONE) {
-return ByteBuffer.wrap(raw);
-} else {
-throw new UnsupportedOperationException("Compression is not 
supported");
+try (ByteBufferOutputStream compressedOut = new 
ByteBufferOutputStream(512)) {
+try (OutputStream out = 
compressionType.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) {
+out.write(raw);
+out.flush();
+}
+compressedOut.buffer().flip();
+return ByteBuffer.wrap(Utils.toArray(compressedOut.buffer()));
+} catch (IOException e) {
+throw new KafkaException("Failed to compress metrics data", e);

Review Comment:
   Is it intentional to crash for this case? Or should we send data 
uncompressed if anything goes wrong?
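
For reference, the selection rule in the hunk above (take the broker's first accepted type, otherwise no compression) can be sketched in isolation. This is only an illustration: the `Codec` enum is a hypothetical stand-in for Kafka's `CompressionType`, and the class name is made up.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class PreferredCompression {

    /** Hypothetical stand-in for Kafka's CompressionType. */
    enum Codec { NONE, GZIP, SNAPPY, LZ4, ZSTD }

    /**
     * The broker lists accepted types in order of preference, so the first
     * entry wins; a null or empty list means no compression.
     */
    static Codec preferred(List<Codec> accepted) {
        if (accepted != null && !accepted.isEmpty()) {
            return accepted.get(0);
        }
        return Codec.NONE;
    }

    public static void main(String[] args) {
        System.out.println(preferred(Arrays.asList(Codec.ZSTD, Codec.GZIP)));
        System.out.println(preferred(Collections.<Codec>emptyList()));
    }
}
```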





Re: [PR] [Kafka-14404] fix overlap of streams-config sections & describe additional parameters [kafka]

2024-01-12 Thread via GitHub


mjsax commented on code in PR #15162:
URL: https://github.com/apache/kafka/pull/15162#discussion_r1451029920


##
docs/streams/developer-guide/config-streams.html:
##
@@ -409,12 +409,12 @@ num.standby.replicasThe replication factor for changelog topics and 
repartition topics created by the application.
   The default of -1 (meaning: use broker default 
replication factor) requires broker version 2.4 or newer.
--1
+-1
   
   retry.backoff.ms
-  Medium
-  The amount of time in milliseconds, before a 
request is retried. This applies if the retries parameter is configured to be greater than 0. 

-  100 milliseconds
+Medium
+The amount of time in milliseconds, before a 
request is retried. This applies if the retries parameter is configured to be greater than 0. 

+100 milliseconds

Review Comment:
   ```suggestion
   100
   ```



##
docs/streams/developer-guide/config-streams.html:
##
@@ -,28 +,31 @@ Default 
ValuesConsumer
   earliest
 
-linger.ms
+linger.ms
   Producer
   100
 
 max.poll.records
   Consumer
   1000
 
+
+  transaction.timeout.ms
+  Producer
+  1
+
+
+  client.id
+  -
+  application.id-random-UUID

Review Comment:
   That's not correct. And it's more complicated overall...
   
   By default, i.e., if `client.id` is not provided by the user, we set an 
internal `clientId = applicationId + "-" + processId` (with processId being a 
UUID), i.e., what you describe.
   
   However, for different clients (consumer/producer/admin) we generate more 
complex ones:
   - main consumer (`client.id` per thread):
 `clientId + "-StreamThread-" + threadIdx + "-consumer"`
   - restore consumer (`client.id` per thread):
 `clientId + "-StreamThread-" + threadIdx + "-restore-consumer"`
   - global consumer:
 `clientId + "-global-consumer"`
   - producer (alos + eos-beta; `client.id` per thread):
 `clientId + "-StreamThread-" + threadIdx + "-producer"`
   - producer (eos-alpha; `client.id` per task):
 `clientId + "-StreamThread-" + threadIdx + "-" + taskId + "-producer"`
   - admin:
 `clientId + "-admin"`
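
The naming scheme listed above can be written out as a small helper. This is a sketch for illustration only, not the Kafka Streams code: the class and method names are hypothetical, and it assumes that a user-supplied `client.id` is used as the base `clientId` unchanged.

```java
import java.util.UUID;

final class StreamsClientIds {

    /** Base id: the user's client.id if set (assumption), else applicationId-processId. */
    static String baseClientId(String userClientId, String applicationId, UUID processId) {
        if (userClientId == null || userClientId.isEmpty()) {
            return applicationId + "-" + processId;
        }
        return userClientId;
    }

    static String mainConsumer(String clientId, int threadIdx) {
        return clientId + "-StreamThread-" + threadIdx + "-consumer";
    }

    static String restoreConsumer(String clientId, int threadIdx) {
        return clientId + "-StreamThread-" + threadIdx + "-restore-consumer";
    }

    static String globalConsumer(String clientId) {
        return clientId + "-global-consumer";
    }

    /** Producer per thread (alos and eos-beta in the list above). */
    static String threadProducer(String clientId, int threadIdx) {
        return clientId + "-StreamThread-" + threadIdx + "-producer";
    }

    /** Producer per task (eos-alpha in the list above). */
    static String taskProducer(String clientId, int threadIdx, String taskId) {
        return clientId + "-StreamThread-" + threadIdx + "-" + taskId + "-producer";
    }

    static String admin(String clientId) {
        return clientId + "-admin";
    }

    public static void main(String[] args) {
        String base = baseClientId("", "my-app", UUID.randomUUID());
        System.out.println(mainConsumer(base, 1));
        System.out.println(admin(base));
    }
}
```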
   



##
docs/streams/developer-guide/config-streams.html:
##
@@ -409,12 +409,12 @@ num.standby.replicasThe replication factor for changelog topics and 
repartition topics created by the application.
   The default of -1 (meaning: use broker default 
replication factor) requires broker version 2.4 or newer.
--1
+-1
   
   retry.backoff.ms
-  Medium
-  The amount of time in milliseconds, before a 
request is retried. This applies if the retries parameter is configured to be greater than 0. 

-  100 milliseconds
+Medium
+The amount of time in milliseconds, before a 
request is retried. This applies if the retries parameter is configured to be greater than 0. 

+100 milliseconds

Review Comment:
   The description already says:
   > The amount of time in milliseconds



##
docs/streams/developer-guide/config-streams.html:
##
@@ -,28 +,31 @@ Default 
ValuesConsumer

Review Comment:
   Just saw the following above:
   ```
   For detailed descriptions
   of these configs, see http://kafka.apache.org/0100/documentation.html#producerconfigs;>Producer 
Configs
   and http://kafka.apache.org/0100/documentation.html#newconsumerconfigs;>Consumer
 Configs.
   ```
   Still links to 0.10.0... Can we fix it?



##
docs/streams/developer-guide/config-streams.html:
##
@@ -,28 +,31 @@ Default 
ValuesConsumer
   earliest
 
-linger.ms
+linger.ms
   Producer
   100
 
 max.poll.records
   Consumer
   1000
 
+
+  transaction.timeout.ms

Review Comment:
   We set this only if EOS is enabled. Might be worth calling out?
   
   We also set `delivery.timeout.ms` to "max integer" and enable idempotency if 
EOS is enabled.
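
The EOS-only overrides mentioned here could be sketched roughly as follows. This is a minimal illustration with plain string keys and a `Map`, not the Kafka Streams implementation: the class and method names are hypothetical, `enable.idempotence` is assumed to be the producer key meant by "enable idempotency", and the transaction timeout is passed in as a parameter rather than hard-coded.

```java
import java.util.HashMap;
import java.util.Map;

final class EosProducerOverrides {

    /** Returns the producer settings applied only when EOS is enabled. */
    static Map<String, Object> overrides(boolean eosEnabled, int transactionTimeoutMs) {
        Map<String, Object> props = new HashMap<>();
        if (eosEnabled) {
            props.put("transaction.timeout.ms", transactionTimeoutMs);
            // "max integer" in the comment above.
            props.put("delivery.timeout.ms", Integer.MAX_VALUE);
            props.put("enable.idempotence", true);
        }
        return props;
    }

    public static void main(String[] args) {
        System.out.println(overrides(true, 10_000));
        System.out.println(overrides(false, 10_000));
    }
}
```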



##
docs/streams/developer-guide/config-streams.html:
##
@@ -,28 +,31 @@ Default 
ValuesConsumer
   earliest
 
-linger.ms
+linger.ms
   Producer
   100
 
 max.poll.records
   Consumer
   1000
 
+
+  transaction.timeout.ms
+  Producer
+  1
+
+
+  client.id
+  -
+  application.id-random-UUID
+
 
   
 
 
   Parameters controlled by Kafka 
Streams
-  Kafka Streams assigns the following configuration parameters. If you 
try to change
-

Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-12 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1451042152


##
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##
@@ -155,72 +153,78 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
* @param upperIndex  The upper limit of the index of the 
partitions to be included in the result.
*Note that, the upper index can be 
larger than the largest partition index in
*this topic.
-   * @returnA collection of topic partition 
metadata and whether there are more partitions.
+   * @returnA collection of topic partition 
metadata and next partition index (-1 means
+   *no next partition).
*/
   private def getPartitionMetadataForDescribeTopicResponse(
 image: MetadataImage,
 topicName: String,
 listenerName: ListenerName,
 startIndex: Int,
-upperIndex: Int
-  ): (Option[List[DescribeTopicPartitionsResponsePartition]], Boolean) = {
+maxCount: Int
+  ): (Option[List[DescribeTopicPartitionsResponsePartition]], Int) = {
 Option(image.topics().getTopic(topicName)) match {
-  case None => (None, false)
+  case None => (None, -1)
   case Some(topic) => {
 val result = new ListBuffer[DescribeTopicPartitionsResponsePartition]()
-val endIndex = upperIndex.min(topic.partitions().size())
-for (partitionId <- startIndex until endIndex) {
-  val partition = topic.partitions().get(partitionId)
-  val filteredReplicas = maybeFilterAliveReplicas(image, 
partition.replicas,
-listenerName, false)
-  val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, 
listenerName,
-false)
-  val offlineReplicas = getOfflineReplicas(image, partition, 
listenerName)
-  val maybeLeader = getAliveEndpoint(image, partition.leader, 
listenerName)
-  maybeLeader match {
-case None =>
-  val error = if 
(!image.cluster().brokers.containsKey(partition.leader)) {
-debug(s"Error while fetching metadata for 
$topicName-$partitionId: leader not available")
-Errors.LEADER_NOT_AVAILABLE
-  } else {
-debug(s"Error while fetching metadata for 
$topicName-$partitionId: listener $listenerName " +
-  s"not found on leader ${partition.leader}")
-Errors.LISTENER_NOT_FOUND
-  }
-  result.addOne(new DescribeTopicPartitionsResponsePartition()
-.setErrorCode(error.code)
-.setPartitionIndex(partitionId)
-.setLeaderId(MetadataResponse.NO_LEADER_ID)
-.setLeaderEpoch(partition.leaderEpoch)
-.setReplicaNodes(filteredReplicas)
-.setIsrNodes(filteredIsr)
-.setOfflineReplicas(offlineReplicas))
-case Some(leader) =>
-  val error = if (filteredReplicas.size < 
partition.replicas.length) {
-debug(s"Error while fetching metadata for 
$topicName-$partitionId: replica information not available for " +
-  s"following brokers 
${partition.replicas.filterNot(filteredReplicas.contains).mkString(",")}")
-Errors.REPLICA_NOT_AVAILABLE
-  } else if (filteredIsr.size < partition.isr.length) {
-debug(s"Error while fetching metadata for 
$topicName-$partitionId: in sync replica information not available for " +
-  s"following brokers 
${partition.isr.filterNot(filteredIsr.contains).mkString(",")}")
-Errors.REPLICA_NOT_AVAILABLE
-  } else {
-Errors.NONE
-  }
-
-  result.addOne(new DescribeTopicPartitionsResponsePartition()
-.setErrorCode(error.code)
-.setPartitionIndex(partitionId)
-.setLeaderId(leader.id())
-.setLeaderEpoch(partition.leaderEpoch)
-.setReplicaNodes(filteredReplicas)
-.setIsrNodes(filteredIsr)
-.setOfflineReplicas(offlineReplicas)
-.setEligibleLeaderReplicas(Replicas.toList(partition.elr))
-.setLastKnownElr(Replicas.toList(partition.lastKnownElr)))
+// The partition id may not be consecutive.
+val partitions = 
topic.partitions().keySet().stream().sorted().iterator()

Review Comment:
   I am not sure I get it. The partition IDs can be arbitrary, as in the unit 
test cases, and I don't have a simple O(n) solution with no extra space off the 
top of my head. Quickselect might do the trick, but Java has no built-in 
implementation of it.
   Instead, I use a tree set to maintain the top K smallest partitions larger 
than the start 
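
A bounded `TreeSet` of the kind described here might look like the sketch below. It is an illustration only, not the code from the PR: the class and method names are hypothetical, and it assumes the start index is inclusive, which the comment does not confirm.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.TreeSet;

final class SmallestPartitionIds {

    /**
     * Returns up to maxCount partition ids that are >= startIndex, in ascending
     * order. The TreeSet never holds more than maxCount + 1 entries, so the cost
     * is O(n log k) time and O(k) extra space for k = maxCount.
     */
    static List<Integer> smallestAtOrAfter(Collection<Integer> partitionIds, int startIndex, int maxCount) {
        TreeSet<Integer> kept = new TreeSet<>();
        if (maxCount <= 0) {
            return new ArrayList<>();
        }
        for (int id : partitionIds) {
            if (id < startIndex) {
                continue;
            }
            kept.add(id);
            if (kept.size() > maxCount) {
                kept.pollLast(); // drop the current largest to stay within maxCount
            }
        }
        return new ArrayList<>(kept);
    }

    public static void main(String[] args) {
        System.out.println(smallestAtOrAfter(Arrays.asList(7, 3, 9, 1, 5), 3, 2));
    }
}
```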

Re: [PR] KAFKA-15468 [1/2]: Prevent transaction coordinator reloads on already loaded leaders [kafka]

2024-01-12 Thread via GitHub


jsancio commented on code in PR #15139:
URL: https://github.com/apache/kafka/pull/15139#discussion_r1451037707


##
checkstyle/suppressions.xml:
##
@@ -315,7 +315,7 @@
 
 
+  
files="(PartitionRegistration|PartitionChangeBuilder|TopicDelta).java"/>

Review Comment:
   Did you try changing the code so that it no longer triggers the path 
complexity warning? If that is not possible, can you use annotations instead? 
I added annotation support a while back. Check out: 
https://lists.apache.org/thread/mjxfofn823o1ozrzo36ogj17xmlrxod9
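
An annotation-based suppression of the kind suggested here might look like the sketch below. It assumes the check in question is Checkstyle's `NPathComplexity` and that the build's Checkstyle configuration honors `@SuppressWarnings`; the class and method are hypothetical and only show where the annotation goes.

```java
final class PartitionChangeSketch {

    // Assumption: the suppressed check is NPathComplexity. Scoping the
    // suppression to one method keeps it narrower than a file-level entry
    // in suppressions.xml.
    @SuppressWarnings("NPathComplexity")
    static String describe(boolean leaderChanged, boolean isrChanged, boolean elrChanged) {
        StringBuilder sb = new StringBuilder();
        if (leaderChanged) {
            sb.append("leader ");
        }
        if (isrChanged) {
            sb.append("isr ");
        }
        if (elrChanged) {
            sb.append("elr ");
        }
        return sb.length() == 0 ? "unchanged" : sb.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(describe(true, false, true));
    }
}
```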





[PR] KAFKA-16029: Fix "Unable to find FetchSessionHandler for node X" bug [kafka]

2024-01-12 Thread via GitHub


kirktrue opened a new pull request, #15186:
URL: https://github.com/apache/kafka/pull/15186

   This change restores the previous behavior for the `Fetcher` to keep its 
internal `sessionHandlers` cache even after `close()` is invoked.
   
   During the `close()` of a `Consumer`, the `Fetcher` attempts to send out 
`FetchRequest`s to signal to the relevant brokers that the consumer is closing. 
This allows the broker to clean up relevant resources on its end.
   
   The `Fetcher` was recently changed (3.5) to clear its `sessionHandlers` 
cache immediately after it finished creating the final `FetchRequest`s. The 
reasoning was that since the `Fetcher` is being closed, those session handlers 
were no longer needed, and thus the cache could safely be cleared.
   
   However, it is evidently possible that the `Fetcher`'s response handler can 
still be invoked if a response for an in-flight `FetchRequest` is received 
shortly after that cache was cleared. (There doesn't appear to be logic in the 
`Fetcher` that blocks response handling after it is closed.) In this case, once 
the `Fetcher`'s response handler determines it cannot find the 
`FetchSessionHandler` in the `sessionHandlers` cache, it logs the error and 
aborts processing.
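
The race described above can be modeled with a toy class. This is a simplified sketch, not the `Fetcher` itself: the class, its fields, and the `clearCacheOnClose` switch are hypothetical and exist only to contrast the two behaviors, and `System.out.println` stands in for the ERROR log.

```java
import java.util.HashMap;
import java.util.Map;

final class FetcherCloseSketch {

    /** Hypothetical stand-in for a per-node FetchSessionHandler. */
    static final class SessionHandler {
        final int node;
        int responsesHandled;

        SessionHandler(int node) {
            this.node = node;
        }
    }

    private final Map<Integer, SessionHandler> sessionHandlers = new HashMap<>();
    private final boolean clearCacheOnClose;

    FetcherCloseSketch(boolean clearCacheOnClose) {
        this.clearCacheOnClose = clearCacheOnClose;
    }

    /** Registers a session handler for the node, as sending a fetch would. */
    void sendFetch(int node) {
        sessionHandlers.computeIfAbsent(node, n -> new SessionHandler(n));
    }

    /** The final close-time requests would be created here, before any clearing. */
    void close() {
        if (clearCacheOnClose) {
            sessionHandlers.clear();
        }
    }

    /** Returns true if the response was processed, false if the handler was gone. */
    boolean onResponse(int node) {
        SessionHandler handler = sessionHandlers.get(node);
        if (handler == null) {
            System.out.println("ERROR Unable to find FetchSessionHandler for node " + node);
            return false;
        }
        handler.responsesHandled++;
        return true;
    }

    public static void main(String[] args) {
        FetcherCloseSketch clearing = new FetcherCloseSketch(true);
        clearing.sendFetch(1);
        clearing.close();
        System.out.println(clearing.onResponse(1)); // late response after close
    }
}
```

Keeping the cache after `close()` means a late response still finds its handler, which is the behavior this change restores.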
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-12 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1451037879


##
core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java:
##
@@ -103,14 +102,20 @@ public DescribeTopicPartitionsResponseData 
handleDescribeTopicPartitionsRequest(
 );
 }
 return isAuthorized;
-}).sorted().iterator();
+}).sorted();
+
+// Reset the first partition index if the cursor topic is missing from 
the authorized topic list.
+int firstPartitionId = !cursorTopicName.isEmpty() && 
authHelper.authorize(
+abstractRequest.context(), DESCRIBE, TOPIC, cursorTopicName, true, 
true, 1)
+? cursor.partitionIndex() : 0;

Review Comment:
   Good point. Previously I did not find an easy way to get the first element 
of a stream/iterator, so I verified the cursor topic again. Now I pass a 
functor that returns the start index for each topic; it only returns a non-zero 
value for the cursor topic.
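
Such a functor could be sketched as follows. This is an illustration under assumptions, not the code in the PR: the class and method names are hypothetical, and an empty cursor topic name is taken to mean that no cursor was supplied.

```java
import java.util.function.ToIntFunction;

final class CursorStartIndex {

    /**
     * Builds a function from topic name to first partition index: the cursor's
     * partition index for the cursor topic, 0 for every other topic.
     */
    static ToIntFunction<String> startIndexFor(String cursorTopicName, int cursorPartitionIndex) {
        return topicName -> topicName.equals(cursorTopicName) ? cursorPartitionIndex : 0;
    }

    public static void main(String[] args) {
        ToIntFunction<String> startIndex = startIndexFor("orders", 7);
        System.out.println(startIndex.applyAsInt("orders"));
        System.out.println(startIndex.applyAsInt("payments"));
    }
}
```

If the cursor topic was filtered out of the authorized list, the function is simply never called with that name, so no separate authorization check is needed to reset the index.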





Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-12 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1451036959


##
core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java:
##
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.handlers;
+
+import kafka.network.RequestChannel;
+import kafka.server.AuthHelper;
+import kafka.server.KafkaConfig;
+import kafka.server.MetadataCache;
+import kafka.server.metadata.KRaftMetadataCache;
+import kafka.server.metadata.ZkMetadataCache;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest;
+import org.apache.kafka.common.resource.Resource;
+import scala.collection.JavaConverters;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.resource.ResourceType.TOPIC;
+
+public class DescribeTopicPartitionsRequestHandler {
+MetadataCache metadataCache;
+AuthHelper authHelper;
+KafkaConfig config;
+
+public DescribeTopicPartitionsRequestHandler(
+MetadataCache metadataCache,
+AuthHelper authHelper,
+KafkaConfig config
+) {
+this.metadataCache = metadataCache;
+this.authHelper = authHelper;
+this.config = config;
+}
+
+public DescribeTopicPartitionsResponseData 
handleDescribeTopicPartitionsRequest(RequestChannel.Request abstractRequest) {
+if (metadataCache instanceof ZkMetadataCache) {
+throw new InvalidRequestException("ZK cluster does not handle 
DescribeTopicPartitions request");
+}
+KRaftMetadataCache kRaftMetadataCache = (KRaftMetadataCache) 
metadataCache;
+
+DescribeTopicPartitionsRequestData request = 
((DescribeTopicPartitionsRequest) abstractRequest.loggableRequest()).data();
+Set<String> topics = new HashSet<>();
+boolean fetchAllTopics = request.topics().isEmpty();
+DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor();
+String cursorTopicName = cursor != null ? cursor.topicName() : "";
+if (fetchAllTopics) {
+
JavaConverters.asJava(kRaftMetadataCache.getAllTopics()).forEach(topicName -> {
+if (topicName.compareTo(cursorTopicName) >= 0) {
+topics.add(topicName);
+}
+});
+} else {
+request.topics().forEach(topic -> {
+String topicName = topic.name();
+if (topicName.compareTo(cursorTopicName) >= 0) {
+topics.add(topicName);
+}
+});
+
+if (cursor != null && !topics.contains(cursor.topicName())) {
+// The topic in cursor must be included in the topic list if 
provided.
+throw new 
InvalidRequestException("DescribeTopicPartitionsRequest topic list should 
contain the cursor topic: " + cursor.topicName());
+}
+}
+
+// Do not disclose the existence of topics unauthorized for Describe, 
so we've not even checked if they exist or not
+Set<DescribeTopicPartitionsResponseTopic> 
unauthorizedForDescribeTopicMetadata = new HashSet<>();
+
+Stream<String> authorizedTopicsStream = 
topics.stream().filter(topicName -> {
+boolean isAuthorized = authHelper.authorize(
+abstractRequest.context(), DESCRIBE, TOPIC, topicName, true, 
true, 1);
+if (!fetchAllTopics && !isAuthorized) {
+// We should not return topicId when on unauthorized error, so 
we return zero uuid.
+  

Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-12 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1451036632


##
core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java:
##
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.handlers;
+
+import kafka.network.RequestChannel;
+import kafka.server.AuthHelper;
+import kafka.server.KafkaConfig;
+import kafka.server.MetadataCache;
+import kafka.server.metadata.KRaftMetadataCache;
+import kafka.server.metadata.ZkMetadataCache;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest;
+import org.apache.kafka.common.resource.Resource;
+import scala.collection.JavaConverters;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.resource.ResourceType.TOPIC;
+
+public class DescribeTopicPartitionsRequestHandler {
+MetadataCache metadataCache;
+AuthHelper authHelper;
+KafkaConfig config;
+
+public DescribeTopicPartitionsRequestHandler(
+MetadataCache metadataCache,

Review Comment:
   Done.






[jira] [Created] (KAFKA-16122) TransactionsBounceTest -- server disconnected before response was received

2024-01-12 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-16122:
--

 Summary: TransactionsBounceTest -- server disconnected before 
response was received
 Key: KAFKA-16122
 URL: https://issues.apache.org/jira/browse/KAFKA-16122
 Project: Kafka
  Issue Type: Test
Reporter: Justine Olshan


I noticed a ton of tests failing with:

{code:java}
Error
org.apache.kafka.common.KafkaException: Unexpected error in TxnOffsetCommitResponse: The server disconnected before a response was received.
{code}
{code:java}
Stacktrace
org.apache.kafka.common.KafkaException: Unexpected error in TxnOffsetCommitResponse: The server disconnected before a response was received.
  at app//org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1702)
  at app//org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1236)
  at app//org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
  at app//org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608)
  at app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600)
  at app//org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:457)
  at app//org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:334)
  at app//org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:249)
  at java.base@21.0.1/java.lang.Thread.run(Thread.java:1583)
{code}


The error indicates a network error which is retriable but the TxnOffsetCommit 
handler doesn't expect this. 

https://issues.apache.org/jira/browse/KAFKA-14417 addressed many of the other 
requests but not this one. 
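
To make the gap concrete, here is a minimal Java sketch of a response handler that re-enqueues a request on a retriable error instead of treating it as unexpected. It is not the TransactionManager code: the class, the enums and the error names are all hypothetical stand-ins chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-ins for illustration only; these are NOT Kafka's real classes.
class RetriableHandlerSketch {
    enum SketchError {
        NONE(false),
        NETWORK_DISCONNECT(true),       // transient in this sketch: safe to retry
        INVALID_PRODUCER_EPOCH(false);  // treated as fatal in this sketch

        final boolean retriable;

        SketchError(boolean retriable) {
            this.retriable = retriable;
        }
    }

    enum Outcome { COMPLETED, REENQUEUED, FATAL }

    // Requests waiting to be sent again.
    private final Deque<String> pending = new ArrayDeque<>();

    Outcome handleResponse(String request, SketchError error) {
        if (error == SketchError.NONE) {
            return Outcome.COMPLETED;
        }
        if (error.retriable) {
            // A retriable error puts the request back in the queue
            // rather than surfacing as an "unexpected error".
            pending.addLast(request);
            return Outcome.REENQUEUED;
        }
        return Outcome.FATAL;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        RetriableHandlerSketch handler = new RetriableHandlerSketch();
        System.out.println(handler.handleResponse("txn-offset-commit", SketchError.NETWORK_DISCONNECT));
        System.out.println(handler.pendingCount());
    }
}
```

Whether the real handler should re-enqueue, back off, or abort is a design question for the actual fix; the sketch only shows the classification step that appears to be missing.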



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16051) Deadlock on connector initialization

2024-01-12 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-16051.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

> Deadlock on connector initialization
> 
>
> Key: KAFKA-16051
> URL: https://issues.apache.org/jira/browse/KAFKA-16051
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 2.6.3, 3.6.1
>Reporter: Octavian Ciubotaru
>Assignee: Octavian Ciubotaru
>Priority: Major
> Fix For: 3.8.0
>
>
>  
> Tested with Kafka 3.6.1 and 2.6.3.
> The only plugin installed is confluentinc-kafka-connect-jdbc-10.7.4.
> Stack trace for Kafka 3.6.1:
> {noformat}
> Found one Java-level deadlock:
> =
> "pool-3-thread-1":
>   waiting to lock monitor 0x7fbc88006300 (object 0x91002aa0, a org.apache.kafka.connect.runtime.standalone.StandaloneHerder),
>   which is held by "Thread-9"
> "Thread-9":
>   waiting to lock monitor 0x7fbc88008800 (object 0x9101ccd8, a org.apache.kafka.connect.storage.MemoryConfigBackingStore),
>   which is held by "pool-3-thread-1"
>
> Java stack information for the threads listed above:
> ===
> "pool-3-thread-1":
>     at org.apache.kafka.connect.runtime.standalone.StandaloneHerder$ConfigUpdateListener.onTaskConfigUpdate(StandaloneHerder.java:516)
>     - waiting to lock <0x91002aa0> (a org.apache.kafka.connect.runtime.standalone.StandaloneHerder)
>     at org.apache.kafka.connect.storage.MemoryConfigBackingStore.putTaskConfigs(MemoryConfigBackingStore.java:137)
>     - locked <0x9101ccd8> (a org.apache.kafka.connect.storage.MemoryConfigBackingStore)
>     at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.updateConnectorTasks(StandaloneHerder.java:483)
>     at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.lambda$null$2(StandaloneHerder.java:229)
>     at org.apache.kafka.connect.runtime.standalone.StandaloneHerder$$Lambda$692/0x000840557440.run(Unknown Source)
>     at java.util.concurrent.Executors$RunnableAdapter.call(java.base@11.0.21/Executors.java:515)
>     at java.util.concurrent.FutureTask.run(java.base@11.0.21/FutureTask.java:264)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(java.base@11.0.21/ScheduledThreadPoolExecutor.java:304)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.21/ThreadPoolExecutor.java:1128)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.21/ThreadPoolExecutor.java:628)
>     at java.lang.Thread.run(java.base@11.0.21/Thread.java:829)
> "Thread-9":
>     at org.apache.kafka.connect.storage.MemoryConfigBackingStore.putTaskConfigs(MemoryConfigBackingStore.java:129)
>     - waiting to lock <0x9101ccd8> (a org.apache.kafka.connect.storage.MemoryConfigBackingStore)
>     at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.updateConnectorTasks(StandaloneHerder.java:483)
>     at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.requestTaskReconfiguration(StandaloneHerder.java:255)
>     - locked <0x91002aa0> (a org.apache.kafka.connect.runtime.standalone.StandaloneHerder)
>     at org.apache.kafka.connect.runtime.HerderConnectorContext.requestTaskReconfiguration(HerderConnectorContext.java:50)
>     at org.apache.kafka.connect.runtime.WorkerConnector$WorkerConnectorContext.requestTaskReconfiguration(WorkerConnector.java:548)
>     at io.confluent.connect.jdbc.source.TableMonitorThread.run(TableMonitorThread.java:86)
> Found 1 deadlock.
> {noformat}
> The JDBC source connector loads the list of tables from the database and 
> updates its configuration once the list is available. The deadlock is very 
> consistent in my environment, probably because the database is on the same 
> machine.
> Maybe this can be avoided by always locking the herder first and the config 
> backing store second. From what I see, updateConnectorTasks is sometimes 
> called with the herder lock already held and sometimes without it.
>  
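
The lock-ordering idea suggested in the description can be sketched as follows. This is only an illustration under assumptions, not the change that was merged: `LockOrderingSketch` and its two lock objects are hypothetical stand-ins for the herder and the config backing store, and only the method names echo the stack trace above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the herder and its config store; not Kafka Connect's real classes.
class LockOrderingSketch {
    private final Object herderLock = new Object();
    private final Object storeLock = new Object();
    private final List<String> taskConfigs = new ArrayList<>();
    private int listenerCalls = 0;

    // Every caller takes herderLock first and storeLock second, so two threads
    // can never hold the two locks in opposite order.
    void updateConnectorTasks(String config) {
        synchronized (herderLock) {
            synchronized (storeLock) {
                taskConfigs.add(config);
                onTaskConfigUpdate();
            }
        }
    }

    // The listener needs the herder lock; the calling thread already holds it,
    // and Java monitors are reentrant, so this does not block.
    private void onTaskConfigUpdate() {
        synchronized (herderLock) {
            listenerCalls++;
        }
    }

    int listenerCalls() {
        synchronized (herderLock) {
            return listenerCalls;
        }
    }

    // Runs two concurrent updates, mirroring the two threads in the trace.
    static int runDemo() {
        LockOrderingSketch herder = new LockOrderingSketch();
        Thread monitor = new Thread(() -> herder.updateConnectorTasks("from-monitor-thread"));
        Thread executor = new Thread(() -> herder.updateConnectorTasks("from-executor-thread"));
        monitor.start();
        executor.start();
        try {
            monitor.join();
            executor.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return herder.listenerCalls();
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints 2
    }
}
```

In the trace, one thread holds the herder and waits for the store while the other holds the store and waits for the herder; a single global order removes that cycle.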





Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]

2024-01-12 Thread via GitHub


gharris1727 merged PR #15080:
URL: https://github.com/apache/kafka/pull/15080





Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]

2024-01-12 Thread via GitHub


gharris1727 commented on PR #15080:
URL: https://github.com/apache/kafka/pull/15080#issuecomment-1890150013

   Test failures appear unrelated, and the tests pass locally for me.





Re: [PR] KAFKA-16118; Coordinator unloading fails when replica is deleted [kafka]

2024-01-12 Thread via GitHub


jolshan merged PR #15182:
URL: https://github.com/apache/kafka/pull/15182





Re: [PR] KAFKA-15834: Remove NamedTopologyIntegrationTest which leaks clients [kafka]

2024-01-12 Thread via GitHub


mjsax commented on PR #15185:
URL: https://github.com/apache/kafka/pull/15185#issuecomment-1890137576

   Not sure. Leave it up to @ableegoldman to sign off and make a call.





Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]

2024-01-12 Thread via GitHub


junrao commented on code in PR #14766:
URL: https://github.com/apache/kafka/pull/14766#discussion_r1450990461


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,25 +1425,70 @@ class UnifiedLog(@volatile var logStartOffset: Long,
*/
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => 
Boolean,
 reason: SegmentDeletionReason): Int = {
-def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
-  val upperBoundOffset = 
nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset)
-
-  // Check not to delete segments which are not yet copied to tiered 
storage if remote log is enabled.
-  (!remoteLogEnabled() || (upperBoundOffset > 0 && upperBoundOffset - 1 <= 
highestOffsetInRemoteStorage)) &&
-// We don't delete segments with offsets at or beyond the high 
watermark to ensure that the log start
-// offset can never exceed it.
-highWatermark >= upperBoundOffset &&
-predicate(segment, nextSegmentOpt)
-}
 lock synchronized {
-  val deletable = localLog.deletableSegments(shouldDelete)
+  val deletable = deletableSegments(predicate)
   if (deletable.nonEmpty)
 deleteSegments(deletable, reason)
   else
 0
 }
   }
 
+  /**
+   * Find segments starting from the oldest until the user-supplied predicate 
is false.
+   * A final segment that is empty will never be returned.
+   *
+   * @param predicate A function that takes in a candidate log segment, the 
next higher segment
+   *  (if there is one). It returns true iff the segment is 
deletable.
+   * @return the segments ready to be deleted
+   */
+  private[log] def deletableSegments(predicate: (LogSegment, 
Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
+def isSegmentEligibleForDeletion(upperBoundOffset: Long): Boolean = {
+  // Segments are eligible for deletion when:
+  //1. they are uploaded to the remote storage
+  if (remoteLogEnabled()) {
+upperBoundOffset > 0 && upperBoundOffset - 1 <= 
highestOffsetInRemoteStorage
+  } else {
+true
+  }
+}
+
+if (localLog.segments.isEmpty) {
+  Seq.empty
+} else {
+  val deletable = ArrayBuffer.empty[LogSegment]
+  val segmentsIterator = localLog.segments.values.iterator
+  var segmentOpt = nextOption(segmentsIterator)
+  var shouldRoll = false
+  while (segmentOpt.isDefined) {
+val segment = segmentOpt.get
+val nextSegmentOpt = nextOption(segmentsIterator)
+val isLastSegmentAndEmpty = nextSegmentOpt.isEmpty && segment.size == 0
+val upperBoundOffset = if (nextSegmentOpt.nonEmpty) 
nextSegmentOpt.get.baseOffset() else logEndOffset
+// We don't delete segments with offsets at or beyond the high 
watermark to ensure that the log start
+// offset can never exceed it.
+val predicateResult = highWatermark >= upperBoundOffset && 
predicate(segment, nextSegmentOpt)
+
+// Roll the active segment when it breaches the configured retention 
policy. The rolled segment will be
+// eligible for deletion and gets removed in the next iteration.
+if (predicateResult && remoteLogEnabled() && nextSegmentOpt.isEmpty && 
segment.size > 0) {
+  shouldRoll = true

Review Comment:
   > We have checks to allow only the passive segments to be uploaded, so the 
active segment never gets removed at all even if breaches the retention time.
   
   Hmm, the above description of the PR doesn't seem correct. Before this PR, 
we would consider the active segment for deletion as long as it was not empty. 
If the active segment met the deletion criteria, we would delete it and 
automatically roll a new empty segment. So why do we need to roll explicitly 
here? Also, why do we only do that when remote log is enabled? 
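
The pre-PR behaviour described in this comment can be modelled roughly as below. This is a simplified Java sketch under assumptions, not UnifiedLog: `ActiveSegmentDeletionSketch` and `Segment` are hypothetical, and the high-watermark and remote-storage checks from the diff are left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

// Hypothetical model of a local log; not Kafka's UnifiedLog.
class ActiveSegmentDeletionSketch {
    static final class Segment {
        final long baseOffset;
        final int sizeBytes;

        Segment(long baseOffset, int sizeBytes) {
            this.baseOffset = baseOffset;
            this.sizeBytes = sizeBytes;
        }
    }

    // Oldest segment first; the last element is the active segment.
    private final Deque<Segment> segments = new ArrayDeque<>();
    private final long logEndOffset;

    ActiveSegmentDeletionSketch(long logEndOffset, Segment... initial) {
        this.logEndOffset = logEndOffset;
        for (Segment s : initial) {
            segments.addLast(s);
        }
    }

    // Deletes from the oldest segment until the predicate fails. A non-empty
    // active segment may be deleted too; an empty final segment never is.
    int deleteOldSegments(Predicate<Segment> shouldDelete) {
        int deleted = 0;
        while (!segments.isEmpty()) {
            Segment oldest = segments.peekFirst();
            boolean isLastAndEmpty = segments.size() == 1 && oldest.sizeBytes == 0;
            if (isLastAndEmpty || !shouldDelete.test(oldest)) {
                break;
            }
            segments.removeFirst();
            deleted++;
        }
        if (segments.isEmpty()) {
            // Everything was deleted, so a new empty active segment is rolled.
            segments.addLast(new Segment(logEndOffset, 0));
        }
        return deleted;
    }

    Segment activeSegment() {
        return segments.peekLast();
    }

    int segmentCount() {
        return segments.size();
    }

    public static void main(String[] args) {
        ActiveSegmentDeletionSketch log = new ActiveSegmentDeletionSketch(
            15L, new Segment(0L, 100), new Segment(10L, 50));
        System.out.println(log.deleteOldSegments(s -> true));  // prints 2
        System.out.println(log.activeSegment().baseOffset);    // prints 15
    }
}
```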






Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-12 Thread via GitHub


artemlivshits commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1450990079


##
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##
@@ -155,72 +153,78 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
* @param upperIndex  The upper limit of the index of the 
partitions to be included in the result.
*Note that, the upper index can be 
larger than the largest partition index in
*this topic.
-   * @returnA collection of topic partition 
metadata and whether there are more partitions.
+   * @returnA collection of topic partition 
metadata and next partition index (-1 means
+   *no next partition).
*/
   private def getPartitionMetadataForDescribeTopicResponse(
 image: MetadataImage,
 topicName: String,
 listenerName: ListenerName,
 startIndex: Int,
-upperIndex: Int
-  ): (Option[List[DescribeTopicPartitionsResponsePartition]], Boolean) = {
+maxCount: Int
+  ): (Option[List[DescribeTopicPartitionsResponsePartition]], Int) = {
 Option(image.topics().getTopic(topicName)) match {
-  case None => (None, false)
+  case None => (None, -1)
   case Some(topic) => {
 val result = new ListBuffer[DescribeTopicPartitionsResponsePartition]()
-val endIndex = upperIndex.min(topic.partitions().size())
-for (partitionId <- startIndex until endIndex) {
-  val partition = topic.partitions().get(partitionId)
-  val filteredReplicas = maybeFilterAliveReplicas(image, 
partition.replicas,
-listenerName, false)
-  val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, 
listenerName,
-false)
-  val offlineReplicas = getOfflineReplicas(image, partition, 
listenerName)
-  val maybeLeader = getAliveEndpoint(image, partition.leader, 
listenerName)
-  maybeLeader match {
-case None =>
-  val error = if 
(!image.cluster().brokers.containsKey(partition.leader)) {
-debug(s"Error while fetching metadata for 
$topicName-$partitionId: leader not available")
-Errors.LEADER_NOT_AVAILABLE
-  } else {
-debug(s"Error while fetching metadata for 
$topicName-$partitionId: listener $listenerName " +
-  s"not found on leader ${partition.leader}")
-Errors.LISTENER_NOT_FOUND
-  }
-  result.addOne(new DescribeTopicPartitionsResponsePartition()
-.setErrorCode(error.code)
-.setPartitionIndex(partitionId)
-.setLeaderId(MetadataResponse.NO_LEADER_ID)
-.setLeaderEpoch(partition.leaderEpoch)
-.setReplicaNodes(filteredReplicas)
-.setIsrNodes(filteredIsr)
-.setOfflineReplicas(offlineReplicas))
-case Some(leader) =>
-  val error = if (filteredReplicas.size < 
partition.replicas.length) {
-debug(s"Error while fetching metadata for 
$topicName-$partitionId: replica information not available for " +
-  s"following brokers 
${partition.replicas.filterNot(filteredReplicas.contains).mkString(",")}")
-Errors.REPLICA_NOT_AVAILABLE
-  } else if (filteredIsr.size < partition.isr.length) {
-debug(s"Error while fetching metadata for 
$topicName-$partitionId: in sync replica information not available for " +
-  s"following brokers 
${partition.isr.filterNot(filteredIsr.contains).mkString(",")}")
-Errors.REPLICA_NOT_AVAILABLE
-  } else {
-Errors.NONE
-  }
-
-  result.addOne(new DescribeTopicPartitionsResponsePartition()
-.setErrorCode(error.code)
-.setPartitionIndex(partitionId)
-.setLeaderId(leader.id())
-.setLeaderEpoch(partition.leaderEpoch)
-.setReplicaNodes(filteredReplicas)
-.setIsrNodes(filteredIsr)
-.setOfflineReplicas(offlineReplicas)
-.setEligibleLeaderReplicas(Replicas.toList(partition.elr))
-.setLastKnownElr(Replicas.toList(partition.lastKnownElr)))
+// The partition id may not be consecutive.
+val partitions = 
topic.partitions().keySet().stream().sorted().iterator()

Review Comment:
   This has O(N*logN) runtime complexity and O(N) space complexity.  We could 
do O(N) complexity and not have an extra copy if we just iterate over all 
partitions and filter the ones that fit into the required range (one of your 
previous implementations had this).
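
A rough Java sketch of the single-pass alternative suggested here, assuming the request names a half-open partition range. `PartitionRangeFilterSketch`, `Page` and the `Map<Integer, String>` stand-in for the topic's partition map are hypothetical; only the idea of filtering without sorting comes from the comment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; the map stands in for the topic's partition map.
class PartitionRangeFilterSketch {
    static final class Page {
        final List<Integer> partitionIds; // ids in [startIndex, upperIndex), in map iteration order
        final int nextPartitionId;        // smallest id >= upperIndex, or -1 if there is none

        Page(List<Integer> partitionIds, int nextPartitionId) {
            this.partitionIds = partitionIds;
            this.nextPartitionId = nextPartitionId;
        }
    }

    // One pass over the key set: O(N) time, no sorted copy of all keys.
    static Page partitionsInRange(Map<Integer, String> partitions, int startIndex, int upperIndex) {
        List<Integer> inRange = new ArrayList<>();
        int next = -1;
        for (int id : partitions.keySet()) {
            if (id >= startIndex && id < upperIndex) {
                inRange.add(id);
            } else if (id >= upperIndex && (next == -1 || id < next)) {
                next = id;
            }
        }
        return new Page(inRange, next);
    }

    public static void main(String[] args) {
        Map<Integer, String> partitions = new HashMap<>();
        for (int id : new int[] {0, 2, 3, 7, 9}) { // ids need not be consecutive
            partitions.put(id, "partition-" + id);
        }
        Page page = partitionsInRange(partitions, 1, 5);
        System.out.println(page.partitionIds.size());
        System.out.println(page.nextPartitionId);
    }
}
```

The trade-off is that the selected ids come back in map iteration order, so a caller that needs them ordered would sort only the selected subset rather than every partition.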



##

[jira] [Commented] (KAFKA-16089) Kafka Streams still leaking memory

2024-01-12 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806217#comment-17806217
 ] 

Matthias J. Sax commented on KAFKA-16089:
-

Great job!

> Kafka Streams still leaking memory
> --
>
> Key: KAFKA-16089
> URL: https://issues.apache.org/jira/browse/KAFKA-16089
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.8.0
>Reporter: Lucas Brutschy
>Assignee: Nicholas Telford
>Priority: Critical
> Fix For: 3.8.0
>
> Attachments: fix.png, graphviz (1).svg, unfix.png
>
>
> In 
> [https://github.com/apache/kafka/commit/58d6d2e5922df60cdc5c5c1bcd1c2d82dd2b71e2]
>  a leak was fixed in the release candidate for 3.7.
>  
> However, Kafka Streams still seems to be leaking memory (just slower) after 
> the fix.
>  
> Attached is the `jeprof` output right before a crash after ~11 hours.
>  
>  
>  





Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]

2024-01-12 Thread via GitHub


gharris1727 commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1450941019


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java:
##
@@ -392,90 +397,29 @@ public void testSetConfigs() {
 }
 
 @Test
-public void testThreadSafety() throws Throwable {
-long runtimeMs = 5_000;
-int numThreads = 10;
-// Check that multiple threads using RetryWithToleranceOperator 
concurrently
-// can't corrupt the state of the ProcessingContext
-AtomicReference<Throwable> failed = new AtomicReference<>(null);
-RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(0,
-ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM, 
errorHandlingMetrics, new ProcessingContext() {
-private final AtomicInteger count = new AtomicInteger();
-private final AtomicInteger attempt = new AtomicInteger();
-
-@Override
-public void error(Throwable error) {
-if (count.getAndIncrement() > 0) {
-failed.compareAndSet(null, new 
AssertionError("Concurrent call to error()"));
-}
-super.error(error);
-}
-
-@Override
-public Future report() {
-if (count.getAndSet(0) > 1) {
-failed.compareAndSet(null, new 
AssertionError("Concurrent call to error() in report()"));
-}
-
-return super.report();
-}
-
-@Override
-public void currentContext(Stage stage, Class klass) {
-this.attempt.set(0);
-super.currentContext(stage, klass);
-}
-
-@Override
-public void attempt(int attempt) {
-if (!this.attempt.compareAndSet(attempt - 1, attempt)) 
{
-failed.compareAndSet(null, new AssertionError(
-"Concurrent call to attempt(): Attempts 
should increase monotonically " +
-"within the scope of a given 
currentContext()"));
-}
-super.attempt(attempt);
-}
-}, new CountDownLatch(1));
-
-ExecutorService pool = Executors.newFixedThreadPool(numThreads);
-List<Future<?>> futures = IntStream.range(0, 
numThreads).boxed()
-.map(id ->
-pool.submit(() -> {
-long t0 = System.currentTimeMillis();
-long i = 0;
-while (true) {
-if (++i % 1 == 0 && 
System.currentTimeMillis() > t0 + runtimeMs) {
-break;
-}
-if (failed.get() != null) {
-break;
-}
-try {
-if (id < numThreads / 2) {
-
retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
-SinkTask.class, 
consumerRecord, new Throwable()).get();
-} else {
-retryWithToleranceOperator.execute(() 
-> null, Stage.TRANSFORMATION,
-SinkTask.class);
-}
-} catch (Exception e) {
-failed.compareAndSet(null, e);
-}
-}
-}))
-.collect(Collectors.toList());
-pool.shutdown();
-pool.awaitTermination((long) (1.5 * runtimeMs), TimeUnit.MILLISECONDS);
-futures.forEach(future -> {
-try {
-future.get();
-} catch (Exception e) {
-failed.compareAndSet(null, e);
-}

Review Comment:
   I think getting a deterministic reproduction case would involve programmatic 
breakpoints like what I used in the SynchronizationTest, and would involve 
modifying the WorkerSourceTask to be able to intercept between the execute() 
and failed() calls.
   
   I think if I were to copy-paste the call-site from 
WorkerSinkTask/WorkerErrantRecordReporter into an explicit test for this, then 
it does nothing to prove that the bug is absent in the main code. I think that 
is the core flaw in the testThreadSafety test: the class is "thread safe" but 
the call-site of it wasn't.
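
The distinction between a thread-safe class and an unsafe call-site can be illustrated with a small Java sketch. `SharedOperator` is hypothetical and only loosely echoes the method names in the diff; the interleaving is replayed on a single thread so the outcome is deterministic.

```java
// Hypothetical illustration; SharedOperator is not RetryWithToleranceOperator.
class CallSiteRaceSketch {
    // Each method is synchronized, so the class itself is "thread safe",
    // but a caller that uses execute() followed by failed() gets no atomicity.
    static final class SharedOperator {
        private String currentRecord;

        synchronized void execute(String record) {
            currentRecord = record;
        }

        synchronized String failed() {
            return currentRecord;
        }

        // A single call that carries the record with the failure report,
        // so no other caller can slip in between the two steps.
        synchronized String executeFailed(String record) {
            currentRecord = record;
            return record;
        }
    }

    public static void main(String[] args) {
        SharedOperator operator = new SharedOperator();

        // Replay of a bad interleaving between two callers.
        operator.execute("record-A");       // caller 1 starts processing record-A
        operator.execute("record-B");       // caller 2 runs in between
        String reported = operator.failed(); // caller 1 reports its failure
        System.out.println(reported);        // prints record-B, the wrong record

        // With the combined call the record cannot be swapped.
        System.out.println(operator.executeFailed("record-A")); // prints record-A
    }
}
```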
   
   Also 

Re: [PR] KAFKA-15575: Begin enforcing 'tasks.max' property for connectors [kafka]

2024-01-12 Thread via GitHub


gharris1727 commented on code in PR #15180:
URL: https://github.com/apache/kafka/pull/15180#discussion_r1450935147


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##
@@ -884,6 +890,158 @@ private void assertTimeoutException(Runnable operation, 
String expectedStageDesc
 connect.requestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS);
 }
 
+/**
+ * Tests the logic around enforcement of the
+ * {@link 
org.apache.kafka.connect.runtime.ConnectorConfig#TASKS_MAX_CONFIG tasks.max}
+ * property and how it can be toggled via the
+ * {@link 
org.apache.kafka.connect.runtime.ConnectorConfig#TASKS_MAX_ENFORCE_CONFIG 
tasks.max.enforce}
+ * property, following the test plan laid out in
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1004%3A+Enforce+tasks.max+property+in+Kafka+Connect#KIP1004:Enforcetasks.maxpropertyinKafkaConnect-TestPlan">KIP-1004</a>.
+ */
+@Test
+public void testTasksMaxEnforcement() throws Exception {
+String configTopic = "tasks-max-enforcement-configs";
+workerProps.put(CONFIG_TOPIC_CONFIG, configTopic);
+connect = connectBuilder.build();
+// start the clusters
+connect.start();
+
+connect.assertions().assertAtLeastNumWorkersAreUp(
+NUM_WORKERS,
+"Initial group of workers did not start in time."
+);
+
+Map<String, String> connectorProps = 
defaultSourceConnectorProps(TOPIC_NAME);
+int maxTasks = 1;
+connectorProps.put(TASKS_MAX_CONFIG, Integer.toString(maxTasks));
+int numTasks = 2;
+connectorProps.put(MonitorableSourceConnector.NUM_TASKS, 
Integer.toString(numTasks));
+connect.configureConnector(CONNECTOR_NAME, connectorProps);
+
+// A connector that generates excessive tasks will be failed with an 
expected error message
+connect.assertions().assertConnectorIsFailedAndTasksHaveFailed(
+CONNECTOR_NAME,
+0,
+"connector did not fail in time"
+);
+
+String expectedErrorSnippet = String.format(
+"The connector %s has generated %d tasks, which is greater 
than %d, "
++ "the maximum number of tasks it is configured to 
create. ",
+CONNECTOR_NAME,
+numTasks,
+maxTasks
+);
+String errorMessage = 
connect.connectorStatus(CONNECTOR_NAME).connector().trace();
+assertThat(errorMessage, containsString(expectedErrorSnippet));
+
+// Stop all workers in the cluster
+connect.workers().forEach(connect::removeWorker);
+
+// Publish a set of too many task configs to the config topic, to 
simulate

Review Comment:
   Okay thank you for humoring me. I don't think it's a blocker for merge.






[jira] [Resolved] (KAFKA-15816) Typos in tests leak network sockets

2024-01-12 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-15816.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

> Typos in tests leak network sockets
> ---
>
> Key: KAFKA-15816
> URL: https://issues.apache.org/jira/browse/KAFKA-15816
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Affects Versions: 3.6.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Minor
> Fix For: 3.8.0
>
>
> There are a few tests which leak network sockets due to small typos in the 
> tests themselves.
> Clients: [https://github.com/apache/kafka/pull/14750] (DONE)
>  * NioEchoServer
>  * KafkaConsumerTest
>  * KafkaProducerTest
>  * SelectorTest
>  * SslTransportLayerTest
>  * SslTransportTls12Tls13Test
>  * SslVersionsTransportLayerTest
>  * SaslAuthenticatorTest
> Core: [https://github.com/apache/kafka/pull/14754] (DONE)
>  * MiniKdc
>  * GssapiAuthenticationTest
>  * MirrorMakerIntegrationTest
>  * SocketServerTest
>  * EpochDrivenReplicationProtocolAcceptanceTest
>  * LeaderEpochIntegrationTest
> Trogdor: [https://github.com/apache/kafka/pull/14771] (DONE)
>  * AgentTest
> Mirror: [https://github.com/apache/kafka/pull/14761] (DONE)
>  * DedicatedMirrorIntegrationTest
>  * MirrorConnectorsIntegrationTest
>  * MirrorConnectorsWithCustomForwardingAdminIntegrationTest
> Runtime: [https://github.com/apache/kafka/pull/14764] (DONE)
>  * ConnectorTopicsIntegrationTest
>  * ExactlyOnceSourceIntegrationTest
>  * WorkerTest
>  * WorkerGroupMemberTest
> Streams: [https://github.com/apache/kafka/pull/14769] (DONE)
>  * IQv2IntegrationTest
>  * MetricsReporterIntegrationTest
>  * NamedTopologyIntegrationTest
>  * PurgeRepartitionTopicIntegrationTest
> These can be addressed by just fixing the tests.
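As a rough illustration of the kind of fix involved (the ticket does not spell out the individual typos, so this is an assumed example and not code from the linked PRs): a test that opens a socket and never closes it leaks the descriptor, while try-with-resources closes it even when an assertion in the test body throws. `SocketLeakSketch` and `openCheckAndClose` are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;

// Hypothetical sketch; not taken from the Kafka test suite.
class SocketLeakSketch {

    // Binds a server socket to an ephemeral loopback port, runs a trivial check,
    // and returns the socket after try-with-resources has closed it so that
    // callers can verify nothing was leaked.
    static ServerSocket openCheckAndClose() {
        ServerSocket captured = null;
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            captured = server;
            if (server.getLocalPort() <= 0) {
                throw new AssertionError("expected the socket to be bound to a port");
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // server.close() has already run here, whether or not the check threw
        return captured;
    }

    public static void main(String[] args) {
        System.out.println("closed after test body: " + openCheckAndClose().isClosed());
    }
}
```

The same shape applies to any `AutoCloseable` a test creates; the point is only that the close call cannot be skipped or misspelled once the resource is declared in the `try` header.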



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15816) Typos in tests leak network sockets

2024-01-12 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-15816:

Description: 
There are a few tests which leak network sockets due to small typos in the 
tests themselves.

Clients: [https://github.com/apache/kafka/pull/14750] (DONE)
 * NioEchoServer
 * KafkaConsumerTest
 * KafkaProducerTest
 * SelectorTest
 * SslTransportLayerTest
 * SslTransportTls12Tls13Test
 * SslVersionsTransportLayerTest
 * SaslAuthenticatorTest

Core: [https://github.com/apache/kafka/pull/14754] (DONE)
 * MiniKdc
 * GssapiAuthenticationTest
 * MirrorMakerIntegrationTest
 * SocketServerTest
 * EpochDrivenReplicationProtocolAcceptanceTest
 * LeaderEpochIntegrationTest

Trogdor: [https://github.com/apache/kafka/pull/14771] (DONE)
 * AgentTest

Mirror: [https://github.com/apache/kafka/pull/14761] (DONE)
 * DedicatedMirrorIntegrationTest
 * MirrorConnectorsIntegrationTest
 * MirrorConnectorsWithCustomForwardingAdminIntegrationTest

Runtime: [https://github.com/apache/kafka/pull/14764] (DONE)
 * ConnectorTopicsIntegrationTest
 * ExactlyOnceSourceIntegrationTest
 * WorkerTest
 * WorkerGroupMemberTest

Streams: [https://github.com/apache/kafka/pull/14769] (DONE)
 * IQv2IntegrationTest
 * MetricsReporterIntegrationTest
 * NamedTopologyIntegrationTest
 * PurgeRepartitionTopicIntegrationTest

These can be addressed by just fixing the tests.

  was:
There are a few tests which leak network sockets due to small typos in the 
tests themselves.

Clients: [https://github.com/apache/kafka/pull/14750] (DONE)
 * NioEchoServer
 * KafkaConsumerTest
 * KafkaProducerTest
 * SelectorTest
 * SslTransportLayerTest
 * SslTransportTls12Tls13Test
 * SslVersionsTransportLayerTest
 * SaslAuthenticatorTest

Core: [https://github.com/apache/kafka/pull/14754] 
 * MiniKdc
 * GssapiAuthenticationTest
 * MirrorMakerIntegrationTest
 * SocketServerTest
 * EpochDrivenReplicationProtocolAcceptanceTest
 * LeaderEpochIntegrationTest

Trogdor: [https://github.com/apache/kafka/pull/14771] 
 * AgentTest

Mirror: [https://github.com/apache/kafka/pull/14761] (DONE)
 * DedicatedMirrorIntegrationTest
 * MirrorConnectorsIntegrationTest
 * MirrorConnectorsWithCustomForwardingAdminIntegrationTest

Runtime: [https://github.com/apache/kafka/pull/14764] (DONE)
 * ConnectorTopicsIntegrationTest
 * ExactlyOnceSourceIntegrationTest
 * WorkerTest
 * WorkerGroupMemberTest

Streams: [https://github.com/apache/kafka/pull/14769] (DONE)
 * IQv2IntegrationTest
 * MetricsReporterIntegrationTest
 * NamedTopologyIntegrationTest
 * PurgeRepartitionTopicIntegrationTest

These can be addressed by just fixing the tests.


> Typos in tests leak network sockets
> ---
>
> Key: KAFKA-15816
> URL: https://issues.apache.org/jira/browse/KAFKA-15816
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Affects Versions: 3.6.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Minor
>
> There are a few tests which leak network sockets due to small typos in the 
> tests themselves.
> Clients: [https://github.com/apache/kafka/pull/14750] (DONE)
>  * NioEchoServer
>  * KafkaConsumerTest
>  * KafkaProducerTest
>  * SelectorTest
>  * SslTransportLayerTest
>  * SslTransportTls12Tls13Test
>  * SslVersionsTransportLayerTest
>  * SaslAuthenticatorTest
> Core: [https://github.com/apache/kafka/pull/14754] (DONE)
>  * MiniKdc
>  * GssapiAuthenticationTest
>  * MirrorMakerIntegrationTest
>  * SocketServerTest
>  * EpochDrivenReplicationProtocolAcceptanceTest
>  * LeaderEpochIntegrationTest
> Trogdor: [https://github.com/apache/kafka/pull/14771] (DONE)
>  * AgentTest
> Mirror: [https://github.com/apache/kafka/pull/14761] (DONE)
>  * DedicatedMirrorIntegrationTest
>  * MirrorConnectorsIntegrationTest
>  * MirrorConnectorsWithCustomForwardingAdminIntegrationTest
> Runtime: [https://github.com/apache/kafka/pull/14764] (DONE)
>  * ConnectorTopicsIntegrationTest
>  * ExactlyOnceSourceIntegrationTest
>  * WorkerTest
>  * WorkerGroupMemberTest
> Streams: [https://github.com/apache/kafka/pull/14769] (DONE)
>  * IQv2IntegrationTest
>  * MetricsReporterIntegrationTest
>  * NamedTopologyIntegrationTest
>  * PurgeRepartitionTopicIntegrationTest
> These can be addressed by just fixing the tests.





Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]

2024-01-12 Thread via GitHub


C0urante commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1450933080


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java:
##
@@ -392,90 +397,29 @@ public void testSetConfigs() {
 }
 
 @Test
-public void testThreadSafety() throws Throwable {
-long runtimeMs = 5_000;
-int numThreads = 10;
-// Check that multiple threads using RetryWithToleranceOperator 
concurrently
-// can't corrupt the state of the ProcessingContext
-AtomicReference failed = new AtomicReference<>(null);
-RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(0,
-ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM, 
errorHandlingMetrics, new ProcessingContext() {
-private final AtomicInteger count = new AtomicInteger();
-private final AtomicInteger attempt = new AtomicInteger();
-
-@Override
-public void error(Throwable error) {
-if (count.getAndIncrement() > 0) {
-failed.compareAndSet(null, new 
AssertionError("Concurrent call to error()"));
-}
-super.error(error);
-}
-
-@Override
-public Future report() {
-if (count.getAndSet(0) > 1) {
-failed.compareAndSet(null, new 
AssertionError("Concurrent call to error() in report()"));
-}
-
-return super.report();
-}
-
-@Override
-public void currentContext(Stage stage, Class klass) {
-this.attempt.set(0);
-super.currentContext(stage, klass);
-}
-
-@Override
-public void attempt(int attempt) {
-if (!this.attempt.compareAndSet(attempt - 1, attempt)) 
{
-failed.compareAndSet(null, new AssertionError(
-"Concurrent call to attempt(): Attempts 
should increase monotonically " +
-"within the scope of a given 
currentContext()"));
-}
-super.attempt(attempt);
-}
-}, new CountDownLatch(1));
-
-ExecutorService pool = Executors.newFixedThreadPool(numThreads);
-List> futures = IntStream.range(0, 
numThreads).boxed()
-.map(id ->
-pool.submit(() -> {
-long t0 = System.currentTimeMillis();
-long i = 0;
-while (true) {
-if (++i % 1 == 0 && 
System.currentTimeMillis() > t0 + runtimeMs) {
-break;
-}
-if (failed.get() != null) {
-break;
-}
-try {
-if (id < numThreads / 2) {
-
retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
-SinkTask.class, 
consumerRecord, new Throwable()).get();
-} else {
-retryWithToleranceOperator.execute(() 
-> null, Stage.TRANSFORMATION,
-SinkTask.class);
-}
-} catch (Exception e) {
-failed.compareAndSet(null, e);
-}
-}
-}))
-.collect(Collectors.toList());
-pool.shutdown();
-pool.awaitTermination((long) (1.5 * runtimeMs), TimeUnit.MILLISECONDS);
-futures.forEach(future -> {
-try {
-future.get();
-} catch (Exception e) {
-failed.compareAndSet(null, e);
-}

Review Comment:
   There's no unit testing we can add that would deterministically fail if the 
bug resurfaces? Given how notoriously flaky our integration tests are, I'm a 
little afraid to rely on something that only fails some of the time.
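Whether a deterministic unit test is feasible depends on where the race lives. The sketch below assumes the race has the shape suggested by the PR title: a shared, mutable "current record" that an asynchronous report reads only after the task thread has moved on. A latch forces that interleaving on every run. `SharedContext` is a hypothetical stand-in, not Kafka's `ProcessingContext`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of forcing a racy interleaving; all names are assumptions.
class ForcedInterleavingSketch {

    // Stand-in for a context object shared between the task thread and a reporter thread.
    static final class SharedContext {
        private volatile String currentRecord;

        void currentRecord(String record) {
            this.currentRecord = record;
        }

        String currentRecord() {
            return currentRecord;
        }
    }

    // Returns the record that an asynchronous reporter observes when it is only
    // allowed to run after the task thread has overwritten the shared state.
    static String reportAfterOverwrite() {
        SharedContext context = new SharedContext();
        CountDownLatch overwritten = new CountDownLatch(1);
        ExecutorService reporter = Executors.newSingleThreadExecutor();
        try {
            context.currentRecord("record-1");
            // The report is meant for record-1, but it reads the shared state lazily
            Future<String> reported = reporter.submit(() -> {
                overwritten.await();
                return context.currentRecord();
            });
            context.currentRecord("record-2"); // task thread moves on to the next record
            overwritten.countDown();           // only now is the reporter allowed to read
            return reported.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("reporter did not complete", e);
        } finally {
            reporter.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("reporter saw: " + reportAfterOverwrite());
    }
}
```

A test built this way would assert that the reporter still sees `record-1`. It fails every time against shared mutable state and passes once each report captures its own record up front.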






Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]

2024-01-12 Thread via GitHub


C0urante commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1450932511


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrantRecordSinkConnector.java:
##
@@ -54,7 +65,16 @@ public void put(Collection records) {
 .computeIfAbsent(rec.topic(), v -> new HashMap<>())
 .computeIfAbsent(rec.kafkaPartition(), v -> new 
TopicPartition(rec.topic(), rec.kafkaPartition()));
 committedOffsets.put(tp, committedOffsets.getOrDefault(tp, 0) 
+ 1);
-reporter.report(rec, new Throwable());
+Throwable error = new Throwable();
+// Test synchronous and asynchronous reporting, allowing for 
re-ordering the errant reports

Review Comment:
   Ah, that's true. Can we also verify that the reported records don't include 
any duplicates? Would add at least a little more safety here in case 
refactoring caused successive calls to `ErrantRecordReporter::report` to 
overwrite each other.
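One way such a duplicate check could look, as a minimal sketch: it assumes each reported record can be reduced to an identifier such as a topic/partition/offset string, and `findDuplicates` is a hypothetical helper, not something in the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration; identifiers are assumed to be
// "topic-partition@offset" strings collected from the reported errant records.
class DuplicateReportCheck {

    // Returns each identifier that was reported more than once, listed once,
    // in the order in which the first repeat was encountered.
    static List<String> findDuplicates(List<String> reportedIds) {
        Set<String> seen = new HashSet<>();
        Set<String> alreadyFlagged = new HashSet<>();
        List<String> duplicates = new ArrayList<>();
        for (String id : reportedIds) {
            // Set.add returns false when the element was already present
            if (!seen.add(id) && alreadyFlagged.add(id)) {
                duplicates.add(id);
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        List<String> reported = Arrays.asList("test-0@0", "test-0@1", "test-0@1", "test-0@2");
        System.out.println("duplicates: " + findDuplicates(reported));
    }
}
```

Asserting that the returned list is empty, alongside the existing "at least N errors" assertion, would catch the case where one record is reported twice in place of another.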






Re: [PR] KAFKA-15816: Fix leaked sockets in core tests [kafka]

2024-01-12 Thread via GitHub


gharris1727 merged PR #14754:
URL: https://github.com/apache/kafka/pull/14754





Re: [PR] KAFKA-15816: Fix leaked sockets in core tests [kafka]

2024-01-12 Thread via GitHub


gharris1727 commented on PR #14754:
URL: https://github.com/apache/kafka/pull/14754#issuecomment-1889959363

   It looks like the consistent failures I was seeing were present only on 
JDK21 and didn't reproduce with JDK11, but they were already present on trunk. I got 
some flaky failures with JDK11 locally, but they appear unrelated. 





Re: [PR] KAFKA-15575: Begin enforcing 'tasks.max' property for connectors [kafka]

2024-01-12 Thread via GitHub


C0urante commented on code in PR #15180:
URL: https://github.com/apache/kafka/pull/15180#discussion_r1450925859


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##
@@ -884,6 +890,158 @@ private void assertTimeoutException(Runnable operation, 
String expectedStageDesc
 connect.requestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS);
 }
 
+/**
+ * Tests the logic around enforcement of the
+ * {@link 
org.apache.kafka.connect.runtime.ConnectorConfig#TASKS_MAX_CONFIG tasks.max}
+ * property and how it can be toggled via the
+ * {@link 
org.apache.kafka.connect.runtime.ConnectorConfig#TASKS_MAX_ENFORCE_CONFIG 
tasks.max.enforce}
+ * property, following the test plan laid out in
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1004%3A+Enforce+tasks.max+property+in+Kafka+Connect#KIP1004:Enforcetasks.maxpropertyinKafkaConnect-TestPlan">KIP-1004</a>.
+ */
+@Test
+public void testTasksMaxEnforcement() throws Exception {
+String configTopic = "tasks-max-enforcement-configs";
+workerProps.put(CONFIG_TOPIC_CONFIG, configTopic);
+connect = connectBuilder.build();
+// start the clusters
+connect.start();
+
+connect.assertions().assertAtLeastNumWorkersAreUp(
+NUM_WORKERS,
+"Initial group of workers did not start in time."
+);
+
+Map connectorProps = 
defaultSourceConnectorProps(TOPIC_NAME);
+int maxTasks = 1;
+connectorProps.put(TASKS_MAX_CONFIG, Integer.toString(maxTasks));
+int numTasks = 2;
+connectorProps.put(MonitorableSourceConnector.NUM_TASKS, 
Integer.toString(numTasks));
+connect.configureConnector(CONNECTOR_NAME, connectorProps);
+
+// A connector that generates excessive tasks will be failed with an 
expected error message
+connect.assertions().assertConnectorIsFailedAndTasksHaveFailed(
+CONNECTOR_NAME,
+0,
+"connector did not fail in time"
+);
+
+String expectedErrorSnippet = String.format(
+"The connector %s has generated %d tasks, which is greater 
than %d, "
++ "the maximum number of tasks it is configured to 
create. ",
+CONNECTOR_NAME,
+numTasks,
+maxTasks
+);
+String errorMessage = 
connect.connectorStatus(CONNECTOR_NAME).connector().trace();
+assertThat(errorMessage, containsString(expectedErrorSnippet));
+
+// Stop all workers in the cluster
+connect.workers().forEach(connect::removeWorker);
+
+// Publish a set of too many task configs to the config topic, to 
simulate

Review Comment:
   Okay, trying this again, it's still not looking so great. Even with a null 
`WorkerConfigTransformer` and relying on the mutability of the `workerProps` 
field, there's another hurdle in the way: a non-null 
`ConfigBackingStore.UpdateListener` has to be supplied to the 
`KafkaConfigBackingStore` instance, or it'll fail when trying to read to the 
end of the config topic after a call to `putTaskConfigs`.
   
   Implementing that interface, even with no-ops, would nearly double the 
verbosity of this part of the testing code. We could add some `updateListener 
!= null` checks to the `KafkaConfigBackingStore` class, but IMO it's not worth 
it to remove those guardrails just to accommodate testing code.






Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-12 Thread via GitHub


mumrah commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1450910805


##
core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java:
##
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.handlers;
+
+import kafka.network.RequestChannel;
+import kafka.server.AuthHelper;
+import kafka.server.KafkaConfig;
+import kafka.server.MetadataCache;
+import kafka.server.metadata.KRaftMetadataCache;
+import kafka.server.metadata.ZkMetadataCache;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest;
+import org.apache.kafka.common.resource.Resource;
+import scala.collection.JavaConverters;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.resource.ResourceType.TOPIC;
+
+public class DescribeTopicPartitionsRequestHandler {
+MetadataCache metadataCache;
+AuthHelper authHelper;
+KafkaConfig config;
+
+public DescribeTopicPartitionsRequestHandler(
+MetadataCache metadataCache,

Review Comment:
   Since this is KRaft only, we can take the more specific type here (maybe we 
can call it from the `match` in KafkaApis)



##
core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java:
##
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.handlers;
+
+import kafka.network.RequestChannel;
+import kafka.server.AuthHelper;
+import kafka.server.KafkaConfig;
+import kafka.server.MetadataCache;
+import kafka.server.metadata.KRaftMetadataCache;
+import kafka.server.metadata.ZkMetadataCache;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest;
+import org.apache.kafka.common.resource.Resource;
+import scala.collection.JavaConverters;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.resource.ResourceType.TOPIC;
+
+public class DescribeTopicPartitionsRequestHandler {
+MetadataCache metadataCache;
+AuthHelper authHelper;
+   

Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]

2024-01-12 Thread via GitHub


gharris1727 commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1450911725


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java:
##
@@ -75,9 +77,9 @@ public class ErrorHandlingIntegrationTest {
 private static final String DLQ_TOPIC = "my-connector-errors";
 private static final String CONNECTOR_NAME = "error-conn";
 private static final String TASK_ID = "error-conn-0";
-private static final int NUM_RECORDS_PRODUCED = 20;
-private static final int EXPECTED_CORRECT_RECORDS = 19;
+private static final int NUM_RECORDS_PRODUCED = 1000;
 private static final int EXPECTED_INCORRECT_RECORDS = 1;
+private static final int EXPECTED_CORRECT_RECORDS = NUM_RECORDS_PRODUCED - 
EXPECTED_INCORRECT_RECORDS;

Review Comment:
   Good catch, this test had the same flaw in two places, and I already fixed 
the other one. I think one test was copied from the other in this class, and 
these situational constants were used incorrectly. I changed the constants so 
that the failure count is clearly relevant to the FaultyPassthrough, and may be 
less likely to be copied in the future.






Re: [PR] KAFKA-15575: Begin enforcing 'tasks.max' property for connectors [kafka]

2024-01-12 Thread via GitHub


C0urante commented on code in PR #15180:
URL: https://github.com/apache/kafka/pull/15180#discussion_r1450909786


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##
@@ -884,6 +890,158 @@ private void assertTimeoutException(Runnable operation, 
String expectedStageDesc
 connect.requestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS);
 }
 
+/**
+ * Tests the logic around enforcement of the
+ * {@link 
org.apache.kafka.connect.runtime.ConnectorConfig#TASKS_MAX_CONFIG tasks.max}
+ * property and how it can be toggled via the
+ * {@link 
org.apache.kafka.connect.runtime.ConnectorConfig#TASKS_MAX_ENFORCE_CONFIG 
tasks.max.enforce}
+ * property, following the test plan laid out in
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1004%3A+Enforce+tasks.max+property+in+Kafka+Connect#KIP1004:Enforcetasks.maxpropertyinKafkaConnect-TestPlan">KIP-1004</a>.
+ */
+@Test
+public void testTasksMaxEnforcement() throws Exception {
+String configTopic = "tasks-max-enforcement-configs";
+workerProps.put(CONFIG_TOPIC_CONFIG, configTopic);
+connect = connectBuilder.build();
+// start the clusters
+connect.start();
+
+connect.assertions().assertAtLeastNumWorkersAreUp(
+NUM_WORKERS,
+"Initial group of workers did not start in time."
+);
+
+Map connectorProps = 
defaultSourceConnectorProps(TOPIC_NAME);
+int maxTasks = 1;
+connectorProps.put(TASKS_MAX_CONFIG, Integer.toString(maxTasks));
+int numTasks = 2;
+connectorProps.put(MonitorableSourceConnector.NUM_TASKS, 
Integer.toString(numTasks));
+connect.configureConnector(CONNECTOR_NAME, connectorProps);
+
+// A connector that generates excessive tasks will be failed with an 
expected error message
+connect.assertions().assertConnectorIsFailedAndTasksHaveFailed(
+CONNECTOR_NAME,
+0,
+"connector did not fail in time"
+);
+
+String expectedErrorSnippet = String.format(
+"The connector %s has generated %d tasks, which is greater 
than %d, "
++ "the maximum number of tasks it is configured to 
create. ",
+CONNECTOR_NAME,
+numTasks,
+maxTasks
+);
+String errorMessage = 
connect.connectorStatus(CONNECTOR_NAME).connector().trace();
+assertThat(errorMessage, containsString(expectedErrorSnippet));
+
+// Stop all workers in the cluster
+connect.workers().forEach(connect::removeWorker);
+
+// Publish a set of too many task configs to the config topic, to 
simulate

Review Comment:
   I'd really rather not rely on the mutability of the `workerProps` field as 
it makes its way through various stages (builder, instantiating embedded worker, 
starting embedded worker). I'm also uncomfortable passing in a null 
transformer--a no-op transformer would be cleaner, but also more verbose.
   
   I'll give it a shot.






Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]

2024-01-12 Thread via GitHub


gharris1727 commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1450908695


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java:
##
@@ -392,90 +397,29 @@ public void testSetConfigs() {
 }
 
 @Test
-public void testThreadSafety() throws Throwable {
-long runtimeMs = 5_000;
-int numThreads = 10;
-// Check that multiple threads using RetryWithToleranceOperator 
concurrently
-// can't corrupt the state of the ProcessingContext
-AtomicReference failed = new AtomicReference<>(null);
-RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(0,
-ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM, 
errorHandlingMetrics, new ProcessingContext() {
-private final AtomicInteger count = new AtomicInteger();
-private final AtomicInteger attempt = new AtomicInteger();
-
-@Override
-public void error(Throwable error) {
-if (count.getAndIncrement() > 0) {
-failed.compareAndSet(null, new 
AssertionError("Concurrent call to error()"));
-}
-super.error(error);
-}
-
-@Override
-public Future report() {
-if (count.getAndSet(0) > 1) {
-failed.compareAndSet(null, new 
AssertionError("Concurrent call to error() in report()"));
-}
-
-return super.report();
-}
-
-@Override
-public void currentContext(Stage stage, Class klass) {
-this.attempt.set(0);
-super.currentContext(stage, klass);
-}
-
-@Override
-public void attempt(int attempt) {
-if (!this.attempt.compareAndSet(attempt - 1, attempt)) 
{
-failed.compareAndSet(null, new AssertionError(
-"Concurrent call to attempt(): Attempts 
should increase monotonically " +
-"within the scope of a given 
currentContext()"));
-}
-super.attempt(attempt);
-}
-}, new CountDownLatch(1));
-
-ExecutorService pool = Executors.newFixedThreadPool(numThreads);
-List> futures = IntStream.range(0, 
numThreads).boxed()
-.map(id ->
-pool.submit(() -> {
-long t0 = System.currentTimeMillis();
-long i = 0;
-while (true) {
-if (++i % 1 == 0 && 
System.currentTimeMillis() > t0 + runtimeMs) {
-break;
-}
-if (failed.get() != null) {
-break;
-}
-try {
-if (id < numThreads / 2) {
-
retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
-SinkTask.class, 
consumerRecord, new Throwable()).get();
-} else {
-retryWithToleranceOperator.execute(() 
-> null, Stage.TRANSFORMATION,
-SinkTask.class);
-}
-} catch (Exception e) {
-failed.compareAndSet(null, e);
-}
-}
-}))
-.collect(Collectors.toList());
-pool.shutdown();
-pool.awaitTermination((long) (1.5 * runtimeMs), TimeUnit.MILLISECONDS);
-futures.forEach(future -> {
-try {
-future.get();
-} catch (Exception e) {
-failed.compareAndSet(null, e);
-}

Review Comment:
   The replacement is the augmented ErrorHandlingTest#testErrantRecordReporter. 
That test (sometimes) fails when the race condition is present, and passes if 
the race condition is avoided.






Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]

2024-01-12 Thread via GitHub


gharris1727 commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1450907710


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrantRecordSinkConnector.java:
##
@@ -54,7 +65,16 @@ public void put(Collection records) {
 .computeIfAbsent(rec.topic(), v -> new HashMap<>())
 .computeIfAbsent(rec.kafkaPartition(), v -> new 
TopicPartition(rec.topic(), rec.kafkaPartition()));
 committedOffsets.put(tp, committedOffsets.getOrDefault(tp, 0) 
+ 1);
-reporter.report(rec, new Throwable());
+Throwable error = new Throwable();
+// Test synchronous and asynchronous reporting, allowing for 
re-ordering the errant reports

Review Comment:
   What verification are you thinking about? Currently we're asserting that at 
least N errors are reported in total after delivering N records.






Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]

2024-01-12 Thread via GitHub


gharris1727 commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1450900313


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrantRecordSinkConnector.java:
##
@@ -44,6 +49,12 @@ public ErrantRecordSinkTask() {
 public void start(Map props) {
 super.start(props);
 reporter = context.errantRecordReporter();
+executorService = Executors.newSingleThreadExecutor();
+}
+
+@Override
+public void stop() {
+ThreadUtils.shutdownExecutorServiceQuietly(executorService, 4, 
TimeUnit.SECONDS);

Review Comment:
   I picked this because it was less than the default graceful shutdown 
timeout, but I don't think it's particularly sensitive.
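For readers unfamiliar with the pattern, a bounded "quiet" shutdown can be sketched with plain JDK calls as below. This is an assumption about the general shape of such a helper, not a copy of Kafka's `ThreadUtils` implementation; `QuietShutdownSketch` and `shutdownQuietly` are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-JDK sketch of a bounded shutdown that never throws to the caller.
class QuietShutdownSketch {

    // Returns true if the executor terminated within the timeout.
    static boolean shutdownQuietly(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown(); // stop accepting new work, let queued tasks finish
        try {
            if (executor.awaitTermination(timeout, unit)) {
                return true;
            }
            executor.shutdownNow(); // give up waiting and interrupt running tasks
            return false;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> System.out.println("reporting asynchronously"));
        System.out.println("terminated in time: " + shutdownQuietly(executor, 4, TimeUnit.SECONDS));
    }
}
```

With this shape, the timeout only bounds how long `stop()` may block; keeping it under the task's graceful shutdown timeout means the executor is interrupted before the framework gives up on the task.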






Re: [PR] KAFKA-15575: Begin enforcing 'tasks.max' property for connectors [kafka]

2024-01-12 Thread via GitHub


gharris1727 commented on code in PR #15180:
URL: https://github.com/apache/kafka/pull/15180#discussion_r1450898354


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##
@@ -884,6 +890,158 @@ private void assertTimeoutException(Runnable operation, 
String expectedStageDesc
 connect.requestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS);
 }
 
+/**
+ * Tests the logic around enforcement of the
+ * {@link 
org.apache.kafka.connect.runtime.ConnectorConfig#TASKS_MAX_CONFIG tasks.max}
+ * property and how it can be toggled via the
+ * {@link 
org.apache.kafka.connect.runtime.ConnectorConfig#TASKS_MAX_ENFORCE_CONFIG 
tasks.max.enforce}
+ * property, following the test plan laid out in
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1004%3A+Enforce+tasks.max+property+in+Kafka+Connect#KIP1004:Enforcetasks.maxpropertyinKafkaConnect-TestPlan">KIP-1004</a>.
+ */
+@Test
+public void testTasksMaxEnforcement() throws Exception {
+String configTopic = "tasks-max-enforcement-configs";
+workerProps.put(CONFIG_TOPIC_CONFIG, configTopic);
+connect = connectBuilder.build();
+// start the clusters
+connect.start();
+
+connect.assertions().assertAtLeastNumWorkersAreUp(
+NUM_WORKERS,
+"Initial group of workers did not start in time."
+);
+
+Map<String, String> connectorProps = 
defaultSourceConnectorProps(TOPIC_NAME);
+int maxTasks = 1;
+connectorProps.put(TASKS_MAX_CONFIG, Integer.toString(maxTasks));
+int numTasks = 2;
+connectorProps.put(MonitorableSourceConnector.NUM_TASKS, 
Integer.toString(numTasks));
+connect.configureConnector(CONNECTOR_NAME, connectorProps);
+
+// A connector that generates excessive tasks will be failed with an 
expected error message
+connect.assertions().assertConnectorIsFailedAndTasksHaveFailed(
+CONNECTOR_NAME,
+0,
+"connector did not fail in time"
+);
+
+String expectedErrorSnippet = String.format(
+"The connector %s has generated %d tasks, which is greater 
than %d, "
++ "the maximum number of tasks it is configured to 
create. ",
+CONNECTOR_NAME,
+numTasks,
+maxTasks
+);
+String errorMessage = 
connect.connectorStatus(CONNECTOR_NAME).connector().trace();
+assertThat(errorMessage, containsString(expectedErrorSnippet));
+
+// Stop all workers in the cluster
+connect.workers().forEach(connect::removeWorker);
+
+// Publish a set of too many task configs to the config topic, to 
simulate

Review Comment:
   > the constructor dependencies for the KafkaConfigBackingStore 
(WorkerConfigTransformer (which itself requires a Worker instance)
   
   I think you can get away with a null transformer; the ClusterConfigState has 
null checks (and shouldn't be instantiated anyway). I agree that creating a 
full WorkerConfigTransformer is going too far.
   
   > DistributedConfig especially
   
   Is there something I'm not seeing? You already have a workerProps, and it 
gets filled out with all of the embedded cluster's details during the first 
cluster start.






[jira] [Updated] (KAFKA-16032) Review client error handling of OffsetFetch and OffsetCommit responses

2024-01-12 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16032:
---
Summary: Review client error handling of OffsetFetch and OffsetCommit 
responses  (was: Review client inconsistent error handling of OffsetFetch and 
OffsetCommit responses)

> Review client error handling of OffsetFetch and OffsetCommit responses
> --
>
> Key: KAFKA-16032
> URL: https://issues.apache.org/jira/browse/KAFKA-16032
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> OffsetFetch and OffsetCommit handle errors separately. There are 2 issues to 
> review around this:
>  - Ensure that we keep the legacy behaviour of handling expected errors, and 
> failing with KafkaException for all unexpected errors (even if retriable). 
>  - Some errors are not handled similarly in both requests (ex. 
> COORDINATOR_NOT_AVAILABLE handled and retried for OffsetCommit but not 
> OffsetFetch). Note that the specific errors handled by each request were kept 
> the same as in the legacy ConsumerCoordinator but this should be reviewed, in 
> an attempt to handle the same errors, in the same way, whenever possible.
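
A rough sketch of the behaviour described in this ticket, under stated assumptions. `ErrorCode`, `Action`, `handle` and the two "expected" sets are hypothetical names, and the sets only mirror the asymmetry mentioned in the description (COORDINATOR_NOT_AVAILABLE retried for OffsetCommit but not OffsetFetch); they are not copied from the client code. `IllegalStateException` stands in for `KafkaException`, and `REQUEST_TIMED_OUT` is just an example of a retriable error that is not in an expected set.

```java
import java.util.EnumSet;
import java.util.Set;

public class OffsetErrorHandlingSketch {

    // Hypothetical, simplified error codes for illustration only.
    enum ErrorCode { NONE, COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR, REQUEST_TIMED_OUT }

    enum Action { COMPLETE, RETRY }

    // Illustrative sets: each request type lists the errors it expects and retries on.
    static final Set<ErrorCode> COMMIT_EXPECTED_RETRIABLE =
            EnumSet.of(ErrorCode.COORDINATOR_NOT_AVAILABLE, ErrorCode.NOT_COORDINATOR);
    static final Set<ErrorCode> FETCH_EXPECTED_RETRIABLE =
            EnumSet.of(ErrorCode.NOT_COORDINATOR);

    // Expected errors are handled (here: retried); anything else fails the request,
    // even if the error would normally be considered retriable.
    static Action handle(ErrorCode error, Set<ErrorCode> expectedRetriable) {
        if (error == ErrorCode.NONE) {
            return Action.COMPLETE;
        }
        if (expectedRetriable.contains(error)) {
            return Action.RETRY;
        }
        throw new IllegalStateException("Unexpected error in response: " + error);
    }

    public static void main(String[] args) {
        System.out.println(handle(ErrorCode.COORDINATOR_NOT_AVAILABLE, COMMIT_EXPECTED_RETRIABLE));
        try {
            handle(ErrorCode.COORDINATOR_NOT_AVAILABLE, FETCH_EXPECTED_RETRIABLE);
        } catch (IllegalStateException e) {
            System.out.println("fetch path failed: " + e.getMessage());
        }
    }
}
```

With a single `handle` method, aligning the two request types comes down to making the two sets agree wherever possible, which is the review the ticket asks for.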



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16032) Review client inconsistent error handling of OffsetFetch and OffsetCommit responses

2024-01-12 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16032:
---
Description: 
OffsetFetch and OffsetCommit handle errors separately. There are 2 issues to 
review around this:
 - Ensure that we keep the legacy behaviour of handling expected errors, and 
failing with KafkaException for all unexpected errors (even if retriable). 
 - Some errors are not handled similarly in both requests (ex. 
COORDINATOR_NOT_AVAILABLE handled and retried for OffsetCommit but not 
OffsetFetch). Note that the specific errors handled by each request were kept 
the same as in the legacy ConsumerCoordinator but this should be reviewed, in 
an attempt to handle the same errors, in the same way, whenever possible.

  was:
OffsetFetch and OffsetCommit handle errors separately. There are 2 issues to 
review around this:
 - The logic is duplicated for some errors that are treated similarly (ex. 
NOT_COORDINATOR)

 - Some errors are not handled similarly in both requests (ex. 
COORDINATOR_NOT_AVAILABLE handled and retried for OffsetCommit but not 
OffsetFetch). Note that the specific errors handled by each request were kept 
the same as in the legacy ConsumerCoordinator but this should be reviewed, in 
an attempt to handle the same errors, in the same way, whenever possible.

Note that the legacy approach handles expected errors for each path (FETCH and 
COMMIT), retrying on those when needed, but does not retry on unexpected 
retriable errors.


> Review client inconsistent error handling of OffsetFetch and OffsetCommit 
> responses
> ---
>
> Key: KAFKA-16032
> URL: https://issues.apache.org/jira/browse/KAFKA-16032
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> OffsetFetch and OffsetCommit handle errors separately. There are 2 issues to 
> review around this:
>  - Ensure that we keep the legacy behaviour of handling expected errors, and 
> failing with KafkaException for all unexpected errors (even if retriable). 
>  - Some errors are not handled similarly in both requests (ex. 
> COORDINATOR_NOT_AVAILABLE handled and retried for OffsetCommit but not 
> OffsetFetch). Note that the specific errors handled by each request were kept 
> the same as in the legacy ConsumerCoordinator but this should be reviewed, in 
> an attempt to handle the same errors, in the same way, whenever possible.





Re: [PR] KAFKA-15575: Begin enforcing 'tasks.max' property for connectors [kafka]

2024-01-12 Thread via GitHub


C0urante commented on code in PR #15180:
URL: https://github.com/apache/kafka/pull/15180#discussion_r1450882720


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java:
##
@@ -619,7 +622,21 @@ public void testAddRemoveSourceTask() {
 
 assertStatistics(worker, 0, 0);
 assertEquals(Collections.emptySet(), worker.taskIds());
-worker.startSourceTask(TASK_ID, ClusterConfigState.EMPTY, 
anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
+
+Map connectorConfigs = anyConnectorConfigMap();
+ClusterConfigState configState = new ClusterConfigState(
+0,
+null,
+Collections.singletonMap(CONNECTOR_ID, 1),
+Collections.singletonMap(CONNECTOR_ID, connectorConfigs),
+Collections.singletonMap(CONNECTOR_ID, TargetState.STARTED),
+Collections.singletonMap(TASK_ID, origProps),
+Collections.emptyMap(),
+Collections.emptyMap(),
+Collections.emptySet(),
+Collections.emptySet()
+);
+assertTrue(worker.startSourceTask(TASK_ID, ClusterConfigState.EMPTY, 
connectorConfigs, origProps, taskStatusListener, TargetState.STARTED));

Review Comment:
   Good catch, thanks 



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java:
##
@@ -718,7 +747,22 @@ public void testAddRemoveExactlyOnceSourceTask() {
 
 assertStatistics(worker, 0, 0);
 assertEquals(Collections.emptySet(), worker.taskIds());
-worker.startExactlyOnceSourceTask(TASK_ID, ClusterConfigState.EMPTY,  
anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED, 
preProducer, postProducer);
+
+Map connectorConfigs = anyConnectorConfigMap();
+ClusterConfigState configState = new ClusterConfigState(
+0,
+null,
+Collections.singletonMap(CONNECTOR_ID, 1),
+Collections.singletonMap(CONNECTOR_ID, connectorConfigs),
+Collections.singletonMap(CONNECTOR_ID, TargetState.STARTED),
+Collections.singletonMap(TASK_ID, origProps),
+Collections.emptyMap(),
+Collections.emptyMap(),
+Collections.emptySet(),
+Collections.emptySet()
+);
+
+assertTrue(worker.startExactlyOnceSourceTask(TASK_ID, 
ClusterConfigState.EMPTY,  connectorConfigs, origProps, taskStatusListener, 
TargetState.STARTED, preProducer, postProducer));

Review Comment:
   Same as above



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##
@@ -884,6 +890,158 @@ private void assertTimeoutException(Runnable operation, 
String expectedStageDesc
 connect.requestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS);
 }
 
+/**
+ * Tests the logic around enforcement of the
+ * {@link 
org.apache.kafka.connect.runtime.ConnectorConfig#TASKS_MAX_CONFIG tasks.max}
+ * property and how it can be toggled via the
+ * {@link 
org.apache.kafka.connect.runtime.ConnectorConfig#TASKS_MAX_ENFORCE_CONFIG 
tasks.max.enforce}
+ * property, following the test plan laid out in
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1004%3A+Enforce+tasks.max+property+in+Kafka+Connect#KIP1004:Enforcetasks.maxpropertyinKafkaConnect-TestPlan">KIP-1004</a>.
+ */
+@Test
+public void testTasksMaxEnforcement() throws Exception {
+String configTopic = "tasks-max-enforcement-configs";
+workerProps.put(CONFIG_TOPIC_CONFIG, configTopic);
+connect = connectBuilder.build();
+// start the clusters
+connect.start();
+
+connect.assertions().assertAtLeastNumWorkersAreUp(
+NUM_WORKERS,
+"Initial group of workers did not start in time."
+);
+
+Map<String, String> connectorProps = 
defaultSourceConnectorProps(TOPIC_NAME);
+int maxTasks = 1;
+connectorProps.put(TASKS_MAX_CONFIG, Integer.toString(maxTasks));
+int numTasks = 2;
+connectorProps.put(MonitorableSourceConnector.NUM_TASKS, 
Integer.toString(numTasks));
+connect.configureConnector(CONNECTOR_NAME, connectorProps);
+
+// A connector that generates excessive tasks will be failed with an 
expected error message
+connect.assertions().assertConnectorIsFailedAndTasksHaveFailed(
+CONNECTOR_NAME,
+0,
+"connector did not fail in time"
+);
+
+String expectedErrorSnippet = String.format(
+"The connector %s has generated %d tasks, which is greater 
than %d, "
++ "the maximum number of tasks it is configured to 
create. ",
+CONNECTOR_NAME,
+numTasks,
+maxTasks
+);
+String 

Re: [PR] KAFKA-15575: Begin enforcing 'tasks.max' property for connectors [kafka]

2024-01-12 Thread via GitHub


C0urante commented on code in PR #15180:
URL: https://github.com/apache/kafka/pull/15180#discussion_r1450879034


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##
@@ -884,6 +890,158 @@ private void assertTimeoutException(Runnable operation, 
String expectedStageDesc
 connect.requestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS);
 }
 
+/**
+ * Tests the logic around enforcement of the
+ * {@link 
org.apache.kafka.connect.runtime.ConnectorConfig#TASKS_MAX_CONFIG tasks.max}
+ * property and how it can be toggled via the
+ * {@link 
org.apache.kafka.connect.runtime.ConnectorConfig#TASKS_MAX_ENFORCE_CONFIG 
tasks.max.enforce}
+ * property, following the test plan laid out in
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1004%3A+Enforce+tasks.max+property+in+Kafka+Connect#KIP1004:Enforcetasks.maxpropertyinKafkaConnect-TestPlan">KIP-1004</a>.
+ */
+@Test
+public void testTasksMaxEnforcement() throws Exception {
+String configTopic = "tasks-max-enforcement-configs";
+workerProps.put(CONFIG_TOPIC_CONFIG, configTopic);
+connect = connectBuilder.build();
+// start the clusters
+connect.start();
+
+connect.assertions().assertAtLeastNumWorkersAreUp(
+NUM_WORKERS,
+"Initial group of workers did not start in time."
+);
+
+Map<String, String> connectorProps = 
defaultSourceConnectorProps(TOPIC_NAME);
+int maxTasks = 1;
+connectorProps.put(TASKS_MAX_CONFIG, Integer.toString(maxTasks));
+int numTasks = 2;
+connectorProps.put(MonitorableSourceConnector.NUM_TASKS, 
Integer.toString(numTasks));
+connect.configureConnector(CONNECTOR_NAME, connectorProps);
+
+// A connector that generates excessive tasks will be failed with an 
expected error message
+connect.assertions().assertConnectorIsFailedAndTasksHaveFailed(
+CONNECTOR_NAME,
+0,
+"connector did not fail in time"
+);
+
+String expectedErrorSnippet = String.format(
+"The connector %s has generated %d tasks, which is greater 
than %d, "
++ "the maximum number of tasks it is configured to 
create. ",
+CONNECTOR_NAME,
+numTasks,
+maxTasks
+);
+String errorMessage = 
connect.connectorStatus(CONNECTOR_NAME).connector().trace();
+assertThat(errorMessage, containsString(expectedErrorSnippet));
+
+// Stop all workers in the cluster
+connect.workers().forEach(connect::removeWorker);
+
+// Publish a set of too many task configs to the config topic, to 
simulate
+// an existing set of task configs that was written before the cluster 
was upgraded
+try (JsonConverter converter = new JsonConverter()) {
+converter.configure(
+
Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"),
+false
+);
+
+for (int i = 0; i < numTasks; i++) {
+Map taskConfig = 
MonitorableSourceConnector.taskConfig(
+connectorProps,
+CONNECTOR_NAME,
+i
+);
+Struct wrappedTaskConfig = new 
Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0)
+.put("properties", taskConfig);
+String key = KafkaConfigBackingStore.TASK_KEY(new 
ConnectorTaskId(CONNECTOR_NAME, i));
+byte[] value = converter.fromConnectData(
+configTopic,
+KafkaConfigBackingStore.TASK_CONFIGURATION_V0,
+wrappedTaskConfig
+);
+connect.kafka().produce(configTopic, key, new String(value));
+}
+
+Struct taskCommitMessage = new 
Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0);
+taskCommitMessage.put("tasks", numTasks);
+String key = 
KafkaConfigBackingStore.COMMIT_TASKS_KEY(CONNECTOR_NAME);
+byte[] value = converter.fromConnectData(
+configTopic,
+KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0,
+taskCommitMessage
+);
+connect.kafka().produce(configTopic, key, new String(value));
+}
+
+// Restart all the workers in the cluster
+for (int i = 0; i < NUM_WORKERS; i++)
+connect.addWorker();
+
+// An existing set of tasks that exceeds the tasks.max property
+// will be failed with an expected error message
+connect.assertions().assertConnectorIsFailedAndTasksHaveFailed(
+CONNECTOR_NAME,
+numTasks,
+"connector and tasks did not fail in time"
+);
+
+

Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]

2024-01-12 Thread via GitHub


C0urante commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1450850940


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrantRecordSinkConnector.java:
##
@@ -44,6 +49,12 @@ public ErrantRecordSinkTask() {
 public void start(Map<String, String> props) {
 super.start(props);
 reporter = context.errantRecordReporter();
+executorService = Executors.newSingleThreadExecutor();
+}
+
+@Override
+public void stop() {
+ThreadUtils.shutdownExecutorServiceQuietly(executorService, 4, 
TimeUnit.SECONDS);

Review Comment:
   Just curious--any rationale for four seconds specifically?



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java:
##
@@ -75,9 +77,9 @@ public class ErrorHandlingIntegrationTest {
 private static final String DLQ_TOPIC = "my-connector-errors";
 private static final String CONNECTOR_NAME = "error-conn";
 private static final String TASK_ID = "error-conn-0";
-private static final int NUM_RECORDS_PRODUCED = 20;
-private static final int EXPECTED_CORRECT_RECORDS = 19;
+private static final int NUM_RECORDS_PRODUCED = 1000;
 private static final int EXPECTED_INCORRECT_RECORDS = 1;
+private static final int EXPECTED_CORRECT_RECORDS = NUM_RECORDS_PRODUCED - 
EXPECTED_INCORRECT_RECORDS;

Review Comment:
   Again, I know this isn't your fault, but I'm a little confused at the use of 
this field in the `testErrantRecordReporter` case. I believe it can be replaced 
with `NUM_RECORDS_PRODUCED` 
[here](https://github.com/gharris1727/kafka/blob/f5845038014f3df29d505eeadd01403e9756728f/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java#L220),
 since this case doesn't cover any errors that should prevent records from 
reaching tasks.



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrantRecordSinkConnector.java:
##
@@ -54,7 +65,16 @@ public void put(Collection<SinkRecord> records) {
 .computeIfAbsent(rec.topic(), v -> new HashMap<>())
 .computeIfAbsent(rec.kafkaPartition(), v -> new 
TopicPartition(rec.topic(), rec.kafkaPartition()));
 committedOffsets.put(tp, committedOffsets.getOrDefault(tp, 0) 
+ 1);
-reporter.report(rec, new Throwable());
+Throwable error = new Throwable();
+// Test synchronous and asynchronous reporting, allowing for 
re-ordering the errant reports

Review Comment:
   We don't have any corresponding verification that this behavior is handled 
correctly. If there's an easy, lightweight way to add that, it'd be nice, but 
it's not worth blocking on if it's too cumbersome.



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java:
##
@@ -245,7 +249,7 @@ public void testErrantRecordReporter() throws Exception {
 
 // consume failed records from dead letter queue topic
 log.info("Consuming records from test topic");
-ConsumerRecords messages = 
connect.kafka().consume(EXPECTED_INCORRECT_RECORDS, CONSUME_MAX_DURATION_MS, 
DLQ_TOPIC);
+ConsumerRecords messages = 
connect.kafka().consume(NUM_RECORDS_PRODUCED, CONSUME_MAX_DURATION_MS, 
DLQ_TOPIC);

Review Comment:
   Nit (not your fault): we don't use the `messages` variable at all.
   
   ```suggestion
   connect.kafka().consume(NUM_RECORDS_PRODUCED, 
CONSUME_MAX_DURATION_MS, DLQ_TOPIC);
   ```



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java:
##
@@ -392,90 +397,29 @@ public void testSetConfigs() {
 }
 
 @Test
-public void testThreadSafety() throws Throwable {
-long runtimeMs = 5_000;
-int numThreads = 10;
-// Check that multiple threads using RetryWithToleranceOperator 
concurrently
-// can't corrupt the state of the ProcessingContext
-AtomicReference failed = new AtomicReference<>(null);
-RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(0,
-ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM, 
errorHandlingMetrics, new ProcessingContext() {
-private final AtomicInteger count = new AtomicInteger();
-private final AtomicInteger attempt = new AtomicInteger();
-
-@Override
-public void error(Throwable error) {
-if (count.getAndIncrement() > 0) {
-failed.compareAndSet(null, new 
AssertionError("Concurrent call to error()"));
-}
-super.error(error);
-}
-
-@Override
-public Future report() {
-

Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-12 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1450863295


##
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##
@@ -189,6 +255,86 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
 }
   }
 
+  /**
+   * Get the topic metadata for the given topics.
+   *
+   * The quota is used to limit the number of partitions to return. The 
NextTopicPartition field points to the first
+   * partition that can't be returned due to the limit.
+   * If a topic can't return any partition due to quota limit reached, this 
topic will not be included in the response.
+   *
+   * Note, the topics should be sorted in alphabetical order. The topics in 
the DescribeTopicPartitionsResponseData
+   * will also be sorted in alphabetical order.
+   *
+   * @param topics                        The set of topics and their 
corresponding first partition id to fetch.
+   * @param listenerName  The listener name.
+   * @param firstTopicPartitionStartIndex The start partition index for the 
first topic
+   * @param maximumNumberOfPartitions The max number of partitions to 
return.
+   */
+  def getTopicMetadataForDescribeTopicResponse(
+topics: Seq[String],
+listenerName: ListenerName,
+firstTopicPartitionStartIndex: Int,
+maximumNumberOfPartitions: Int
+  ): DescribeTopicPartitionsResponseData = {
+val image = _currentImage
+var remaining = maximumNumberOfPartitions
+var startIndex = firstTopicPartitionStartIndex
+val result = new DescribeTopicPartitionsResponseData()
+topics.foreach { topicName =>
+  if (remaining > 0) {
+val partitionResponse = 
getPartitionMetadataForDescribeTopicResponse(image, topicName, listenerName)
+partitionResponse.map( partitions => {
+  val upperIndex = startIndex + remaining
+  val response = new DescribeTopicPartitionsResponseTopic()
+.setErrorCode(Errors.NONE.code)
+.setName(topicName)
+
.setTopicId(Option(image.topics().getTopic(topicName).id()).getOrElse(Uuid.ZERO_UUID))
+.setIsInternal(Topic.isInternal(topicName))
+.setPartitions(partitions.filter(partition => {
+  partition.partitionIndex() >= startIndex && 
partition.partitionIndex() < upperIndex
+}).asJava)
+  remaining -= response.partitions().size()
+  result.topics().add(response)
+
+  if (upperIndex < partitions.size) {
+result.setNextCursor(new Cursor()
+  .setTopicName(topicName)
+  .setPartitionIndex(upperIndex)
+)
+remaining = -1
+  }
+})
+
+// start index only applies to the first topic. Reset it here.
+startIndex = 0
+
+if (!partitionResponse.isDefined) {
+  val error = try {
+Topic.validate(topicName)
+Errors.UNKNOWN_TOPIC_OR_PARTITION

Review Comment:
   Updated. When fetching all topics, we now ignore the topics with errors here.
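
The quota and cursor behaviour described in the Scaladoc of the hunk above can be sketched in isolation as follows. This is an illustration under assumptions, not the PR's implementation: `PartitionCursorSketch`, `Cursor`, `Page` and `describe` are hypothetical names, topics are reduced to a sorted map of topic name to partition count, and partitions are assumed to be numbered 0..count-1.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public class PartitionCursorSketch {

    // Hypothetical stand-in for the response cursor: first partition that was NOT returned.
    static final class Cursor {
        final String topicName;
        final int partitionIndex;
        Cursor(String topicName, int partitionIndex) {
            this.topicName = topicName;
            this.partitionIndex = partitionIndex;
        }
    }

    // Hypothetical stand-in for the response: returned partitions plus an optional cursor.
    static final class Page {
        final Map<String, List<Integer>> partitionsByTopic;
        final Cursor nextCursor; // null when nothing was cut off
        Page(Map<String, List<Integer>> partitionsByTopic, Cursor nextCursor) {
            this.partitionsByTopic = partitionsByTopic;
            this.nextCursor = nextCursor;
        }
    }

    static Page describe(SortedMap<String, Integer> partitionCounts, int firstTopicStartIndex, int maxPartitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int remaining = maxPartitions;
        int startIndex = firstTopicStartIndex; // only applies to the first topic
        for (Map.Entry<String, Integer> topic : partitionCounts.entrySet()) {
            if (remaining <= 0) {
                // Quota exhausted: leave this topic out and point the cursor at it.
                return new Page(result, new Cursor(topic.getKey(), startIndex));
            }
            int count = topic.getValue();
            int upperIndex = Math.min(count, startIndex + remaining); // exclusive bound
            List<Integer> partitions = new ArrayList<>();
            for (int p = startIndex; p < upperIndex; p++) {
                partitions.add(p);
            }
            result.put(topic.getKey(), partitions);
            remaining -= partitions.size();
            if (upperIndex < count) {
                // Topic was cut short: the cursor is the first partition left out.
                return new Page(result, new Cursor(topic.getKey(), upperIndex));
            }
            startIndex = 0; // reset after the first topic
        }
        return new Page(result, null);
    }

    public static void main(String[] args) {
        SortedMap<String, Integer> topics = new TreeMap<>();
        topics.put("a", 3);
        topics.put("b", 2);
        Page first = describe(topics, 0, 4);
        System.out.println(first.partitionsByTopic);
        // Resume from the cursor: topics from the cursor topic onward, starting at its partition index.
        Page second = describe(topics.tailMap(first.nextCursor.topicName), first.nextCursor.partitionIndex, 4);
        System.out.println(second.partitionsByTopic);
    }
}
```

One deliberate difference from the quoted hunk: the sketch also sets a cursor when the quota runs out exactly at a topic boundary, which is my reading of the Scaladoc rather than something the diff shows.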






Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-12 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1450862714


##
core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java:
##
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.handlers;
+
+import kafka.network.RequestChannel;
+import kafka.server.AuthHelper;
+import kafka.server.KafkaConfig;
+import kafka.server.MetadataCache;
+import kafka.server.metadata.KRaftMetadataCache;
+import kafka.server.metadata.ZkMetadataCache;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest;
+import org.apache.kafka.common.resource.Resource;
+import scala.collection.JavaConverters;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.resource.ResourceType.TOPIC;
+
+public class DescribeTopicPartitionsRequestHandler {
+MetadataCache metadataCache;
+AuthHelper authHelper;
+KafkaConfig config;
+
+public DescribeTopicPartitionsRequestHandler(
+MetadataCache metadataCache,
+AuthHelper authHelper,
+KafkaConfig config
+) {
+this.metadataCache = metadataCache;
+this.authHelper = authHelper;
+this.config = config;
+}
+
+public DescribeTopicPartitionsResponseData 
handleDescribeTopicPartitionsRequest(RequestChannel.Request abstractRequest) {
+if (metadataCache instanceof ZkMetadataCache) {
+throw new InvalidRequestException("ZK cluster does not handle 
DescribeTopicPartitions request");
+}
+KRaftMetadataCache kRaftMetadataCache = (KRaftMetadataCache) 
metadataCache;
+
+DescribeTopicPartitionsRequestData request = 
((DescribeTopicPartitionsRequest) abstractRequest.loggableRequest()).data();
+Set<String> topics = new HashSet<>();
+boolean fetchAllTopics = request.topics().isEmpty();
+DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor();
+if (fetchAllTopics) {
+if (cursor != null) {
+// Includes the cursor topic in case the cursor topic does not 
exist anymore.
+topics.add(cursor.topicName());
+
kRaftMetadataCache.getAllTopicsAfterTopic(cursor.topicName()).foreach(topic -> 
topics.add(topic));
+} else {
+kRaftMetadataCache.getAllTopics().foreach(topic -> 
topics.add(topic));
+}
+} else {
+request.topics().forEach(topic -> {
+String topicName = topic.name();
+if (cursor == null || topicName.compareTo(cursor.topicName()) 
>= 0) {
+topics.add(topic.name());
+}
+});
+
+if (cursor != null && !topics.contains(cursor.topicName())) {
+// The topic in cursor must be included in the topic list if 
provided.
+throw new 
InvalidRequestException("DescribeTopicPartitionsRequest topic list should 
contain the cursor topic: " + cursor.topicName());
+}
+}
+
+// Do not disclose the existence of topics unauthorized for Describe, 
so we've not even checked if they exist or not
+Set 
unauthorizedForDescribeTopicMetadata = new HashSet<>();
+
+Iterator authorizedTopics = topics.stream().filter(topicName 
-> {
+boolean isAuthorized = authHelper.authorize(
+abstractRequest.context(), DESCRIBE, TOPIC, topicName, true, 
true, 1);
+if (!fetchAllTopics && 

Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-12 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1450862538


##
core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java:
##
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.handlers;
+
+import kafka.network.RequestChannel;
+import kafka.server.AuthHelper;
+import kafka.server.KafkaConfig;
+import kafka.server.MetadataCache;
+import kafka.server.metadata.KRaftMetadataCache;
+import kafka.server.metadata.ZkMetadataCache;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest;
+import org.apache.kafka.common.resource.Resource;
+import scala.collection.JavaConverters;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.resource.ResourceType.TOPIC;
+
+public class DescribeTopicPartitionsRequestHandler {
+MetadataCache metadataCache;
+AuthHelper authHelper;
+KafkaConfig config;
+
+public DescribeTopicPartitionsRequestHandler(
+MetadataCache metadataCache,
+AuthHelper authHelper,
+KafkaConfig config
+) {
+this.metadataCache = metadataCache;
+this.authHelper = authHelper;
+this.config = config;
+}
+
+public DescribeTopicPartitionsResponseData 
handleDescribeTopicPartitionsRequest(RequestChannel.Request abstractRequest) {
+if (metadataCache instanceof ZkMetadataCache) {
+throw new InvalidRequestException("ZK cluster does not handle 
DescribeTopicPartitions request");
+}
+KRaftMetadataCache kRaftMetadataCache = (KRaftMetadataCache) 
metadataCache;
+
+DescribeTopicPartitionsRequestData request = 
((DescribeTopicPartitionsRequest) abstractRequest.loggableRequest()).data();
+Set<String> topics = new HashSet<>();
+boolean fetchAllTopics = request.topics().isEmpty();
+DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor();
+if (fetchAllTopics) {
+if (cursor != null) {
+// Includes the cursor topic in case the cursor topic does not 
exist anymore.
+topics.add(cursor.topicName());

Review Comment:
   Good catch. If the cursor topic is unauthorized, the first partition id is 
messed up.
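
For reference, the topic-selection rule in the handler quoted in this thread (the fetch-all branch and the explicit-list branch) can be sketched on its own. The names `CursorTopicSelectionSketch` and `selectTopics` are hypothetical, a `TreeSet` replaces the `HashSet` from the diff to keep the output ordered, and `IllegalArgumentException` stands in for `InvalidRequestException`. Authorization is deliberately left out, so this does not address the unauthorized-cursor case raised here.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

public class CursorTopicSelectionSketch {

    // allTopics: every topic known to the (hypothetical) metadata cache.
    // requested: explicit topic list from the request; empty means "fetch all topics".
    // cursorTopic: topic name from the request cursor, or null when there is no cursor.
    static SortedSet<String> selectTopics(SortedSet<String> allTopics, List<String> requested, String cursorTopic) {
        SortedSet<String> topics = new TreeSet<>();
        if (requested.isEmpty()) {
            if (cursorTopic != null) {
                // Include the cursor topic even if it no longer exists.
                topics.add(cursorTopic);
                topics.addAll(allTopics.tailSet(cursorTopic));
            } else {
                topics.addAll(allTopics);
            }
            return topics;
        }
        for (String name : requested) {
            // Skip topics that sort before the cursor; they were served by an earlier page.
            if (cursorTopic == null || name.compareTo(cursorTopic) >= 0) {
                topics.add(name);
            }
        }
        if (cursorTopic != null && !topics.contains(cursorTopic)) {
            throw new IllegalArgumentException("Topic list should contain the cursor topic: " + cursorTopic);
        }
        return topics;
    }

    public static void main(String[] args) {
        SortedSet<String> all = new TreeSet<>(Arrays.asList("a", "b", "c"));
        System.out.println(selectTopics(all, Collections.emptyList(), "b"));
        System.out.println(selectTopics(all, Arrays.asList("a", "b", "c"), "b"));
    }
}
```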






Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-12 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1450862006


##
core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java:
##
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.handlers;
+
+import kafka.network.RequestChannel;
+import kafka.server.AuthHelper;
+import kafka.server.KafkaConfig;
+import kafka.server.MetadataCache;
+import kafka.server.metadata.KRaftMetadataCache;
+import kafka.server.metadata.ZkMetadataCache;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest;
+import org.apache.kafka.common.resource.Resource;
+import scala.collection.JavaConverters;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.resource.ResourceType.TOPIC;
+
+public class DescribeTopicPartitionsRequestHandler {
+MetadataCache metadataCache;
+AuthHelper authHelper;
+KafkaConfig config;
+
+public DescribeTopicPartitionsRequestHandler(
+MetadataCache metadataCache,
+AuthHelper authHelper,
+KafkaConfig config
+) {
+this.metadataCache = metadataCache;
+this.authHelper = authHelper;
+this.config = config;
+}
+
+public DescribeTopicPartitionsResponseData 
handleDescribeTopicPartitionsRequest(RequestChannel.Request abstractRequest) {
+if (metadataCache instanceof ZkMetadataCache) {
+throw new InvalidRequestException("ZK cluster does not handle 
DescribeTopicPartitions request");
+}
+KRaftMetadataCache kRaftMetadataCache = (KRaftMetadataCache) 
metadataCache;
+
+DescribeTopicPartitionsRequestData request = 
((DescribeTopicPartitionsRequest) abstractRequest.loggableRequest()).data();
+Set topics = new HashSet<>();
+boolean fetchAllTopics = request.topics().isEmpty();
+DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor();
+if (fetchAllTopics) {
+if (cursor != null) {
+// Includes the cursor topic in case the cursor topic does not 
exist anymore.
+topics.add(cursor.topicName());
+
kRaftMetadataCache.getAllTopicsAfterTopic(cursor.topicName()).foreach(topic -> 
topics.add(topic));

Review Comment:
   Done.






Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]

2024-01-12 Thread via GitHub


C0urante commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1450846319


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##
@@ -236,7 +237,7 @@ protected  V execAndHandleError(Operation operation, 
Class

Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]

2024-01-12 Thread via GitHub


C0urante commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1450843684


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java:
##
@@ -121,20 +121,22 @@ public static DeadLetterQueueReporter 
createAndSetup(Map adminPr
 /**
  * Write the raw records into a Kafka topic and return the producer future.
  *
- * @param context processing context containing the raw record at {@link 
ProcessingContext#consumerRecord()}.
+ * @param context processing context containing the raw record at {@link 
ProcessingContext#original()}.
  * @return the future associated with the writing of this record; never 
null
  */
-public Future report(ProcessingContext context) {
+@SuppressWarnings("unchecked")
+public Future report(ProcessingContext context) {
 if (dlqTopicName.isEmpty()) {
 return CompletableFuture.completedFuture(null);
 }
 errorHandlingMetrics.recordDeadLetterQueueProduceRequest();
 
-ConsumerRecord originalMessage = 
context.consumerRecord();
-if (originalMessage == null) {
+if (!(context.original() instanceof ConsumerRecord)) {

Review Comment:
   Ah, nice. I'll mark this resolved and look forward to the refactor 



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java:
##
@@ -121,20 +121,22 @@ public static DeadLetterQueueReporter 
createAndSetup(Map adminPr
 /**
  * Write the raw records into a Kafka topic and return the producer future.
  *
- * @param context processing context containing the raw record at {@link 
ProcessingContext#consumerRecord()}.
+ * @param context processing context containing the raw record at {@link 
ProcessingContext#original()}.
  * @return the future associated with the writing of this record; never 
null
  */
-public Future report(ProcessingContext context) {
+@SuppressWarnings("unchecked")
+public Future report(ProcessingContext context) {
 if (dlqTopicName.isEmpty()) {
 return CompletableFuture.completedFuture(null);
 }
 errorHandlingMetrics.recordDeadLetterQueueProduceRequest();
 
-ConsumerRecord originalMessage = 
context.consumerRecord();
-if (originalMessage == null) {
+if (!(context.original() instanceof ConsumerRecord)) {
 errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
 return CompletableFuture.completedFuture(null);
 }
+ProcessingContext> sinkContext = 
(ProcessingContext>) context;

Review Comment:
   Hmmm, I was wondering if a consumer interceptor could potentially return a 
subclass of `ConsumerRecord` but this doesn't seem possible at the moment. Fine 
as-is, then 






Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]

2024-01-12 Thread via GitHub


C0urante commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1450843020


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java:
##
@@ -17,82 +17,36 @@
 package org.apache.kafka.connect.runtime.errors;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.connect.errors.ConnectException;
-import 
org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter.ErrantRecordFuture;
 import org.apache.kafka.connect.source.SourceRecord;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
-
 /**
- * Contains all the metadata related to the currently evaluating operation. 
Only one instance of this class is meant
- * to exist per task in a JVM.
+ * Contains all the metadata related to the currently evaluating operation, 
and associated with a particular
+ * sink or source record from the consumer or task, respectively. This class 
is not thread safe, and so once an
+ * instance is passed to a new thread, it should no longer be accessed by the 
previous thread.

Review Comment:
   Hmmm--I'm convinced we don't have to block on this, but I think there might 
still be value in making the mutable fields of the `ProcessingContext` class 
volatile, to reduce the risk of footguns in the future.
   
   If a class has a volatile `ProcessingContext` field, that guarantees that 
reads of that field will always see the latest value for that field. It doesn't 
provide the same guarantees for the fields of the `ProcessingContext` instance, 
though--so while we'll always see an up-to-date reference to `volatile 
ProcessingContext ctx`, we may still get an out-of-date value when invoking 
`ctx.stage()`.
   
   This becomes possible if the same `ProcessingContext` instance is passed 
back and forth between threads, even if there is only ever one thread reading 
from or writing to the instance.
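
The suggestion above can be sketched as follows. This is a simplified, hypothetical stand-in for `ProcessingContext` (the class name `VolatileContextSketch`, the `String` stage, and the `handOff` helper are assumptions for illustration). A `volatile` reference in the holder only orders accesses to that reference; marking the mutable fields themselves `volatile` means a write by one thread is visible to any later read of that field by another thread, without relying on how the instance was handed over.

```java
// Hypothetical, simplified stand-in for ProcessingContext; names are illustrative.
final class VolatileContextSketch {
    static final class Context {
        // Each mutable field is volatile, so visibility does not depend on
        // how the reference to this instance is published between threads.
        private volatile String stage;
        private volatile Throwable error;

        void currentContext(String stage) { this.stage = stage; }
        void error(Throwable error) { this.error = error; }
        String stage() { return stage; }
        Throwable error() { return error; }
        boolean failed() { return error != null; }
    }

    /** Mutates a context on a second thread and hands it back to the caller. */
    static Context handOff() {
        Context ctx = new Context();
        Thread worker = new Thread(() -> {
            ctx.currentContext("TRANSFORMATION");
            ctx.error(new IllegalStateException("boom"));
        });
        worker.start();
        try {
            // join() already establishes happens-before in this demo; the
            // volatile fields matter when a hand-off has no such synchronization.
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ctx;
    }
}
```

The trade-off is a small cost per field access in exchange for not having to reason about every hand-off path individually.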



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java:
##
@@ -168,11 +102,13 @@ public String toString(boolean includeMessage) {
 builder.append("' with class '");
 builder.append(executingClass() == null ? "null" : 
executingClass().getName());
 builder.append('\'');
-if (includeMessage && sourceRecord() != null) {
+T original = original();
+if (includeMessage && original instanceof SourceRecord) {

Review Comment:
   Thank you for the exhaustive analysis 
   
   I wholeheartedly agree that this would be better suited for a follow-up, and 
if/when that happens, the comments you've left have created an excellent place 
to start from.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##
@@ -303,48 +303,14 @@ public String toString() {
  * @param reporters the error reporters (should not be null).
  */
 public synchronized void reporters(List reporters) {
-this.context.reporters(reporters);
-}
-
-/**
- * Set the source record being processed in the connect pipeline.
- *
- * @param preTransformRecord the source record
- */
-public synchronized void sourceRecord(SourceRecord preTransformRecord) {
-this.context.sourceRecord(preTransformRecord);
-}
-
-/**
- * Set the record consumed from Kafka in a sink connector.
- *
- * @param consumedMessage the record
- */
-public synchronized void consumerRecord(ConsumerRecord 
consumedMessage) {
-this.context.consumerRecord(consumedMessage);
-}
-
-/**
- * @return true, if the last operation encountered an error; false 
otherwise
- */
-public synchronized boolean failed() {
-return this.context.failed();
-}
-
-/**
- * Returns the error encountered when processing the current stage.
- *
- * @return the error encountered when processing the current stage
- */
-public synchronized Throwable error() {
-return this.context.error();
+this.reporters = Objects.requireNonNull(reporters, "reporters");

Review Comment:
   Hmmm... I don't see the same confusion over ownership with a 
`CachedSupplier` since it's basically a drop-in for a `final` field that can't 
be instantiated before/during the constructor. But if this isn't how everyone 
reacts to a pattern like that, I agree it's best to try to avoid that confusion.
   
   I think the `AutoCloseableSupplier` interface is acceptable, if a little 
clunky. We also don't need to support 
`AutoCloseableSupplier>` if it's also viable to use a 
`List` instead.
   
   Some other possible 

[jira] [Resolved] (KAFKA-16112) Review JMX metrics in Async Consumer and determine the missing ones

2024-01-12 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-16112.

Resolution: Fixed

These are the results of this ticket

 
|KAFKA-16113|
|KAFKA-16116|
|KAFKA-16115|

> Review JMX metrics in Async Consumer and determine the missing ones
> ---
>
> Key: KAFKA-16112
> URL: https://issues.apache.org/jira/browse/KAFKA-16112
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15575: Begin enforcing 'tasks.max' property for connectors [kafka]

2024-01-12 Thread via GitHub


gharris1727 commented on code in PR #15180:
URL: https://github.com/apache/kafka/pull/15180#discussion_r1450813789


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##
@@ -884,6 +890,158 @@ private void assertTimeoutException(Runnable operation, 
String expectedStageDesc
 connect.requestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS);
 }
 
+/**
+ * Tests the logic around enforcement of the
+ * {@link 
org.apache.kafka.connect.runtime.ConnectorConfig#TASKS_MAX_CONFIG tasks.max}
+ * property and how it can be toggled via the
+ * {@link 
org.apache.kafka.connect.runtime.ConnectorConfig#TASKS_MAX_ENFORCE_CONFIG 
tasks.max.enforce}
+ * property, following the test plan laid out in
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1004%3A+Enforce+tasks.max+property+in+Kafka+Connect#KIP1004:Enforcetasks.maxpropertyinKafkaConnect-TestPlan">KIP-1004</a>.
+ */
+@Test
+public void testTasksMaxEnforcement() throws Exception {
+String configTopic = "tasks-max-enforcement-configs";
+workerProps.put(CONFIG_TOPIC_CONFIG, configTopic);
+connect = connectBuilder.build();
+// start the clusters
+connect.start();
+
+connect.assertions().assertAtLeastNumWorkersAreUp(
+NUM_WORKERS,
+"Initial group of workers did not start in time."
+);
+
+Map connectorProps = 
defaultSourceConnectorProps(TOPIC_NAME);
+int maxTasks = 1;
+connectorProps.put(TASKS_MAX_CONFIG, Integer.toString(maxTasks));
+int numTasks = 2;
+connectorProps.put(MonitorableSourceConnector.NUM_TASKS, 
Integer.toString(numTasks));
+connect.configureConnector(CONNECTOR_NAME, connectorProps);
+
+// A connector that generates excessive tasks will be failed with an 
expected error message
+connect.assertions().assertConnectorIsFailedAndTasksHaveFailed(
+CONNECTOR_NAME,
+0,
+"connector did not fail in time"
+);
+
+String expectedErrorSnippet = String.format(
+"The connector %s has generated %d tasks, which is greater 
than %d, "
++ "the maximum number of tasks it is configured to 
create. ",
+CONNECTOR_NAME,
+numTasks,
+maxTasks
+);
+String errorMessage = 
connect.connectorStatus(CONNECTOR_NAME).connector().trace();
+assertThat(errorMessage, containsString(expectedErrorSnippet));
+
+// Stop all workers in the cluster
+connect.workers().forEach(connect::removeWorker);
+
+// Publish a set of too many task configs to the config topic, to 
simulate
+// an existing set of task configs that was written before the cluster 
was upgraded
+try (JsonConverter converter = new JsonConverter()) {
+converter.configure(
+
Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"),
+false
+);
+
+for (int i = 0; i < numTasks; i++) {
+Map taskConfig = 
MonitorableSourceConnector.taskConfig(
+connectorProps,
+CONNECTOR_NAME,
+i
+);
+Struct wrappedTaskConfig = new 
Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0)
+.put("properties", taskConfig);
+String key = KafkaConfigBackingStore.TASK_KEY(new 
ConnectorTaskId(CONNECTOR_NAME, i));
+byte[] value = converter.fromConnectData(
+configTopic,
+KafkaConfigBackingStore.TASK_CONFIGURATION_V0,
+wrappedTaskConfig
+);
+connect.kafka().produce(configTopic, key, new String(value));
+}
+
+Struct taskCommitMessage = new 
Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0);
+taskCommitMessage.put("tasks", numTasks);
+String key = 
KafkaConfigBackingStore.COMMIT_TASKS_KEY(CONNECTOR_NAME);
+byte[] value = converter.fromConnectData(
+configTopic,
+KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0,
+taskCommitMessage
+);
+connect.kafka().produce(configTopic, key, new String(value));
+}
+
+// Restart all the workers in the cluster
+for (int i = 0; i < NUM_WORKERS; i++)
+connect.addWorker();
+
+// An existing set of tasks that exceeds the tasks.max property
+// will be failed with an expected error message
+connect.assertions().assertConnectorIsFailedAndTasksHaveFailed(
+CONNECTOR_NAME,
+numTasks,
+"connector and tasks did not fail in time"
+);
+
+ 

Re: [PR] KAFKA-16087: Avoid data race which drops wrong record when errors.tolerance=all and asynchronous error occurs [kafka]

2024-01-12 Thread via GitHub


C0urante commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1450812678


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##
@@ -80,59 +83,57 @@ public class RetryWithToleranceOperator implements 
AutoCloseable {
 private final ErrorHandlingMetrics errorHandlingMetrics;
 private final CountDownLatch stopRequestedLatch;
 private volatile boolean stopping;   // indicates whether the operator has 
been asked to stop retrying
-
-protected final ProcessingContext context;
+private List reporters;
 
 public RetryWithToleranceOperator(long errorRetryTimeout, long 
errorMaxDelayInMillis,
   ToleranceType toleranceType, Time time, 
ErrorHandlingMetrics errorHandlingMetrics) {
-this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, 
errorHandlingMetrics, new ProcessingContext(), new CountDownLatch(1));
+this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, 
errorHandlingMetrics, new CountDownLatch(1));
 }
 
 RetryWithToleranceOperator(long errorRetryTimeout, long 
errorMaxDelayInMillis,
ToleranceType toleranceType, Time time, 
ErrorHandlingMetrics errorHandlingMetrics,
-   ProcessingContext context, CountDownLatch 
stopRequestedLatch) {
+   CountDownLatch stopRequestedLatch) {
 this.errorRetryTimeout = errorRetryTimeout;
 this.errorMaxDelayInMillis = errorMaxDelayInMillis;
 this.errorToleranceType = toleranceType;
 this.time = time;
 this.errorHandlingMetrics = errorHandlingMetrics;
-this.context = context;
 this.stopRequestedLatch = stopRequestedLatch;
 this.stopping = false;
+this.reporters = Collections.emptyList();
 }
 
-public synchronized Future executeFailed(Stage stage, Class 
executingClass,
-  ConsumerRecord 
consumerRecord,
-  Throwable error) {
-
+public Future executeFailed(ProcessingContext context, Stage 
stage, Class executingClass, Throwable error) {
 markAsFailed();
-context.consumerRecord(consumerRecord);
 context.currentContext(stage, executingClass);
 context.error(error);
 errorHandlingMetrics.recordFailure();
-Future errantRecordFuture = context.report();
+Future errantRecordFuture = report(context);
 if (!withinToleranceLimits()) {
 errorHandlingMetrics.recordError();
 throw new ConnectException("Tolerance exceeded in error handler", 
error);
 }
 return errantRecordFuture;
 }
 
-public synchronized Future executeFailed(Stage stage, Class 
executingClass,
-   SourceRecord sourceRecord,
-   Throwable error) {

Review Comment:
   Excellent, thanks 






Re: [PR] KAFKA-15834: Remove NamedTopologyIntegrationTest which leaks clients [kafka]

2024-01-12 Thread via GitHub


gharris1727 commented on PR #15185:
URL: https://github.com/apache/kafka/pull/15185#issuecomment-1889780346

   Hey @mjsax @ableegoldman thanks for the context on the Jira ticket. I 
removed just the leaky test, instead of the whole suite, but let me know if you 
think the whole suite should be eliminated.
   
   Thanks!





[PR] KAFKA-15834: Remove NamedTopologyIntegrationTest which leaks clients [kafka]

2024-01-12 Thread via GitHub


gharris1727 opened a new pull request, #15185:
URL: https://github.com/apache/kafka/pull/15185

   This test leaks Kafka clients, because the underlying consumer gets stuck 
trying to read from a non-existent topic.
   
   Rather than change the client behavior in this situation, or change the way 
streams handles blocked threads, remove this test which causes the leak because 
the test itself is low-value.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15575: Begin enforcing 'tasks.max' property for connectors [kafka]

2024-01-12 Thread via GitHub


C0urante commented on code in PR #15180:
URL: https://github.com/apache/kafka/pull/15180#discussion_r1450799667


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -391,7 +391,15 @@ public List> 
connectorTaskConfigs(String connName, Connector
 Connector connector = workerConnector.connector();
 try (LoaderSwap loaderSwap = 
plugins.withClassLoader(workerConnector.loader())) {
 String taskClassName = connector.taskClass().getName();
-for (Map taskProps : 
connector.taskConfigs(maxTasks)) {
+List> taskConfigs = 
connector.taskConfigs(maxTasks);
+try {
+checkTasksMax(connName, taskConfigs.size(), maxTasks, 
connConfig.enforceTasksMax());
+} catch (TooManyTasksException e) {
+// TODO: This control flow is awkward. Push task config 
generation into WorkerConnector class?

Review Comment:
   I think it makes sense as a follow-up, because the awkwardness of the 
control flow mostly stems from having to trigger a call to 
`WorkerConnector::onFailure` from a different thread than the one that `run` is 
executing on. It'd probably make sense to tackle that in the same PR that makes 
the herder thread resilient against blocking in `Connector::taskConfigs`.
   
   Does that sound alright?
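
For context, the check whose failure path is being discussed might look roughly like the sketch below. The parameter order follows the `checkTasksMax(connName, taskConfigs.size(), maxTasks, connConfig.enforceTasksMax())` call in the diff, and the message text follows the snippet asserted in the integration test. The class `TasksMaxSketch`, the nested exception type, and the warn-only behaviour when enforcement is off are assumptions, not the PR's actual code.

```java
// Hypothetical sketch; the real method lives in Worker.java and may differ.
final class TasksMaxSketch {
    static final class TooManyTasksException extends RuntimeException {
        TooManyTasksException(String message) {
            super(message);
        }
    }

    static void checkTasksMax(String connName, int numTasks, int maxTasks, boolean enforce) {
        if (numTasks <= maxTasks) {
            return; // within the configured limit, nothing to do
        }
        String message = String.format(
            "The connector %s has generated %d tasks, which is greater than %d, "
                + "the maximum number of tasks it is configured to create. ",
            connName, numTasks, maxTasks);
        if (enforce) {
            throw new TooManyTasksException(message);
        }
        // Assumed behaviour with enforcement disabled: warn and continue.
        System.err.println("WARN: " + message);
    }
}
```

The awkward part mentioned in the comment is not this check but who catches the exception: the caller has to turn it into a connector failure from a thread other than the one running the connector.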






Re: [PR] KAFKA-5863: Avoid NPE when RestClient calls expecting no-content receive content. [kafka]

2024-01-12 Thread via GitHub


gharris1727 commented on PR #13294:
URL: https://github.com/apache/kafka/pull/13294#issuecomment-1889769357

   Hey @yashmayya are you interested in reviewing this? Thanks!





Re: [PR] MINOR: Stop leaking threads in BlockingConnectorTest [kafka]

2024-01-12 Thread via GitHub


gharris1727 commented on code in PR #12290:
URL: https://github.com/apache/kafka/pull/12290#discussion_r1450789672


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##
@@ -434,23 +484,31 @@ public Block(Map props) {
 
 public Block(String block) {
 this.block = block;
-synchronized (Block.class) {
-if (blockLatch != null) {
-blockLatch.countDown();
+if (block != null) {
+synchronized (Block.class) {
+resetAwaitBlockLatch();
+awaitBlockLatch = new CountDownLatch(1);
+Block.class.notify();
 }
-blockLatch = new CountDownLatch(1);
 }
 }
 
 public void maybeBlockOn(String block) {
 if (block.equals(this.block)) {
 log.info("Will block on {}", block);
-blockLatch.countDown();
+CountDownLatch blockLatch;
+synchronized (Block.class) {
+awaitBlockLatch.countDown();

Review Comment:
   nit: small NPE here under this sequence of calls:
   
   1. new Block(s)
   2. Block.reset()
   3. block.maybeBlockOn(s)
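
The sequence above can be reproduced with a stripped-down model of the latch handling. Everything here is hypothetical: `BlockSketch` is not the test's `Block` class, and the sketch assumes that `reset()` sets the shared latch to `null`, which is what would make the unguarded `countDown()` throw. The guard shown is one possible fix, not necessarily the one adopted in the PR.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical, simplified model of the shared-latch pattern in the test.
final class BlockSketch {
    // Guarded by BlockSketch.class; assumed to be nulled out by reset().
    private static CountDownLatch awaitBlockLatch;

    private final String block;

    BlockSketch(String block) {
        this.block = block;
        if (block != null) {
            synchronized (BlockSketch.class) {
                awaitBlockLatch = new CountDownLatch(1);
            }
        }
    }

    static void reset() {
        synchronized (BlockSketch.class) {
            awaitBlockLatch = null;
        }
    }

    /** Returns true if a waiter was signalled, false if there was nothing to signal. */
    boolean maybeSignal(String method) {
        if (!method.equals(block)) {
            return false;
        }
        synchronized (BlockSketch.class) {
            CountDownLatch latch = awaitBlockLatch;
            if (latch == null) {
                // reset() ran after construction; avoid the NPE and do nothing.
                return false;
            }
            latch.countDown();
            return true;
        }
    }
}
```

Calling `new BlockSketch("start")`, then `BlockSketch.reset()`, then `maybeSignal("start")` returns `false` instead of throwing.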






Re: [PR] KAFKA-14345: Fix flakiness with more accurate bound in (Dynamic)ConnectionQuotaTest [kafka]

2024-01-12 Thread via GitHub


gharris1727 commented on PR #12806:
URL: https://github.com/apache/kafka/pull/12806#issuecomment-1889743364

   Hi @divijvaidya, this is still causing flaky failures ~2% of the time, so 
I'm still interested in getting this fix merged. Thanks!





Re: [PR] test [kafka]

2024-01-12 Thread via GitHub


dajac closed pull request #15058: test
URL: https://github.com/apache/kafka/pull/15058





Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-12 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1450765739


##
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##
@@ -141,17 +144,33 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
 }
   }
 
+  /**
+   * Return topic partition metadata for the given topic, listener and index 
range. Also, return a boolean value to
+   * indicate whether there are more partitions with index equal or larger 
than the upper index.
+   *
+   * @param image   The metadata image
+   * @param topicName   The name of the topic.
+   * @param listenerNameThe listener name.
+   * @param startIndex  The smallest index of the partitions 
to be included in the result.
+   * @param upperIndex  The upper limit of the index of the 
partitions to be included in the result.
+   *Note that, the upper index can be 
larger than the largest partition index in
+   *this topic.
+   * @returnA collection of topic partition 
metadata and whether there are more partitions.
+   */
   private def getPartitionMetadataForDescribeTopicResponse(
 image: MetadataImage,
 topicName: String,
-listenerName: ListenerName
-  ): Option[List[DescribeTopicPartitionsResponsePartition]] = {
+listenerName: ListenerName,
+startIndex: Int,
+upperIndex: Int
+  ): (Option[List[DescribeTopicPartitionsResponsePartition]], Boolean) = {
 Option(image.topics().getTopic(topicName)) match {
-  case None => None
+  case None => (None, false)
   case Some(topic) => {
-val partitions = Some(topic.partitions().entrySet().asScala.map { 
entry =>
-  val partitionId = entry.getKey
-  val partition = entry.getValue
+val result = new ListBuffer[DescribeTopicPartitionsResponsePartition]()
+val endIndex = upperIndex.min(topic.partitions().size())
+for (partitionId <- startIndex until endIndex) {
+  val partition = topic.partitions().get(partitionId)

Review Comment:
   Do you mean the partition IDs in the topic may not be consecutive? I just 
realized that this is possible.
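
If partition IDs can have gaps, looking up every index in `startIndex until endIndex` can return `null`, and capping the range at `partitions().size()` can cut off high IDs. A minimal sketch of a gap-tolerant alternative is below. It is hypothetical: the `String` values stand in for partition metadata, and `PartitionPageSketch`, `Page`, and `page` are names invented for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch; String values stand in for partition metadata.
final class PartitionPageSketch {
    static final class Page {
        final List<Integer> partitionIds;
        final boolean hasMore;

        Page(List<Integer> partitionIds, boolean hasMore) {
            this.partitionIds = partitionIds;
            this.hasMore = hasMore;
        }
    }

    /**
     * Returns the existing partition IDs in [startIndex, upperIndex) in
     * ascending order, plus whether any partition has an ID >= upperIndex.
     */
    static Page page(Map<Integer, String> partitions, int startIndex, int upperIndex) {
        NavigableMap<Integer, String> sorted = new TreeMap<>(partitions);
        // subMap rejects from > to, so clamp an inverted range to an empty one.
        int upper = Math.max(startIndex, upperIndex);
        List<Integer> ids =
            new ArrayList<>(sorted.subMap(startIndex, true, upper, false).keySet());
        boolean hasMore = sorted.ceilingKey(upper) != null;
        return new Page(ids, hasMore);
    }
}
```

For partitions `{0, 2, 5}`, `page(m, 0, 3)` yields IDs `[0, 2]` with `hasMore == true`, and `page(m, 3, 10)` yields `[5]` with `hasMore == false`.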






Re: [PR] KAFKA-15945 : Flaky test - testSyncTopicConfigs() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest [kafka]

2024-01-12 Thread via GitHub


gharris1727 commented on code in PR #14893:
URL: https://github.com/apache/kafka/pull/14893#discussion_r1450758420


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -101,8 +101,9 @@
  */
 @Tag("integration")
 public class MirrorConnectorsIntegrationBaseTest {
+private static final Object TOPIC_LOCK = new Object();

Review Comment:
   Can you explain what this lock is accomplishing? There's only one test 
running at a time within the suite.






Re: [PR] KAFKA-16118; Coordinator unloading fails when replica is deleted [kafka]

2024-01-12 Thread via GitHub


jolshan commented on code in PR #15182:
URL: https://github.com/apache/kafka/pull/15182#discussion_r1450751697


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -684,6 +685,60 @@ public void testScheduleUnloading() {
 // Getting the coordinator context fails because it no longer exists.
 assertThrows(NotCoordinatorException.class, () -> 
runtime.contextOrThrow(TP));
 }
+
+@Test
+public void testScheduleUnloadingWithEmptyEpoch() {
+MockTimer timer = new MockTimer();
+MockPartitionWriter writer = mock(MockPartitionWriter.class);
+MockCoordinatorShardBuilderSupplier supplier = 
mock(MockCoordinatorShardBuilderSupplier.class);
+MockCoordinatorShardBuilder builder = 
mock(MockCoordinatorShardBuilder.class);
+MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(timer.time())
+.withTimer(timer)
+.withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+.withLoader(new MockCoordinatorLoader())
+.withEventProcessor(new DirectEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorShardBuilderSupplier(supplier)
+
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+.build();
+
+when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+when(builder.withLogContext(any())).thenReturn(builder);
+when(builder.withTime(any())).thenReturn(builder);
+when(builder.withTimer(any())).thenReturn(builder);
+when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
+when(builder.withTopicPartition(any())).thenReturn(builder);
+when(builder.build()).thenReturn(coordinator);
+when(supplier.get()).thenReturn(builder);
+
+// Loads the coordinator. It directly transitions to active.
+runtime.scheduleLoadOperation(TP, 10);

Review Comment:
   There are two cases for unloading a consumer offsets partition (that is, 
when the coordinator is no longer the leader):
   
   1. the replica is moved away and the epoch is empty
   2. the replica becomes a follower and the epoch is nonzero
   
   Previously we only handled case 2 correctly.
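
The two cases can be sketched as a single decision function. This is an illustration only: `UnloadDecisionSketch` and `shouldUnload` are hypothetical names, the empty epoch is modelled as `OptionalInt.empty()`, and the rule that a follower transition unloads only when its epoch is newer than the loaded one is an assumption about how stale requests are filtered.

```java
import java.util.OptionalInt;

// Hypothetical sketch of the unload decision; not the runtime's actual code.
final class UnloadDecisionSketch {
    /**
     * @param loadedEpoch    epoch with which the coordinator was loaded
     * @param partitionEpoch epoch carried by the unload request, empty when
     *                       the replica was deleted or moved away
     */
    static boolean shouldUnload(int loadedEpoch, OptionalInt partitionEpoch) {
        // Case 1: the replica is gone, so there is no epoch to compare against.
        if (!partitionEpoch.isPresent()) {
            return true;
        }
        // Case 2: the replica became a follower; ignore stale requests (assumed rule).
        return partitionEpoch.getAsInt() > loadedEpoch;
    }
}
```

A coordinator loaded at epoch 10 would be unloaded for an empty epoch or for epoch 11, but not for epoch 9.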






Re: [PR] KAFKA-16118; Coordinator unloading fails when replica is deleted [kafka]

2024-01-12 Thread via GitHub


jolshan commented on code in PR #15182:
URL: https://github.com/apache/kafka/pull/15182#discussion_r1450749432


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -684,6 +685,60 @@ public void testScheduleUnloading() {
         // Getting the coordinator context fails because it no longer exists.
         assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
     }
+
+    @Test
+    public void testScheduleUnloadingWithEmptyEpoch() {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+        MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
+        MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
+        MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+
+        CoordinatorRuntime runtime =
+            new CoordinatorRuntime.Builder()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(supplier)
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .build();
+
+        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
+        when(builder.withTime(any())).thenReturn(builder);
+        when(builder.withTimer(any())).thenReturn(builder);
+        when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
+        when(builder.withTopicPartition(any())).thenReturn(builder);
+        when(builder.build()).thenReturn(coordinator);
+        when(supplier.get()).thenReturn(builder);
+
+        // Loads the coordinator. It directly transitions to active.
+        runtime.scheduleLoadOperation(TP, 10);

Review Comment:
   I misunderstood previously. I have discussed with David offline.






Re: [PR] KAFKA-16120, KAFKA-16121: Handle removed replicas during KRaft migration [kafka]

2024-01-12 Thread via GitHub


splett2 commented on code in PR #15184:
URL: https://github.com/apache/kafka/pull/15184#discussion_r1450726015


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -677,6 +677,88 @@ class ZkMigrationIntegrationTest {
     }
   }
 
+  @ClusterTest(clusterType = Type.ZK, brokers = 4, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
+    new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
+    new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+    new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+    new ClusterConfigProperty(key = "listener.security.protocol.map", value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
+  ))
+  def testPartitionReassignmentInDualWrite(zkCluster: ClusterInstance): Unit = {
+    // Create a topic in ZK mode
+    val topicName = "test"
+    var admin = zkCluster.createAdminClient()
+    val zkClient = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient
+
+    // Bootstrap the ZK cluster ID into KRaft
+    val clusterId = zkCluster.clusterId()
+    val kraftCluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0).
+        setClusterId(Uuid.fromString(clusterId)).
+        setNumBrokerNodes(0).
+        setNumControllerNodes(1).build())
+      .setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
+      .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+      .build()
+    try {
+      kraftCluster.format()
+      kraftCluster.startup()
+      val readyFuture = kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)
+
+      // Enable migration configs and restart brokers
+      log.info("Restart brokers in migration mode")
+      zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
+      zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
+      zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+      zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+      zkCluster.rollingBrokerRestart()
+      zkCluster.waitForReadyBrokers()
+      readyFuture.get(30, TimeUnit.SECONDS)
+
+      // Wait for migration to begin
+      log.info("Waiting for ZK migration to begin")
+      TestUtils.waitUntilTrue(
+        () => zkClient.getControllerId.contains(3000),
+        "Timed out waiting for KRaft controller to take over",
+        3)
+
+      // Create a topic with replicas on brokers 0, 1, 2
+      log.info("Create new topic with AdminClient")
+      admin = zkCluster.createAdminClient()
+      val newTopics = new util.ArrayList[NewTopic]()
+      val replicaAssignment = Collections.singletonMap(Integer.valueOf(0), Seq(0, 1, 2).map(int2Integer).asJava)
+      newTopics.add(new NewTopic(topicName, replicaAssignment))
+      val createTopicResult = admin.createTopics(newTopics)
+      createTopicResult.all().get(60, TimeUnit.SECONDS)
+
+      val topicPartition = new TopicPartition(topicName, 0)
+
+      // Verify the changes made to KRaft are seen in ZK
+      verifyTopicPartitionMetadata(topicName, Seq(topicPartition), zkClient)
+
+      // Reassign replicas to brokers 1, 2, 3 and wait for reassignment to complete
+      admin.alterPartitionReassignments(Collections.singletonMap(topicPartition,
+        Optional.of(new NewPartitionReassignment(Seq(1, 2, 3).map(int2Integer).asJava)))).all().get()
+
+      admin.electLeaders(ElectionType.PREFERRED, Collections.singleton(topicPartition)).all.get()

Review Comment:
   Oddly, the reassignment times out if I do not run leader election. I took a 
look at the `state.change.logger` logs and saw messages about the 
reassignment-generated LeaderAndIsr request (LISR) being ignored because the 
broker already had the same leader epoch.
   
   I suspect this is because the leader epoch bump logic in KRaft is different 
from the leader epoch bump logic in ZK (and the handling for LISR and topic 
deltas is different in the ReplicaManager). Something along the lines of:
   
   In KRaft mode, when we start a reassignment we don't bump the leader epoch. 
In ZK mode, when we add new replicas we _do_ bump the leader epoch. When we 
handle an LISR, we ignore any LISR that carries the current leader epoch, so 
the LISR that initiates the KRaft reassignment is ignored.
   
   Filed as KAFKA-16121, but I included the fix in this PR since it is covered 
by the same test.




[jira] [Created] (KAFKA-16121) Partition reassignments in ZK migration dual write mode stalled until leader epoch incremented

2024-01-12 Thread David Mao (Jira)
David Mao created KAFKA-16121:
---------------------------------

             Summary: Partition reassignments in ZK migration dual write mode stalled until leader epoch incremented
                 Key: KAFKA-16121
                 URL: https://issues.apache.org/jira/browse/KAFKA-16121
             Project: Kafka
          Issue Type: Bug
            Reporter: David Mao


I noticed this in an integration test in 
https://github.com/apache/kafka/pull/15184

In ZK mode, partition leaders rely on the LeaderAndIsr request to be notified 
of new replicas as part of a reassignment. In ZK mode, we ignore any 
LeaderAndIsr request where the partition leader epoch is less than or equal to 
the current partition leader epoch.

In KRaft mode, we do not bump the leader epoch when starting a new 
reassignment, see: `triggerLeaderEpochBumpIfNeeded`. This means that the leader 
will ignore the LISR request initiating the reassignment until a leader epoch 
bump is triggered through another means, for instance preferred leader election.
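
The interaction described above can be illustrated with a toy model. This is a 
sketch under stated assumptions, not the ReplicaManager code: `LeaderEpochGate` 
and `maybeApply` are hypothetical names, and the only behavior modeled is the 
rule from the description that a request is ignored when its leader epoch is 
less than or equal to the current one.

```java
// Toy model for illustration; names are hypothetical and do not mirror Kafka classes.
final class LeaderEpochGate {
    private int currentLeaderEpoch;

    LeaderEpochGate(int initialLeaderEpoch) {
        this.currentLeaderEpoch = initialLeaderEpoch;
    }

    /**
     * Applies a LeaderAndIsr-style request only if it carries a newer leader epoch.
     *
     * @return true if the request was applied, false if it was ignored as stale
     */
    boolean maybeApply(int requestLeaderEpoch) {
        if (requestLeaderEpoch <= currentLeaderEpoch) {
            // Same or older epoch: the request is dropped, including any new replicas it lists.
            return false;
        }
        currentLeaderEpoch = requestLeaderEpoch;
        return true;
    }

    int currentLeaderEpoch() {
        return currentLeaderEpoch;
    }

    public static void main(String[] args) {
        LeaderEpochGate leader = new LeaderEpochGate(5);
        // Reassignment started without an epoch bump: the request is ignored.
        System.out.println(leader.maybeApply(5));
        // A later bump, for instance from preferred leader election, lets it through.
        System.out.println(leader.maybeApply(6));
    }
}
```

In this model the reassignment request only takes effect once something else 
raises the epoch, which matches the stall observed in the integration test.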



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16118; Coordinator unloading fails when replica is deleted [kafka]

2024-01-12 Thread via GitHub


dajac commented on code in PR #15182:
URL: https://github.com/apache/kafka/pull/15182#discussion_r1450733355


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##

Review Comment:
   The state remains in the group coordinator forever, if that is what you 
were asking about.






Re: [PR] KAFKA-16120: Handle removed replicas during KRaft migration [kafka]

2024-01-12 Thread via GitHub


splett2 commented on code in PR #15184:
URL: https://github.com/apache/kafka/pull/15184#discussion_r1450726015


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##

Review Comment:
   Oddly, the reassignment times out if I do not run leader election. I took a 
look at the `state.change.logger` logs and saw messages about the 
reassignment-generated LeaderAndIsr request (LISR) being ignored because the 
broker already had the same leader epoch.
   
   I suspect this is because the leader epoch bump logic in KRaft is different 
from the leader epoch bump logic in ZK (and the handling for LISR and topic 
deltas is different in the ReplicaManager). Something along the lines of:
   
   In KRaft mode, when we start a reassignment we don't bump the leader epoch. 
In ZK mode, when we add new replicas we _do_ bump the leader epoch. When we 
handle an LISR, we ignore any LISR that carries the current leader epoch, so 
the LISR that initiates the KRaft reassignment is ignored.
   
   I will file a separate JIRA once I get to the bottom of it.




[jira] [Commented] (KAFKA-16120) Partition reassignments in ZK migration dual write leaves stray partitions

2024-01-12 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806129#comment-17806129
 ] 

Justine Olshan commented on KAFKA-16120:


Do you know if https://issues.apache.org/jira/browse/KAFKA-14616 is related?

> Partition reassignments in ZK migration dual write leaves stray partitions
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-16120
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16120
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: David Mao
>            Priority: Major
>
> When a reassignment is completed in ZK migration dual-write mode, the 
> `StopReplica` sent by the kraft quorum migration propagator is sent with 
> `delete = false` for deleted replicas when processing the topic delta. This 
> results in stray replicas.





[jira] [Comment Edited] (KAFKA-16120) Partition reassignments in ZK migration dual write leaves stray partitions

2024-01-12 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806129#comment-17806129
 ] 

Justine Olshan edited comment on KAFKA-16120 at 1/12/24 5:18 PM:
-----------------------------------------------------------------

Do you know if https://issues.apache.org/jira/browse/KAFKA-14616 is related?

 

Oh, I see this one is specifically for migration, and I believe the other one is not.


was (Author: jolshan):
Do you know if https://issues.apache.org/jira/browse/KAFKA-14616 is related?

> Partition reassignments in ZK migration dual write leaves stray partitions
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-16120
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16120
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: David Mao
>            Priority: Major
>
> When a reassignment is completed in ZK migration dual-write mode, the 
> `StopReplica` sent by the kraft quorum migration propagator is sent with 
> `delete = false` for deleted replicas when processing the topic delta. This 
> results in stray replicas.





Re: [PR] KAFKA-16120: Handle removed replicas during KRaft migration [kafka]

2024-01-12 Thread via GitHub


splett2 commented on code in PR #15184:
URL: https://github.com/apache/kafka/pull/15184#discussion_r1450726015


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##

Review Comment:
   Oddly, the reassignment times out if I do not run leader election. I took a 
look at the `state.change.logger` logs and saw messages about the 
reassignment-generated LeaderAndIsr request (LISR) being ignored because the 
broker already had the same leader epoch.
   
   I suspect this is because the leader epoch bump logic in KRaft is different 
from the leader epoch bump logic in ZK (and the handling for LISR and topic 
deltas is different in the ReplicaManager). I will file a separate JIRA once I 
get to the bottom of it.






Re: [PR] KAFKA-16118; Coordinator unloading fails when replica is deleted [kafka]

2024-01-12 Thread via GitHub


jolshan commented on code in PR #15182:
URL: https://github.com/apache/kafka/pull/15182#discussion_r1450722995


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##

Review Comment:
   Is there an amount of time it takes before we clear the deleted partition 
state? Or do we keep all deleted partitions until the offsets topic is 
reassigned?






[jira] [Created] (KAFKA-16120) Partition reassignments in ZK migration dual write leaves stray partitions

2024-01-12 Thread David Mao (Jira)
David Mao created KAFKA-16120:
---------------------------------

             Summary: Partition reassignments in ZK migration dual write leaves stray partitions
                 Key: KAFKA-16120
                 URL: https://issues.apache.org/jira/browse/KAFKA-16120
             Project: Kafka
          Issue Type: Bug
            Reporter: David Mao


When a reassignment is completed in ZK migration dual-write mode, the 
`StopReplica` sent by the kraft quorum migration propagator is sent with 
`delete = false` for deleted replicas when processing the topic delta. This 
results in stray replicas.
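
To make the effect of the flag concrete, here is a toy model of a broker 
handling a stop request. It is a sketch under assumptions: `ToyBroker`, 
`stopReplica` and `strayReplicas` are hypothetical names rather than Kafka 
classes, and it assumes that a stop with `delete = false` leaves the log 
directory on disk, as the description implies.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model for illustration; it does not mirror the broker's real StopReplica handling.
final class ToyBroker {
    // Partitions this broker currently hosts as a replica.
    private final Set<String> hostedReplicas = new HashSet<>();
    // Partitions that still have a log directory on disk.
    private final Set<String> logDirsOnDisk = new HashSet<>();

    void addReplica(String topicPartition) {
        hostedReplicas.add(topicPartition);
        logDirsOnDisk.add(topicPartition);
    }

    /** Stops hosting the replica; the log directory is removed only when delete is true. */
    void stopReplica(String topicPartition, boolean delete) {
        hostedReplicas.remove(topicPartition);
        if (delete) {
            logDirsOnDisk.remove(topicPartition);
        }
    }

    /** A stray replica has data on disk but is no longer hosted by this broker. */
    Set<String> strayReplicas() {
        Set<String> stray = new HashSet<>(logDirsOnDisk);
        stray.removeAll(hostedReplicas);
        return stray;
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.addReplica("test-0");
        // Reassignment removed this replica, but the stop was sent with delete = false.
        broker.stopReplica("test-0", false);
        System.out.println(broker.strayReplicas());
    }
}
```

Under this model, sending `delete = true` for replicas removed by a completed 
reassignment leaves nothing behind on the old broker.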





Re: [PR] KAFKA-16118; Coordinator unloading fails when replica is deleted [kafka]

2024-01-12 Thread via GitHub


jolshan commented on code in PR #15182:
URL: https://github.com/apache/kafka/pull/15182#discussion_r1450721942


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1683,7 +1685,7 @@ public void scheduleUnloadOperation(
             if (context != null) {
                 try {
                     context.lock.lock();
-                    if (context.epoch < partitionEpoch) {
+                    if (!partitionEpoch.isPresent() || context.epoch < partitionEpoch.getAsInt()) {

Review Comment:
   OK, I guess it is similar to logging -1; folks can assume this is a deleted 
topic and not an error.






Re: [PR] improve TopicCommandIntegrationTest to be less flaky [kafka]

2024-01-12 Thread via GitHub


jolshan commented on PR #14891:
URL: https://github.com/apache/kafka/pull/14891#issuecomment-1889665664

   > Thanks for your feedback! Do you mean we should keep the existing API 
calls like createAndWaitTopic(buildTopicCommandOptionsWithBootstrap( 
"--create", "--partitions", "2", "--replication-factor", "1", "--topic", 
testTopicName)); ?
   
   Yes, I think we should keep the commands.





Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]

2024-01-12 Thread via GitHub


gharris1727 commented on code in PR #15080:
URL: https://github.com/apache/kafka/pull/15080#discussion_r1450714324


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -1001,6 +1001,65 @@ public void testResetConnectorOffsets() throws Exception {
         assertEquals(msg, resetOffsetsCallback.get(1000, TimeUnit.MILLISECONDS));
     }
 
+    @Test()
+    public void testRequestTaskReconfigurationDoesNotDeadlock() throws Exception {
+        connector = mock(BogusSourceConnector.class);
+        expectAdd(SourceSink.SOURCE);
+
+        // Start the connector
+        Map<String, String> config = connectorConfig(SourceSink.SOURCE);
+        Connector connectorMock = mock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
+
+        herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
+
+        // Wait on connector to start
+        Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.MILLISECONDS);
+        assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
+
+        // Prepare for task config update
+        when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME));
+        expectStop();
+
+        // Prepare for connector and task config update
+        Map<String, String> newConfig = connectorConfig(SourceSink.SOURCE);
+        newConfig.put("dummy-connector-property", "yes");
+        final ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
+            onStart.getValue().onCompletion(null, TargetState.STARTED);
+            return true;
+        }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(newConfig), any(HerderConnectorContext.class),
+                eq(herder), eq(TargetState.STARTED), onStart.capture());
+
+        // Common invocations
+        when(worker.startSourceTask(eq(new ConnectorTaskId(CONNECTOR_NAME, 0)), any(), any(), any(), eq(herder), eq(TargetState.STARTED))).thenReturn(true);
+        Map<String, String> updatedTaskConfig1 = taskConfig(SourceSink.SOURCE);
+        updatedTaskConfig1.put("dummy-task-property", "1");
+        Map<String, String> updatedTaskConfig2 = taskConfig(SourceSink.SOURCE);
+        updatedTaskConfig2.put("dummy-task-property", "2");
+        when(worker.connectorTaskConfigs(eq(CONNECTOR_NAME), any()))
+                .thenReturn(
+                        Collections.singletonList(updatedTaskConfig1),
+                        Collections.singletonList(updatedTaskConfig2));
+
+        // Set new config on the connector and tasks
+        FutureCallback<Herder.Created<ConnectorInfo>> reconfigureCallback = new FutureCallback<>();
+        expectConfigValidation(connectorMock, false, newConfig);
+        herder.putConnectorConfig(CONNECTOR_NAME, newConfig, true, reconfigureCallback);
+
+        Thread.sleep(10);
+

Review Comment:
   Does this make the deadlock more likely on your machine? I can't seem to 
reproduce it with or without this sleep.
   Since it's going to be highly dependent on the machine, I think we should 
just eliminate this sleep.
   ```suggestion
   ```
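
   For reference, one way to assert "does not deadlock" without relying on a 
machine-dependent sleep is to bound the wait on the call itself. The following 
is a rough sketch, not code from this PR: `DeadlockProbe` and 
`completesWithin` are hypothetical names, and only standard 
`java.util.concurrent` classes are used.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical test helper for illustration; it is not part of the Connect test code.
final class DeadlockProbe {
    private DeadlockProbe() { }

    /** Runs the action on a separate daemon thread and reports whether it finished in time. */
    static boolean completesWithin(Runnable action, long timeout, TimeUnit unit) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "deadlock-probe");
            t.setDaemon(true); // a stuck action must not keep the JVM alive
            return t;
        });
        try {
            Future<?> future = executor.submit(action);
            future.get(timeout, unit);
            return true;
        } catch (TimeoutException e) {
            // The action is still blocked after the timeout: treat it as a suspected deadlock.
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException("action failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(completesWithin(() -> { }, 1, TimeUnit.SECONDS));
    }
}
```

   A bounded wait only reports a hang when one occurs; it does not make the 
race more likely, so it complements rather than replaces a deterministic 
reproduction.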





