[jira] [Commented] (KAFKA-10337) Wait for pending async commits in commitSync() even if no offsets are specified

2023-07-19 Thread Tom Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744877#comment-17744877
 ] 

Tom Lee commented on KAFKA-10337:
-

Thanks for pushing this through [~erikvanoosten], nice to see this finally 
land. :)

> Wait for pending async commits in commitSync() even if no offsets are 
> specified
> ---
>
> Key: KAFKA-10337
> URL: https://issues.apache.org/jira/browse/KAFKA-10337
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Tom Lee
>Assignee: Erik van Oosten
>Priority: Major
> Fix For: 3.6.0
>
>
> The JavaDoc for commitSync() states the following:
> {quote}Note that asynchronous offset commits sent previously with the
> {@link #commitAsync(OffsetCommitCallback)}
>  (or similar) are guaranteed to have their callbacks invoked prior to 
> completion of this method.
> {quote}
> But should we happen to call the method with an empty offset map
> (i.e. commitSync(Collections.emptyMap())) the callbacks for any incomplete 
> async commits will not be invoked because of an early return in 
> ConsumerCoordinator.commitOffsetsSync() when the input map is empty.
> If users are doing manual offset commits and relying on commitSync as a 
> barrier for in-flight async commits prior to a rebalance, this could be an 
> important (though somewhat implementation-dependent) detail.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] ivanyu commented on pull request #13984: KAFKA-15107: Support custom metadata for remote log segment

2023-07-19 Thread via GitHub


ivanyu commented on PR #13984:
URL: https://github.com/apache/kafka/pull/13984#issuecomment-1643169441

   Conflicts resolved


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim opened a new pull request, #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-19 Thread via GitHub


jeffkbkim opened a new pull request, #14056:
URL: https://github.com/apache/kafka/pull/14056

   Built on top of https://github.com/apache/kafka/pull/14017, this PR 
implements the existing Heartbeat API in the new Group Coordinator.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #14004: KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache

2023-07-19 Thread via GitHub


satishd commented on code in PR #14004:
URL: https://github.com/apache/kafka/pull/14004#discussion_r1268911909


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java:
##
@@ -100,17 +100,29 @@ void 
handleSegmentWithCopySegmentStartedState(RemoteLogSegmentId remoteLogSegmen
 
 void handleSegmentWithCopySegmentFinishedState(Long startOffset, 
RemoteLogSegmentId remoteLogSegmentId,
Long leaderEpochEndOffset) {
+// If there are duplicate segments uploaded due to leader-election, 
then mark them as unreferenced.
+// Duplicate segments can be uploaded when the previous leader had 
tier-lags and the next leader uploads the
+// segment for the same leader-epoch which is a super-set of 
previously uploaded segments.
+// (eg)
+// case-1: Duplicate segment
+//  L0 uploaded segment S0 with offsets 0-100 and L1 uploaded 
segment S1 with offsets 0-200.
+//  We will mark the segment S0 as duplicate and add it to 
unreferencedSegmentIds.
+// case-2: Overlapping segments
+//  L0 uploaded segment S0 with offsets 10-90 and L1 uploaded 
segment S1 with offsets 5-100, S2-101-200,
+//  and so on. When the consumer request for segment with offset 
95, it should get the segment S1 and not S0.
+Map.Entry lastEntry = offsetToId.lastEntry();
+while (lastEntry != null && lastEntry.getKey() >= startOffset && 
highestLogOffset <= leaderEpochEndOffset) {

Review Comment:
   Thanks @kamalcph for the detailed explanation. It is better to return the 
epoch along with highest-offset and then check the minimum of the respective 
end offset in the local leader epoch chain for that epoch and the highest 
offset uploaded to remote storage for that epoch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] splett2 commented on a diff in pull request #14053: [KAFKA-15221] Fix the race between fetch requests from a rebooted follower.

2023-07-19 Thread via GitHub


splett2 commented on code in PR #14053:
URL: https://github.com/apache/kafka/pull/14053#discussion_r1268889660


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -858,13 +858,28 @@ class Partition(val topicPartition: TopicPartition,
 // No need to calculate low watermark if there is no delayed 
DeleteRecordsRequest
 val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) 
lowWatermarkIfLeader else -1L
 val prevFollowerEndOffset = replica.stateSnapshot.logEndOffset
-replica.updateFetchState(
-  followerFetchOffsetMetadata,
-  followerStartOffset,
-  followerFetchTimeMs,
-  leaderEndOffset,
-  brokerEpoch
-)
+
+// Acquire the lock for the fetch state update. A race can happen between 
fetch requests from a rebooted broker.
+// The requests before and after the reboot can carry different fetch 
metadata especially offsets and broker epoch.
+// It can particularly affect the ISR expansion where we decide to expand 
based on stale fetch request but use the
+// latest broker epoch to fill in the AlterPartition request.
+inReadLock(leaderIsrUpdateLock) {
+  // Fence the fetch request with stale broker epoch from a rebooted 
follower.
+  val currentBrokerEpoch = replica.stateSnapshot.brokerEpoch.getOrElse(-1L)
+  if (brokerEpoch != -1 && brokerEpoch < currentBrokerEpoch) {
+error(s"Received fetch request for $topicPartition with stale broker 
epoch=$brokerEpoch. The expected" +
+  s" broker epoch= $currentBrokerEpoch.\"")
+return
+  }

Review Comment:
   my understanding is that the epoch check needs to be done in the 
`updateFetchState` call under the atomic `updateAndGet`.
   
   I don't think it is worth error logging here. Error logs usually indicate an 
unexpected failure that may require intervention. If we are silently fencing a 
stale fetch, a log is probably not needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] splett2 commented on a diff in pull request #14053: [KAFKA-15221] Fix the race between fetch requests from a rebooted follower.

2023-07-19 Thread via GitHub


splett2 commented on code in PR #14053:
URL: https://github.com/apache/kafka/pull/14053#discussion_r1268889660


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -858,13 +858,28 @@ class Partition(val topicPartition: TopicPartition,
 // No need to calculate low watermark if there is no delayed 
DeleteRecordsRequest
 val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) 
lowWatermarkIfLeader else -1L
 val prevFollowerEndOffset = replica.stateSnapshot.logEndOffset
-replica.updateFetchState(
-  followerFetchOffsetMetadata,
-  followerStartOffset,
-  followerFetchTimeMs,
-  leaderEndOffset,
-  brokerEpoch
-)
+
+// Acquire the lock for the fetch state update. A race can happen between 
fetch requests from a rebooted broker.
+// The requests before and after the reboot can carry different fetch 
metadata especially offsets and broker epoch.
+// It can particularly affect the ISR expansion where we decide to expand 
based on stale fetch request but use the
+// latest broker epoch to fill in the AlterPartition request.
+inReadLock(leaderIsrUpdateLock) {
+  // Fence the fetch request with stale broker epoch from a rebooted 
follower.
+  val currentBrokerEpoch = replica.stateSnapshot.brokerEpoch.getOrElse(-1L)
+  if (brokerEpoch != -1 && brokerEpoch < currentBrokerEpoch) {
+error(s"Received fetch request for $topicPartition with stale broker 
epoch=$brokerEpoch. The expected" +
+  s" broker epoch= $currentBrokerEpoch.\"")
+return
+  }

Review Comment:
   my understanding is that the epoch check needs to be done in the 
`updateFetchState` call under the atomic `updateAndGet`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #13999: KAFKA-15176: add tests for tiered storage metrics

2023-07-19 Thread via GitHub


showuon commented on code in PR #13999:
URL: https://github.com/apache/kafka/pull/13999#discussion_r1268886912


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -3379,6 +3396,246 @@ class ReplicaManagerTest {
 testStopReplicaWithExistingPartition(LeaderAndIsr.NoEpoch, true, false, 
Errors.NONE)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(booleans = Array(true, false))
+  def testOffsetOutOfRangeExceptionWhenReadFromLog(isFromFollower: Boolean): 
Unit = {
+val replicaId = if (isFromFollower) 1 else -1
+val tp0 = new TopicPartition(topic, 0)
+val tidp0 = new TopicIdPartition(topicId, tp0)
+// create a replicaManager with remoteLog enabled
+val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, 
shouldMockLog = true)
+try {
+  val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+  replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, 
isFutureReplica = false, offsetCheckpoints, None)
+  val partition0Replicas = Seq[Integer](0, 1).asJava
+  val topicIds = Map(tp0.topic -> topicId).asJava
+  val leaderEpoch = 0
+  val leaderAndIsrRequest = new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, 
brokerEpoch,
+Seq(
+  new LeaderAndIsrPartitionState()
+.setTopicName(tp0.topic)
+.setPartitionIndex(tp0.partition)
+.setControllerEpoch(0)
+.setLeader(leaderEpoch)
+.setLeaderEpoch(0)
+.setIsr(partition0Replicas)
+.setPartitionEpoch(0)
+.setReplicas(partition0Replicas)
+.setIsNew(true)
+).asJava,
+topicIds,
+Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+  replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => 
())
+
+  val params = new FetchParams(ApiKeys.FETCH.latestVersion, replicaId, 1, 
1000, 0, 100, FetchIsolation.LOG_END, None.asJava)
+  // when reading log, it'll throw OffsetOutOfRangeException, which will 
be handled separately
+  val result = replicaManager.readFromLog(params, Seq(tidp0 -> new 
PartitionData(topicId, 1, 0, 10, Optional.of[Integer](leaderEpoch), 
Optional.of[Integer](leaderEpoch))), UnboundedQuota, false)
+
+  if (isFromFollower) {
+// expect OFFSET_MOVED_TO_TIERED_STORAGE error returned if it's from 
follower, since the data is already available in remote log
+assertEquals(Errors.OFFSET_MOVED_TO_TIERED_STORAGE, 
result.head._2.error)
+  } else {
+assertEquals(Errors.NONE, result.head._2.error)
+  }
+  assertEquals(startOffset, result.head._2.leaderLogStartOffset)
+  assertEquals(endOffset, result.head._2.leaderLogEndOffset)
+  assertEquals(highHW, result.head._2.highWatermark)
+  if (isFromFollower) {
+assertFalse(result.head._2.info.delayedRemoteStorageFetch.isPresent)
+  } else {
+// for consumer fetch, we should return a delayedRemoteStorageFetch to 
wait for remote fetch
+assertTrue(result.head._2.info.delayedRemoteStorageFetch.isPresent)
+  }
+} finally {
+  replicaManager.shutdown(checkpointHW = false)
+}
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(booleans = Array(true, false))
+  def testOffsetOutOfRangeExceptionWhenFetchMessages(isFromFollower: Boolean): 
Unit = {
+val replicaId = if (isFromFollower) 1 else -1
+val tp0 = new TopicPartition(topic, 0)
+val tidp0 = new TopicIdPartition(topicId, tp0)
+// create a replicaManager with remoteLog enabled
+val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, 
shouldMockLog= true)
+try {
+  val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+  replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, 
isFutureReplica = false, offsetCheckpoints, None)
+  val partition0Replicas = Seq[Integer](0, 1).asJava
+  val topicIds = Map(tp0.topic -> topicId).asJava
+  val leaderEpoch = 0
+  val leaderAndIsrRequest = new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, 
brokerEpoch,
+Seq(
+  new LeaderAndIsrPartitionState()
+.setTopicName(tp0.topic)
+.setPartitionIndex(tp0.partition)
+.setControllerEpoch(0)
+.setLeader(leaderEpoch)
+.setLeaderEpoch(0)
+.setIsr(partition0Replicas)
+.setPartitionEpoch(0)
+.setReplicas(partition0Replicas)
+.setIsNew(true)
+).asJava,
+topicIds,
+Set(new Node(0, "host1", 0), new Node(1, "host2", 

[GitHub] [kafka] showuon commented on a diff in pull request #14051: KAFKA-15218: Avoid NPE thrown while deleting topic and fetch from follower concurrently

2023-07-19 Thread via GitHub


showuon commented on code in PR #14051:
URL: https://github.com/apache/kafka/pull/14051#discussion_r1268862349


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -998,7 +998,13 @@ class Partition(val topicPartition: TopicPartition,
   // 3. Its metadata cached broker epoch matches its Fetch request broker 
epoch. Or the Fetch
   //request broker epoch is -1 which bypasses the epoch verification.
   case kRaftMetadataCache: KRaftMetadataCache =>
-val storedBrokerEpoch = 
remoteReplicasMap.get(followerReplicaId).stateSnapshot.brokerEpoch
+val mayBeReplica = getReplica(followerReplicaId)
+// The topic is already deleted and we don't have any replica 
information. In this case, we can return false
+// so as to avoid NPE
+if (mayBeReplica.isEmpty) {
+  return false

Review Comment:
   We should log something here. maybe:
   `warn(s"The replica state of replica ID:[$followerReplicaId] doesn't exist 
in the leader node. It might because the topic is already deleted.")`
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #14045: refactor(storage): topic-based RLMM consumer-manager/task related improvements

2023-07-19 Thread via GitHub


showuon commented on code in PR #14045:
URL: https://github.com/apache/kafka/pull/14045#discussion_r1268858531


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java:
##
@@ -87,36 +96,41 @@ public void waitTillConsumptionCatchesUp(RecordMetadata 
recordMetadata) throws T
 }
 
 /**
- * Waits if necessary for the consumption to reach the offset of the given 
{@code recordMetadata}.
+ * Waits if necessary for the consumption to reach the {@code offset} of 
the given record
+ * at a certain {@code partition} of the metadata topic.

Review Comment:
   This change should be reverted since the parameter is not updated.



##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java:
##
@@ -182,13 +182,12 @@ private CompletableFuture 
storeRemoteLogMetadata(TopicIdPartition topicIdP
 CompletableFuture produceFuture = 
producerManager.publishMessage(remoteLogMetadata);
 
 // Create and return a `CompletableFuture` instance which 
completes when the consumer is caught up with the produced record's offset.
-return produceFuture.thenApplyAsync(recordMetadata -> {
+return produceFuture.thenAcceptAsync(recordMetadata -> {
 try {
-
consumerManager.waitTillConsumptionCatchesUp(recordMetadata);
+
consumerManager.waitTillConsumptionCatchesUp(recordMetadata.partition(), 
recordMetadata.offset());

Review Comment:
   We should pass `recordMetadata` only, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bmscomp commented on pull request #14032: MINOR: Upgrade Gradle wrapper version to 8.2.1

2023-07-19 Thread via GitHub


bmscomp commented on PR #14032:
URL: https://github.com/apache/kafka/pull/14032#issuecomment-1642924837

   @divijvaidya  In the `retry_zinc` file bash script in the kafka root 
directory there is something about this issue but related to an older version 
of gradle 
   
   ```
   
   # Hacky workaround for https://github.com/gradle/gradle/issues/3777
   # There is currently no configurable timeout, so we retry builds jenkins 
when we can't get a lock on the zinc compiler cache
   # Hopefully we can remove this in the future, but this will save us from 
having to manually rebuild for the time being.
   # Example:
   # [2021-10-19T17:25:07.234Z] * What went wrong:
   # [2021-10-19T17:25:07.234Z] Execution failed for task 
':streams:streams-scala:compileScala'.
   # [2021-10-19T17:25:07.234Z] > Timeout waiting to lock zinc-1.3.5_2.13.6_8 
compiler cache (/home/jenkins/.gradle/caches/7.0.2/zinc-1.3.5_2.13.6_8). It is 
currently in use by another Gradle instance.
   # [2021-10-19T17:25:07.234Z]   Owner PID: 3999
   # [2021-10-19T17:25:07.234Z]   Our PID: 3973
   # [2021-10-19T17:25:07.234Z]   Owner Operation: 
   # [2021-10-19T17:25:07.234Z]   Our operation: 
   # [2021-10-19T17:25:07.234Z]   Lock file: 
/home/jenkins/.gradle/caches/7.0.2/zinc-1.3.5_2.13.6_8/zinc-1.3.5_2.13.6_8.lock
   
   ```
   
   Notice that we are using old version of zinc incremental plugin and may be 
it will be a good idea to consider upgrading it toe a recent version 
   
   https://github.com/sbt/zinc/releases
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #14055: KAFKA-15031: Add plugin.discovery to Connect worker configuration (KIP-898)

2023-07-19 Thread via GitHub


gharris1727 commented on PR #14055:
URL: https://github.com/apache/kafka/pull/14055#issuecomment-1642919694

   Here's an example error log output when in HYBRID_FAIL mode (I shortened the 
listing for brevity; the actual prints include every non-migrated plugin)
   ```
   Plugins are missing ServiceLoader manifests, these plugins will not be 
visible with plugin.discovery=SERVICE_LOAD: [
   classpathorg.apache.kafka.connect.converters.ByteArrayConverter  
undefined
   classpathorg.apache.kafka.connect.converters.DoubleConverter 
undefined
   classpathorg.apache.kafka.connect.converters.FloatConverter  
undefined
   classpathorg.apache.kafka.connect.converters.IntegerConverter
undefined
   classpathorg.apache.kafka.connect.converters.LongConverter   
undefined
   classpathorg.apache.kafka.connect.converters.ShortConverter  
undefined
   classpath
org.apache.kafka.connect.integration.BlockingConnectorTest$BlockingConnector
0.0.0
   classpath
org.apache.kafka.connect.integration.BlockingConnectorTest$BlockingSinkConnector
0.0.0
   ...
   classpathorg.apache.kafka.connect.transforms.predicates.TopicNameMatches 
undefined
   ]
at 
app//org.apache.kafka.connect.runtime.isolation.Plugins.maybeReportHybridDiscoveryIssue(Plugins.java:131)
at 
app//org.apache.kafka.connect.runtime.isolation.Plugins.initLoaders(Plugins.java:92)
at 
app//org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:74)
at 
app//org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:64)
at 
app//org.apache.kafka.connect.cli.AbstractConnectCli.startConnect(AbstractConnectCli.java:121)
at 
app//org.apache.kafka.connect.util.clusters.WorkerHandle.start(WorkerHandle.java:50)
at 
app//org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.addWorker(EmbeddedConnectCluster.java:187)
at 
app//org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.startConnect(EmbeddedConnectCluster.java:283)
at 
app//org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.start(EmbeddedConnectCluster.java:148)
...
(test suite)
   ```
   
   This is what appears if the manifests included in this PR are not present.
   
   The same, but with HYBRID_WARN:
   ```
   [2023-07-19 17:03:44,143] WARN Plugins are missing ServiceLoader manifests, 
these plugins will not be visible with plugin.discovery=SERVICE_LOAD: [
   classpathorg.apache.kafka.connect.converters.ByteArrayConverter  
undefined
   ...
   classpathorg.apache.kafka.connect.transforms.predicates.TopicNameMatches 
undefined
   ] (org.apache.kafka.connect.runtime.isolation.Plugins:128)
   ```
   
   I did notice that if logging is OFF or ERROR for the runtime (such as in the 
mirror tests) then the warning doesn't print if the caller overrides the 
configuration. The failure does propagate, so they will certainly see that in 
HYBRID_FAIL.
   
   I'm going to push the required manifests, so just delete/revert them if 
you'd like to see these errors on the classpath.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 opened a new pull request, #14055: KAFKA-15031: Add configurable scanning modes to Connect worker config (KIP-898)

2023-07-19 Thread via GitHub


gharris1727 opened a new pull request, #14055:
URL: https://github.com/apache/kafka/pull/14055

   This is the primary feature for KIP-898, which allows users to reconfigure 
the Connect worker among the different scanning modes. The different modes and 
their functionality and intended use are as described in 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-898%3A+Modernize+Connect+plugin+discovery
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #14053: [KAFKA-15221] Fix the race between fetch requests from a rebooted follower.

2023-07-19 Thread via GitHub


CalvinConfluent commented on code in PR #14053:
URL: https://github.com/apache/kafka/pull/14053#discussion_r1268783217


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -1366,6 +1376,17 @@ class Partition(val topicPartition: TopicPartition,
   fetchParams.replicaId,
   fetchPartitionData
 )
+
+// Fence the fetch request with stale broker epoch from a rebooted 
follower.
+if (metadataCache.isInstanceOf[KRaftMetadataCache]) {
+  val brokerEpoch = fetchParams.replicaEpoch
+  val currentBrokerEpoch = 
replica.stateSnapshot.brokerEpoch.getOrElse(-1L)
+  if (brokerEpoch != -1 && brokerEpoch < currentBrokerEpoch) {
+throw new StaleBrokerEpochException(s"Received fetch request for 
$topicPartition with stale broker " +
+  s"epoch=$brokerEpoch. The expected broker epoch= 
$currentBrokerEpoch.")
+  }
+}

Review Comment:
   Make sense. Then just abort the fetch state update.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee opened a new pull request, #14054: KAFKA-14648: Moving bootstrap to NetworkClient Poll.

2023-07-19 Thread via GitHub


philipnee opened a new pull request, #14054:
URL: https://github.com/apache/kafka/pull/14054

   Motivation:
   Instantiating a new client may result in a fatal failure if the bootstrap 
server cannot be resolved due to misconfiguration or transient network issues 
such as slow DNS. This is suboptimal because of the fact that it might take a 
long time for the address to become available at the DNS server, and users will 
need to continue to retry.  Also, the ConfigException exception type does not 
accurately reflect the root cause of the problem, which makes it hard to handle 
this failure case.  We think it is reasonable to allow users to have a grace 
period to retry if the address cannot be resolved immediately. Also, poisoning 
the clients during the construction can be obstructive; I think it is better to 
fail the client on its first attempt to connect to the network.
   
   Changes:
   Changing the `NetworkClient` constructor to accept a set of bootstrap 
configurations, which is then used for bootstrapping during the 
`NetworkClient.poll()` operation.  To avoid passing more parameters to the 
existing list, I wrapped the configurations into a `BootstrapConfiguration` 
class.
   
   On the client side, bootstrapping are removed as the motivation of the KIP 
is to avoid bootstrapping during the instance construction.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 merged pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation

2023-07-19 Thread via GitHub


gharris1727 merged PR #13971:
URL: https://github.com/apache/kafka/pull/13971


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation

2023-07-19 Thread via GitHub


gharris1727 commented on PR #13971:
URL: https://github.com/apache/kafka/pull/13971#issuecomment-1642887109

   Flaky test failures appear unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15221) Potential race condition between requests from rebooted followers

2023-07-19 Thread Calvin Liu (Jira)
Calvin Liu created KAFKA-15221:
--

 Summary: Potential race condition between requests from rebooted 
followers
 Key: KAFKA-15221
 URL: https://issues.apache.org/jira/browse/KAFKA-15221
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.5.0
Reporter: Calvin Liu
Assignee: Calvin Liu
 Fix For: 3.6.0, 3.5.1


When the leader processes the fetch request, it does not acquire locks when 
updating the replica fetch state. Then there can be a race between the fetch 
requests from a rebooted follower.

T0, broker 1 sends a fetch to broker 0(leader). At the moment, broker 1 is not 
in ISR.

T1, broker 1 crashes.

T2 broker 1 is back online and receives a new broker epoch. Also, it sends a 
new Fetch request.

T3 broker 0 receives the old fetch requests and decides to expand the ISR.

T4 Right before broker 0 starts to fill the AlterPartitoin request, the new 
fetch request comes in and overwrites the fetch state. Then broker 0 uses the 
new broker epoch on the AlterPartition request.

In this way, the AlterPartition request can get around KIP-903 and wrongly 
update the ISR.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] kirktrue commented on pull request #13990: KAFKA-14937: Refactoring for client code to reduce boilerplate

2023-07-19 Thread via GitHub


kirktrue commented on PR #13990:
URL: https://github.com/apache/kafka/pull/13990#issuecomment-1642861657

   @junrao The tests ran again with (no code change in between) and now there 
are only four test failures in (seemingly) unrelated areas of the code.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #13949: KAFKA-15141: init logger statically on hot codepaths

2023-07-19 Thread via GitHub


ijuma commented on PR #13949:
URL: https://github.com/apache/kafka/pull/13949#issuecomment-1642838445

   I merged the PR. Thanks for the contribution!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics

2023-07-19 Thread via GitHub


cmccabe commented on code in PR #14010:
URL: https://github.com/apache/kafka/pull/14010#discussion_r1268717074


##
metadata/src/main/java/org/apache/kafka/image/publisher/metrics/SnapshotEmitterMetrics.java:
##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader.metrics;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * These are the metrics which are managed by the SnapshotEmitter class.
+ */
+public final class SnapshotEmitterMetrics implements AutoCloseable {
+private final static MetricName LATEST_SNAPSHOT_GENERATED_BYTES = 
getMetricName(
+"SnapshotEmitter", "LatestSnapshotGeneratedBytes");
+private final static MetricName LATEST_SNAPSHOT_GENERATED_AGE_MS = 
getMetricName(
+"SnapshotEmitter", "LatestSnapshotGeneratedAgeMs");
+
+private final Optional registry;
+private final Time time;
+private final AtomicLong latestSnapshotGeneratedBytes;
+private final AtomicLong latestSnapshotGeneratedTimeMs;
+
+/**
+ * Create a new LoaderMetrics object.
+ *
+ * @param registry  The metrics registry, or Optional.empty if this is a 
test and we don't have one.
+ */
+public SnapshotEmitterMetrics(
+Optional registry,
+Time time,
+long initialLatestSnapshotGeneratedBytes
+) {
+this.registry = registry;
+this.time = time;
+this.latestSnapshotGeneratedBytes = new 
AtomicLong(initialLatestSnapshotGeneratedBytes);
+this.latestSnapshotGeneratedTimeMs = new AtomicLong(monoTimeInMs());
+registry.ifPresent(r -> r.newGauge(LATEST_SNAPSHOT_GENERATED_BYTES, 
new Gauge() {
+@Override
+public Long value() {
+return latestSnapshotGeneratedBytes();
+}
+}));
+registry.ifPresent(r -> r.newGauge(LATEST_SNAPSHOT_GENERATED_AGE_MS, 
new Gauge() {
+@Override
+public Long value() {
+return latestSnapshotGeneratedAgeMs();
+}
+}));
+}
+
+long monoTimeInMs() {
+return TimeUnit.NANOSECONDS.toMillis(time.nanoseconds());
+}
+
+public void setLatestSnapshotGeneratedBytes(long value) {
+this.latestSnapshotGeneratedBytes.set(value);
+}

Review Comment:
   Fixed, and this is now tested.



##
metadata/src/main/java/org/apache/kafka/image/publisher/metrics/SnapshotEmitterMetrics.java:
##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader.metrics;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * These are the metrics which are managed by the SnapshotEmitter class.
+ */
+public final class SnapshotEmitterMetrics implements AutoCloseable {


[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics

2023-07-19 Thread via GitHub


cmccabe commented on code in PR #14010:
URL: https://github.com/apache/kafka/pull/14010#discussion_r1268716163


##
metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java:
##
@@ -54,6 +54,14 @@ public class QuorumControllerMetrics implements 
AutoCloseable {
 "KafkaController", "LastAppliedRecordTimestamp");
 private final static MetricName LAST_APPLIED_RECORD_LAG_MS = getMetricName(
 "KafkaController", "LastAppliedRecordLagMs");

Review Comment:
   I'd prefer it this way since we're initializing a bunch of constants here, 
and sometimes we want to grep for stuff.
   
   It's kind of annoying when you have "matryoshka doll initialization" of 
constants.
   
   Like rather than having:
   ```
   static final String FOO = "foo";
   static final String BAR = "bar";
   static final String FOO_BAR = FOO + BAR;
   static final String FOO_BAR_BAZ = FOO_BAR + "baz";
   ```
   
   I'd rather just have:
   ```
   static final String FOO = "foo";
   static final String BAR = "bar";
   static final String FOO_BAR = "foobar";
   static final String FOO_BAR_BAZ = "foobarbaz";
   ```
   
   since then I can grep for the constant and it works as expected.
   
   Although maybe other people disagree?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] splett2 commented on a diff in pull request #14053: Fence follower fetch with stale broker epoch

2023-07-19 Thread via GitHub


splett2 commented on code in PR #14053:
URL: https://github.com/apache/kafka/pull/14053#discussion_r1268702825


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -1366,6 +1376,17 @@ class Partition(val topicPartition: TopicPartition,
   fetchParams.replicaId,
   fetchPartitionData
 )
+
+// Fence the fetch request with stale broker epoch from a rebooted 
follower.
+if (metadataCache.isInstanceOf[KRaftMetadataCache]) {
+  val brokerEpoch = fetchParams.replicaEpoch
+  val currentBrokerEpoch = 
replica.stateSnapshot.brokerEpoch.getOrElse(-1L)
+  if (brokerEpoch != -1 && brokerEpoch < currentBrokerEpoch) {
+throw new StaleBrokerEpochException(s"Received fetch request for 
$topicPartition with stale broker " +
+  s"epoch=$brokerEpoch. The expected broker epoch= 
$currentBrokerEpoch.")
+  }
+}

Review Comment:
   We would also need to update the `Replica.updateFetchState` call to check 
the brokerEpoch before trying to apply a fetch update. Otherwise we can pass 
this check and race for an update.
   
   I don't think we need to check whether we are in KRaft mode or not. The 
broker epoch should be monotonic in both zookeeper and kraft mode.
   
   I also don't think we can throw `StaleBrokerEpochException` since this would 
introduce a new error code returned on fetch responses without a KIP. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] CalvinConfluent opened a new pull request, #14053: Fence follower fetch with stale broker epoch

2023-07-19 Thread via GitHub


CalvinConfluent opened a new pull request, #14053:
URL: https://github.com/apache/kafka/pull/14053

   There are two changes here:
   1. Acquire read lock when updating the follower fetch state. This can 
prevent a race case with fetch requests from a rebooted follower.
   2. Fence the follower fetch request with a stale broker epoch in Kraft mode. 
The broker epoch increases monotonically in Kraft mode. So if a fetch request 
with a smaller broker epoch, it should be stale.
   
   UT updated
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-10579) Flaky test connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy

2023-07-19 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-10579:

Fix Version/s: 3.6.0

> Flaky test 
> connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy
> 
>
> Key: KAFKA-10579
> URL: https://issues.apache.org/jira/browse/KAFKA-10579
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: A. Sophie Blee-Goldman
>Assignee: Greg Harris
>Priority: Major
>  Labels: flaky-test
> Fix For: 3.6.0
>
>
>  
> {{java.lang.NullPointerException
>   at 
> java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
>   at org.reflections.Store.getAllIncluding(Store.java:82)
>   at org.reflections.Store.getAll(Store.java:93)
>   at org.reflections.Reflections.getSubTypesOf(Reflections.java:404)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:355)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:340)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209)
>   at 
> org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:61)
>   at 
> org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91)
>   at 
> org.apache.kafka.connect.util.clusters.WorkerHandle.start(WorkerHandle.java:50)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.addWorker(EmbeddedConnectCluster.java:167)
>   at 
> org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy(InternalTopicsIntegrationTest.java:260)}}
> {{}}
> https://github.com/apache/kafka/pull/9280/checks?check_run_id=1214776222{{}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-12842) Failing test: org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testSourceTaskNotBlockedOnShutdownWithNonExistentTopic

2023-07-19 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-12842.
-
Fix Version/s: 3.6.0
   Resolution: Fixed

> Failing test: 
> org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testSourceTaskNotBlockedOnShutdownWithNonExistentTopic
> --
>
> Key: KAFKA-12842
> URL: https://issues.apache.org/jira/browse/KAFKA-12842
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: John Roesler
>Assignee: Greg Harris
>Priority: Major
> Fix For: 3.6.0
>
>
> This test failed during a PR build, which means that it failed twice in a 
> row, due to the test-retry logic in PR builds.
>  
> [https://github.com/apache/kafka/pull/10744/checks?check_run_id=2643417209]
>  
> {noformat}
> java.lang.NullPointerException
>   at 
> java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
>   at org.reflections.Store.getAllIncluding(Store.java:82)
>   at org.reflections.Store.getAll(Store.java:93)
>   at org.reflections.Reflections.getSubTypesOf(Reflections.java:404)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:352)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:337)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209)
>   at 
> org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:61)
>   at 
> org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:93)
>   at 
> org.apache.kafka.connect.util.clusters.WorkerHandle.start(WorkerHandle.java:50)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.addWorker(EmbeddedConnectCluster.java:174)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.startConnect(EmbeddedConnectCluster.java:260)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.start(EmbeddedConnectCluster.java:141)
>   at 
> org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testSourceTaskNotBlockedOnShutdownWithNonExistentTopic(ConnectWorkerIntegrationTest.java:303)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> 

[jira] [Resolved] (KAFKA-8690) Flakey test ConnectWorkerIntegrationTest#testAddAndRemoveWorke

2023-07-19 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-8690.

Fix Version/s: 3.6.0
   Resolution: Fixed

> Flakey test  ConnectWorkerIntegrationTest#testAddAndRemoveWorke
> ---
>
> Key: KAFKA-8690
> URL: https://issues.apache.org/jira/browse/KAFKA-8690
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Assignee: Greg Harris
>Priority: Major
> Fix For: 3.6.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/23570/consoleFull]
> org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest > 
> testAddAndRemoveWorker STARTED*02:56:46* 
> org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testAddAndRemoveWorker
>  failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.11/connect/runtime/build/reports/testOutput/org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testAddAndRemoveWorker.test.stdout*02:56:46*
>  *02:56:46* org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest 
> > testAddAndRemoveWorker FAILED*02:56:46* java.lang.AssertionError: 
> Condition not met within timeout 15000. Connector tasks did not start in 
> time.*02:56:46* at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376)*02:56:46*
>  at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:353)*02:56:46*
>  at 
> org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testAddAndRemoveWorker(ConnectWorkerIntegrationTest.java:118)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-10579) Flaky test connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy

2023-07-19 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-10579.
-
Resolution: Fixed

> Flaky test 
> connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy
> 
>
> Key: KAFKA-10579
> URL: https://issues.apache.org/jira/browse/KAFKA-10579
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: A. Sophie Blee-Goldman
>Assignee: Greg Harris
>Priority: Major
>  Labels: flaky-test
> Fix For: 3.6.0
>
>
>  
> {{java.lang.NullPointerException
>   at 
> java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
>   at org.reflections.Store.getAllIncluding(Store.java:82)
>   at org.reflections.Store.getAll(Store.java:93)
>   at org.reflections.Reflections.getSubTypesOf(Reflections.java:404)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:355)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:340)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209)
>   at 
> org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:61)
>   at 
> org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91)
>   at 
> org.apache.kafka.connect.util.clusters.WorkerHandle.start(WorkerHandle.java:50)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.addWorker(EmbeddedConnectCluster.java:167)
>   at 
> org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy(InternalTopicsIntegrationTest.java:260)}}
> {{}}
> https://github.com/apache/kafka/pull/9280/checks?check_run_id=1214776222{{}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] gharris1727 merged pull request #14029: KAFKA-10579: Upgrade reflections from 0.9.12 to 0.10.2

2023-07-19 Thread via GitHub


gharris1727 merged PR #14029:
URL: https://github.com/apache/kafka/pull/14029


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #14029: KAFKA-10579: Upgrade reflections from 0.9.12 to 0.10.2

2023-07-19 Thread via GitHub


gharris1727 commented on PR #14029:
URL: https://github.com/apache/kafka/pull/14029#issuecomment-1642744154

   Test failures appear unrelated, and this has passed unit tests, system 
tests, and stress testing already.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #14005: KAFKA-15177: Implement KIP-875 SourceConnector::alterOffset API in MirrorMaker 2 connectors

2023-07-19 Thread via GitHub


gharris1727 commented on code in PR #14005:
URL: https://github.com/apache/kafka/pull/14005#discussion_r1268647699


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java:
##
@@ -46,4 +51,49 @@ public void startClusters() throws Exception {
 super.startClusters();
 }
 
+@Override
+@Test
+public void testReplication() throws Exception {
+super.testReplication();
+
+// Augment the base replication test case with some extra testing of 
the offset management
+// API introduced in KIP-875
+// We do this only when exactly-once support is enabled in order to 
avoid having to worry about
+// zombie tasks producing duplicate records and/or writing stale 
offsets to the offsets topic
+
+String backupTopic1 = remoteTopicName("test-topic-1", 
PRIMARY_CLUSTER_ALIAS);
+String backupTopic2 = remoteTopicName("test-topic-2", 
PRIMARY_CLUSTER_ALIAS);
+
+// Explicitly move back to offset 0
+// Note that the connector treats the offset as the last-consumed 
offset,
+// so it will start reading the topic partition from offset 1 when it 
resumes
+alterMirrorMakerSourceConnectorOffsets(backup, n -> 0L, 
"test-topic-1");
+// Reset the offsets for test-topic-2
+resetSomeMirrorMakerSourceConnectorOffsets(backup, "test-topic-2");
+resumeMirrorMakerConnectors(backup, MirrorSourceConnector.class);
+
+int expectedRecordsTopic1 = NUM_RECORDS_PRODUCED + 
((NUM_RECORDS_PER_PARTITION - 1) * NUM_PARTITIONS);
+assertEquals(expectedRecordsTopic1, 
backup.kafka().consume(expectedRecordsTopic1, RECORD_TRANSFER_DURATION_MS, 
backupTopic1).count(),
+"Records were not re-replicated to backup cluster after 
altering offsets.");
+int expectedRecordsTopic2 = NUM_RECORDS_PER_PARTITION * 2;
+assertEquals(expectedRecordsTopic2, 
backup.kafka().consume(expectedRecordsTopic2, RECORD_TRANSFER_DURATION_MS, 
backupTopic2).count(),
+"New topic was not re-replicated to backup cluster after 
altering offsets.");
+
+@SuppressWarnings({"unchecked", "rawtypes"})
+Class[] connectorsToReset = 
CONNECTOR_LIST.toArray(new Class[0]);
+// Resetting the offsets for the heartbeat and checkpoint connectors 
doesn't have any effect
+// on their behavior, but users may want to wipe offsets from them to 
prevent the offsets topic
+// from growing infinitely. So, we include them in the list of 
connectors to reset as a sanity check

Review Comment:
   I think the currently proposed validation is reasonable, and I agree with 
the points raised above.
   I didn't even know that the Checkpoint and Heartbeat connectors emitted 
offsets in the first place, since the operation of the connector never reads 
them back and I never had any use to trace their data flow. I think the GET 
portion of the API will be the first time any user sees these offsets, and the 
PUT / alterOffsets methods will be the first time that they see how they 
(don't) affect the operation of the connectors.
   
   Since it is harmless to have the users change these offsets, the 
alterOffsets calls should guide users to well-formed inputs but not restrict 
them more than that. For the connector, the biggest impact will be keeping the 
offsets store clean in case a future extension wishes to use the offsets in a 
meaningful way.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #14005: KAFKA-15177: Implement KIP-875 SourceConnector::alterOffset API in MirrorMaker 2 connectors

2023-07-19 Thread via GitHub


gharris1727 commented on code in PR #14005:
URL: https://github.com/apache/kafka/pull/14005#discussion_r1268624984


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -933,12 +938,94 @@ protected static void 
waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster conne
 }
 }
 
-private static void restartMirrorMakerConnectors(EmbeddedConnectCluster 
connectCluster, List> connectorClasses)  {
+protected static void restartMirrorMakerConnectors(EmbeddedConnectCluster 
connectCluster, List> connectorClasses)  {
 for (Class connector : connectorClasses) {
 connectCluster.restartConnectorAndTasks(connector.getSimpleName(), 
false, true, false);
 }
 }
 
+@SafeVarargs
+protected static void resumeMirrorMakerConnectors(EmbeddedConnectCluster 
connectCluster, Class... connectorClasses) throws 
InterruptedException {
+for (Class connectorClass : connectorClasses) {
+connectCluster.resumeConnector(connectorClass.getSimpleName());
+}
+for (Class connectorClass : connectorClasses) {
+String connectorName = connectorClass.getSimpleName();
+
connectCluster.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+connectorName,
+1,
+"Connector '" + connectorName + "' and/or task did not 
resume in time"
+);
+}
+}
+
+protected static void 
alterMirrorMakerSourceConnectorOffsets(EmbeddedConnectCluster connectCluster, 
LongUnaryOperator alterOffset, String... topics) {

Review Comment:
   This also stops the connectors, which isn't described in the function name. 
Can you split this into two functions, or change the name to stopAndAlter.. etc?



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -933,12 +938,94 @@ protected static void 
waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster conne
 }
 }
 
-private static void restartMirrorMakerConnectors(EmbeddedConnectCluster 
connectCluster, List> connectorClasses)  {
+protected static void restartMirrorMakerConnectors(EmbeddedConnectCluster 
connectCluster, List> connectorClasses)  {
 for (Class connector : connectorClasses) {
 connectCluster.restartConnectorAndTasks(connector.getSimpleName(), 
false, true, false);
 }
 }
 
+@SafeVarargs
+protected static void resumeMirrorMakerConnectors(EmbeddedConnectCluster 
connectCluster, Class... connectorClasses) throws 
InterruptedException {
+for (Class connectorClass : connectorClasses) {
+connectCluster.resumeConnector(connectorClass.getSimpleName());
+}
+for (Class connectorClass : connectorClasses) {
+String connectorName = connectorClass.getSimpleName();
+
connectCluster.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+connectorName,
+1,
+"Connector '" + connectorName + "' and/or task did not 
resume in time"
+);
+}
+}
+
+protected static void 
alterMirrorMakerSourceConnectorOffsets(EmbeddedConnectCluster connectCluster, 
LongUnaryOperator alterOffset, String... topics) {
+Set topicsSet = new HashSet<>(Arrays.asList(topics));
+
+String connectorName = MirrorSourceConnector.class.getSimpleName();
+connectCluster.stopConnector(connectorName);

Review Comment:
   Should this additionally wait for the connectors to stop? it appears that it 
just waits for the REST API to return a 200, which may complete before the 
tasks have stopped executing and committed offsets.



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java:
##
@@ -46,4 +51,49 @@ public void startClusters() throws Exception {
 super.startClusters();
 }
 
+@Override
+@Test
+public void testReplication() throws Exception {
+super.testReplication();
+
+// Augment the base replication test case with some extra testing of 
the offset management
+// API introduced in KIP-875
+// We do this only when exactly-once support is enabled in order to 
avoid having to worry about
+// zombie tasks producing duplicate records and/or writing stale 
offsets to the offsets topic
+
+String backupTopic1 = remoteTopicName("test-topic-1", 
PRIMARY_CLUSTER_ALIAS);
+String backupTopic2 = remoteTopicName("test-topic-2", 
PRIMARY_CLUSTER_ALIAS);
+
+// Explicitly move back to offset 0
+// Note that the connector treats the offset as the last-consumed 
offset,
+// so it will start reading the topic partition from offset 1 when it 
resumes
+

[GitHub] [kafka] divijvaidya commented on pull request #14032: MINOR: Upgrade Gradle wrapper version to 8.2.1

2023-07-19 Thread via GitHub


divijvaidya commented on PR #14032:
URL: https://github.com/apache/kafka/pull/14032#issuecomment-1642721299

   We have had a large number of failures [1] since this was merged with 
`Timeout waiting to lock * cache (/home/jenkins/.gradle/caches/*/* It is 
currently in use by another Gradle instance.` 
   
   Please investigate if you get a chance, else we may have to revert this 
commit tomorrow until we find the cause and fix.
   
   [1] 
https://ge.apache.org/scans/failures?failures.failureMessage=Execution%20failed%20for%20task%20*%0A%3E%20Timeout%20waiting%20to%20lock%20*%20cache%20(/home/jenkins/.gradle/caches/*/*%20It%20is%20currently%20in%20use%20by%20another%20Gradle%20instance.%0A%20%20Owner%20PID:%20*%0A%20%20Our%20PID:%20*%0A%20%20Owner%20Operation:%20%0A%20%20Our%20operation:%20%0A%20%20Lock%20file:%20/home/jenkins/.gradle/caches/*/*/*=kafka=Europe/Berlin
 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #14046: KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new StaleMemberEpochException error

2023-07-19 Thread via GitHub


jolshan commented on PR #14046:
URL: https://github.com/apache/kafka/pull/14046#issuecomment-1642696317

   Thanks for the fix @dajac let's just confirm the build and we should be good 
:) 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee opened a new pull request, #14052: KAFKA-14960: [Part I]TopicMetadataRequestManager Implementation (#7)

2023-07-19 Thread via GitHub


philipnee opened a new pull request, #14052:
URL: https://github.com/apache/kafka/pull/14052

   TopicMetadataRequestManager implementation
   
   -
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #13977: KAFKA-15162: Reflectively find plugins in parent ClassLoaders that aren't on the classpath

2023-07-19 Thread via GitHub


gharris1727 commented on PR #13977:
URL: https://github.com/apache/kafka/pull/13977#issuecomment-1642662589

   Test failures in CI appear unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14046: KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new StaleMemberEpochException error

2023-07-19 Thread via GitHub


dajac commented on code in PR #14046:
URL: https://github.com/apache/kafka/pull/14046#discussion_r1268592972


##
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java:
##
@@ -89,33 +88,29 @@ public Map offsets() {
 return offsets;
 }
 
-public static List getErrorResponseTopics(
-List requestTopics,

Review Comment:
   The usage below was the single usage of this method so we always used it 
with all topics. For the context, I refactored it because I use the refactored 
one in one of the next patch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #14046: KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new StaleMemberEpochException error

2023-07-19 Thread via GitHub


dajac commented on PR #14046:
URL: https://github.com/apache/kafka/pull/14046#issuecomment-1642653815

   > Something strange is going on with AuthorizerIntegrationTest in the build, 
but that might be unrelated. I will look into that.
   
   @jolshan I found the issue related to AuthorizerIntegrationTest failures. I 
just pushed a fix. See last commit.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma merged pull request #13949: KAFKA-15141: init logger statically on hot codepaths

2023-07-19 Thread via GitHub


ijuma merged PR #13949:
URL: https://github.com/apache/kafka/pull/13949


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-07-19 Thread via GitHub


C0urante commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1268520485


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,10 +280,33 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 

Review Comment:
   Probably time to update this now?



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,8 +284,51 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
+boolean containsTombstones = values.containsValue(null);
+
+// If there are tombstone offsets, then the failure to write to 
secondary store will
+// not be ignored. Also, for tombstone records, we first write to 
secondary store and
+// then to primary stores.
+if (secondaryStore != null && containsTombstones) {
+AtomicReference secondaryStoreTombstoneWriteError = new 
AtomicReference<>();

Review Comment:
   We shouldn't be using an `AtomicReference` here. The reason it's used in the 
[linked 
snippet](https://github.com/apache/kafka/blob/6368d14a1d8c37305290b8b89fb5990ad07aa4db/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L430-L484)
 in the `ConsumerCoordinator` class is to ensure that, in the event that 
multiple errors occur, we don't overwrite the first exception that we saw.
   
   That's not a possibility here since `secondaryStoreTombstoneWriteError` is 
only ever updated in separate `catch` clauses for the same `try` block, which 
means that it's guaranteed to never be updated more than once.



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,8 +284,51 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
+boolean containsTombstones = values.containsValue(null);
+
+// If there are tombstone offsets, then the failure to write to 
secondary store will
+// not be ignored. Also, for tombstone records, we first write to 
secondary store and
+// then to primary stores.
+if (secondaryStore != null && containsTombstones) {
+AtomicReference secondaryStoreTombstoneWriteError = new 
AtomicReference<>();
+FutureCallback secondaryWriteFuture = new FutureCallback<>();
+secondaryStore.set(values, secondaryWriteFuture);
+try {
+// For EOS, there is no timeout for offset commit and it is 
allowed to take as much time as needed for
+// commits. We still need to wait because we want to fail the 
offset commit for cases when

Review Comment:
   I like the general idea here right now: block indefinitely for exactly-once, 
block within the offset timeout otherwise. We also note in the 
[docs](https://kafka.apache.org/documentation.html#connectconfigs_offset.flush.timeout.ms)
 for the `offset.flush.timeout.ms` property that it "has no effect for source 
connectors running with exactly-once support".
   
   I don't think we need to worry about placing an upper bound on the time we 
take with exactly-once support enabled. If we did, it would make tasks more 
brittle (remember, we fail tasks when offset commits fail in this mode), and 
preemptively writing tombstone records to the secondary offsets topic shouldn't 
corrupt the offsets that a connector sees even if the current transaction 
(including a write to the connector-specific offsets topic) fails. We may end 
up writing garbage to the secondary offsets topic, but guarantees for 
exactly-once support are lost as soon as a connector switches over to reading 
exclusively from that topic, and tombstones in the secondary topic don't 
overwrite non-tombstone offsets for the same partition in the primary topic.
   
   That said, I don't love how we've made this method synchronously await the 
write to the secondary store. We should return a `Future` to the caller that 
corresponds to all of the offset flushes that we'd need to block on for an 
offset commit (i.e., the existing flush that we're performing, possibly 
preceded by a preemptive flush of tombstones to the secondary store).



##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##
@@ -192,6 +197,220 @@ public void testCancelAfterAwaitFlush() throws Exception {
 flushFuture.get(1000, TimeUnit.MILLISECONDS);
 }
 
+@Test

Review Comment:
   +1 for moving these tests to a `ConnectorOffsetBackingStoreTest`; the 
changes to the main code are entirely contained within the 

[GitHub] [kafka] jolshan commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related

2023-07-19 Thread via GitHub


jolshan commented on code in PR #14047:
URL: https://github.com/apache/kafka/pull/14047#discussion_r1268535712


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+/**
+ * Represents a committed offset with its metadata.
+ */
+public class OffsetAndMetadata {
+public static final String NO_METADATA = "";

Review Comment:
   What cases do we use this constant?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related

2023-07-19 Thread via GitHub


jolshan commented on code in PR #14047:
URL: https://github.com/apache/kafka/pull/14047#discussion_r1268526325


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -386,6 +386,18 @@ public short groupMetadataValueVersion() {
 }
 }
 
+public short offsetCommitValueVersion() {
+if (isLessThan(MetadataVersion.IBP_2_1_IV0)) {
+return 1;
+} else if (isLessThan(MetadataVersion.IBP_2_1_IV1)) {
+return 2;
+} else {
+// Serialize with the highest supported non-flexible version
+// until a tagged field is introduced or the version is bumped.
+return  3;
+}

Review Comment:
   Also do we expect to have lower IBPs if this is used for the new group 
coordinator? I didn't think we could go that low for kraft.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related

2023-07-19 Thread via GitHub


jolshan commented on code in PR #14047:
URL: https://github.com/apache/kafka/pull/14047#discussion_r1268525603


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -386,6 +386,18 @@ public short groupMetadataValueVersion() {
 }
 }
 
+public short offsetCommitValueVersion() {
+if (isLessThan(MetadataVersion.IBP_2_1_IV0)) {
+return 1;
+} else if (isLessThan(MetadataVersion.IBP_2_1_IV1)) {
+return 2;
+} else {
+// Serialize with the highest supported non-flexible version
+// until a tagged field is introduced or the version is bumped.
+return  3;
+}

Review Comment:
   >  // Serialize with the highest supported non-flexible version
   // until a tagged field is introduced or the version is bumped.
   
   This comment confused me a bit. Do we plan to manually update this method 
when new versions come in? Why is there a callout for flexible versions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #14005: KAFKA-15177: Implement KIP-875 SourceConnector::alterOffset API in MirrorMaker 2 connectors

2023-07-19 Thread via GitHub


yashmayya commented on code in PR #14005:
URL: https://github.com/apache/kafka/pull/14005#discussion_r1268520364


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java:
##
@@ -46,4 +51,49 @@ public void startClusters() throws Exception {
 super.startClusters();
 }
 
+@Override
+@Test
+public void testReplication() throws Exception {
+super.testReplication();
+
+// Augment the base replication test case with some extra testing of 
the offset management
+// API introduced in KIP-875
+// We do this only when exactly-once support is enabled in order to 
avoid having to worry about
+// zombie tasks producing duplicate records and/or writing stale 
offsets to the offsets topic
+
+String backupTopic1 = remoteTopicName("test-topic-1", 
PRIMARY_CLUSTER_ALIAS);
+String backupTopic2 = remoteTopicName("test-topic-2", 
PRIMARY_CLUSTER_ALIAS);
+
+// Explicitly move back to offset 0
+// Note that the connector treats the offset as the last-consumed 
offset,
+// so it will start reading the topic partition from offset 1 when it 
resumes
+alterMirrorMakerSourceConnectorOffsets(backup, n -> 0L, 
"test-topic-1");
+// Reset the offsets for test-topic-2
+resetSomeMirrorMakerSourceConnectorOffsets(backup, "test-topic-2");
+resumeMirrorMakerConnectors(backup, MirrorSourceConnector.class);
+
+int expectedRecordsTopic1 = NUM_RECORDS_PRODUCED + 
((NUM_RECORDS_PER_PARTITION - 1) * NUM_PARTITIONS);
+assertEquals(expectedRecordsTopic1, 
backup.kafka().consume(expectedRecordsTopic1, RECORD_TRANSFER_DURATION_MS, 
backupTopic1).count(),
+"Records were not re-replicated to backup cluster after 
altering offsets.");
+int expectedRecordsTopic2 = NUM_RECORDS_PER_PARTITION * 2;
+assertEquals(expectedRecordsTopic2, 
backup.kafka().consume(expectedRecordsTopic2, RECORD_TRANSFER_DURATION_MS, 
backupTopic2).count(),
+"New topic was not re-replicated to backup cluster after 
altering offsets.");
+
+@SuppressWarnings({"unchecked", "rawtypes"})
+Class[] connectorsToReset = 
CONNECTOR_LIST.toArray(new Class[0]);
+// Resetting the offsets for the heartbeat and checkpoint connectors 
doesn't have any effect
+// on their behavior, but users may want to wipe offsets from them to 
prevent the offsets topic
+// from growing infinitely. So, we include them in the list of 
connectors to reset as a sanity check

Review Comment:
   > I've pushed a change that allows arbitrary source partitions to be used 
with null source offsets; LMKWYT.
   
   Thanks Chris, looks good. We can probably update the 
`FileStreamSourceConnector::alterOffsets` method with a similar change in a 
follow up as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on pull request #14044: KAFKA-15216: InternalSinkRecord::newRecord should not ignore new headers

2023-07-19 Thread via GitHub


yashmayya commented on PR #14044:
URL: https://github.com/apache/kafka/pull/14044#issuecomment-1642566534

   > The change from private to protected technically counts as a change to 
public interface, so we'd need a KIP for that
   
   Ah, I did wonder about this but wasn't entirely certain, thanks for the 
clarification!
   
   > I'm also a little hesitant to upgrade the visibility of these members 
regardless since that would limit the compatibility of plugins that rely on 
them (most likely by subclassing ConnectRecord, SinkRecord, etc.), since that 
would render them [binary 
incompatible](https://docs.oracle.com/javase/specs/jls/se7/html/jls-13.html#jls-13.4.7)
 with older versions of Connect where the fields were still private.
   
   I considered the binary compatibility impact of this change directly on 
plugins themselves (and there shouldn't be any), but good point on the backward 
compatibility restriction that would be imposed on any potential external 
subclasses of `ConnectRecord` due to this change.
   
   > Can we reduce the scope here to use fields instead of methods wherever 
possible, but without altering the visibility of any parts of our public API?
   
   Done


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related

2023-07-19 Thread via GitHub


jolshan commented on code in PR #14047:
URL: https://github.com/apache/kafka/pull/14047#discussion_r1268514084


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+/**
+ * Represents a committed offset with its metadata.
+ */
+public class OffsetAndMetadata {
+public static final String NO_METADATA = "";
+
+/**
+ * The committed offset.
+ */
+public final long offset;
+
+/**
+ * The leader epoch in use when the offset was committed.
+ */
+public final OptionalInt leaderEpoch;

Review Comment:
   For my understanding, this is optional because versions < 6 won't have 
leader epochs?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on pull request #14036: MINOR: Fix Javadocs for SourceTaskContext::transactionContext and SinkTaskContext::errantRecordReporter to use NoSuchMethodError instead of

2023-07-19 Thread via GitHub


yashmayya commented on PR #14036:
URL: https://github.com/apache/kafka/pull/14036#issuecomment-1642557203

   Thanks Greg!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #13797: KAFKA-14950: implement assign() and assignment()

2023-07-19 Thread via GitHub


philipnee commented on PR #13797:
URL: https://github.com/apache/kafka/pull/13797#issuecomment-1642545962

   @junrao - thanks for the review.  I've addressed the two issues you pointed 
out. Would you be able to take another look at it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 opened a new pull request, #14051: KAFKA-15218: Avoid NPE thrown while deleting topic and fetch from follower concurrently

2023-07-19 Thread via GitHub


vamossagar12 opened a new pull request, #14051:
URL: https://github.com/apache/kafka/pull/14051

   When deleting topics, we'll first clear all the remoteReplicaMap when 
stopPartitions 
[here](https://github.com/apache/kafka/blob/2999168cde37142ae3a2377fe939d6b581e692b8/core/src/main/scala/kafka/server/ReplicaManager.scala#L554).
 But this time, 
t[here](https://github.com/apache/kafka/blob/2999168cde37142ae3a2377fe939d6b581e692b8/core/src/main/scala/kafka/cluster/Partition.scala#L1001)
 might be fetch request coming from follower, and try to check if the replica 
is eligible to be added into ISR here. At this moment, NPE will be thrown. 
Although it's fine since this topic is already deleted, it'd be better to avoid 
it happen.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on a diff in pull request #14004: KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache

2023-07-19 Thread via GitHub


kamalcph commented on code in PR #14004:
URL: https://github.com/apache/kafka/pull/14004#discussion_r1268420993


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java:
##
@@ -100,17 +100,29 @@ void 
handleSegmentWithCopySegmentStartedState(RemoteLogSegmentId remoteLogSegmen
 
 void handleSegmentWithCopySegmentFinishedState(Long startOffset, 
RemoteLogSegmentId remoteLogSegmentId,
Long leaderEpochEndOffset) {
+// If there are duplicate segments uploaded due to leader-election, 
then mark them as unreferenced.
+// Duplicate segments can be uploaded when the previous leader had 
tier-lags and the next leader uploads the
+// segment for the same leader-epoch which is a super-set of 
previously uploaded segments.
+// (eg)
+// case-1: Duplicate segment
+//  L0 uploaded segment S0 with offsets 0-100 and L1 uploaded 
segment S1 with offsets 0-200.
+//  We will mark the segment S0 as duplicate and add it to 
unreferencedSegmentIds.
+// case-2: Overlapping segments
+//  L0 uploaded segment S0 with offsets 10-90 and L1 uploaded 
segment S1 with offsets 5-100, S2-101-200,
+//  and so on. When the consumer request for segment with offset 
95, it should get the segment S1 and not S0.
+Map.Entry lastEntry = offsetToId.lastEntry();
+while (lastEntry != null && lastEntry.getKey() >= startOffset && 
highestLogOffset <= leaderEpochEndOffset) {

Review Comment:
   > But are the any drawbacks by removing highestLogOffset <= 
leaderEpochEndOffset from the while? What kind of problem we can face and why 
having this check is more safe rather than remove it?
   
   Assume there are multiple back-to-back unclean leader elections happened and 
only **one replica** is available at any point of time. Both B1 and B2 may not 
be aware of all the leader-epochs. If the consumer reads the data from the 
beginning of the topic, then we should be able to serve the respective remote 
log segments for the (epoch, start_offset) present in both B1 and B2.
   
   This patch only mark the segment as unreferenced if the current segment is a 
superset of all the previously uploaded segments for the same epoch which means 
the message is same across the segments.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on a diff in pull request #14004: KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache

2023-07-19 Thread via GitHub


kamalcph commented on code in PR #14004:
URL: https://github.com/apache/kafka/pull/14004#discussion_r1268399339


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java:
##
@@ -100,17 +100,29 @@ void 
handleSegmentWithCopySegmentStartedState(RemoteLogSegmentId remoteLogSegmen
 
 void handleSegmentWithCopySegmentFinishedState(Long startOffset, 
RemoteLogSegmentId remoteLogSegmentId,
Long leaderEpochEndOffset) {
+// If there are duplicate segments uploaded due to leader-election, 
then mark them as unreferenced.
+// Duplicate segments can be uploaded when the previous leader had 
tier-lags and the next leader uploads the
+// segment for the same leader-epoch which is a super-set of 
previously uploaded segments.
+// (eg)
+// case-1: Duplicate segment
+//  L0 uploaded segment S0 with offsets 0-100 and L1 uploaded 
segment S1 with offsets 0-200.
+//  We will mark the segment S0 as duplicate and add it to 
unreferencedSegmentIds.
+// case-2: Overlapping segments
+//  L0 uploaded segment S0 with offsets 10-90 and L1 uploaded 
segment S1 with offsets 5-100, S2-101-200,
+//  and so on. When the consumer request for segment with offset 
95, it should get the segment S1 and not S0.
+Map.Entry lastEntry = offsetToId.lastEntry();
+while (lastEntry != null && lastEntry.getKey() >= startOffset && 
highestLogOffset <= leaderEpochEndOffset) {

Review Comment:
   @satishd @Nickstery Correct me If I'm wrong:
   
   The logic to find the `copiedOffset`  from remoteStorage doesn't take in 
account of the current-leader-epoch checkpoint file. In the above test, when B2 
becomes leader, it's leader-epoch-checkpoint file will look like:
   
   (The case mentioned in the test can happen when acks is set to 1)
   
   ```
   0
   2
   0 0
   1 151
   ```
   
   The logic to find the copied offset traverses the checkpoint file from 
latest-epoch. So, when B2 tries to find the copied offset:
   
   For epoch(1), there won't any uploaded segments, so it returns empty.
   For epoch(0), the highest copied offset will be 200
   
   So, B2 will skip the segment S2 (101-190) which means there is a data loss 
from [151-190]
   
   @Nickstery This can be fixed if we update the  logic to find the 
`copiedOffset`:
   ```
   find-highest-remote-offset = min(end-offset-for-epoch-in-the-checkpoint, 
highest-remote-offset-for-epoch)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15190) Allow configuring a streams process ID

2023-07-19 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744726#comment-17744726
 ] 

Matthias J. Sax commented on KAFKA-15190:
-

{quote}but although {{StreamsPartitionAssignor}} sometimes calls it a client ID 
and sometimes a process ID it's a {{UUID}} so I assume it really is the process 
ID.
{quote}
Thanks for calling this out. You are right; I missed this point.

As you did mention "max recovery lag", I assume you have a stateful app that 
uses in-memory stores only?

Another thing coming to my mind: the `client.id` has actually different purpose 
and should not be unique per `KafkaStreams` instance, but should be the _same_ 
for all instances (the name is a little bit mis-leading). For example, if you 
configure quotas, it's based on `client.id` and you usually want quotas to be 
set per application, not per instance.

> Allow configuring a streams process ID
> --
>
> Key: KAFKA-15190
> URL: https://issues.apache.org/jira/browse/KAFKA-15190
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Joe Wreschnig
>Priority: Major
>  Labels: needs-kip
>
> We run our Kafka Streams applications in containers with no persistent 
> storage, and therefore the mitigation of persisting process ID the state 
> directly in KAFKA-10716 does not help us avoid shuffling lots of tasks during 
> restarts.
> However, we do have a persistent container ID (from a Kubernetes 
> StatefulSet). Would it be possible to expose a configuration option to let us 
> set the streams process ID ourselves?
> We are already using this ID as our group.instance.id - would it make sense 
> to have the process ID be automatically derived from this (plus 
> application/client IDs) if it's set? The two IDs seem to have overlapping 
> goals of identifying "this consumer" across restarts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vamossagar12 commented on pull request #14000: [MINOR] Fixing comment with IncrementalCooperativeAssignor#handleLostAssignments

2023-07-19 Thread via GitHub


vamossagar12 commented on PR #14000:
URL: https://github.com/apache/kafka/pull/14000#issuecomment-1642481356

   Thanks @gharris1727 . hmm the meaning of the variable and it's usage in the 
comment is slightly off in this case. `revokedInPrevious` being true doesn't 
just signify  successive revoking rebalances (which is what the original 
comment reflects) but also signifies that *just* the previous round had one. I 
just realised that my updated comment isn't accurate as well and ideally both 
should be included. Something along the lines of 
   
   ```
   There are no lost assignments and there have been no revoking rebalances in 
the previous round(s)
   ```
   
   would be more accurate imo. WDYT?
   
   Also, there is a bug where in due to empty lost assignments and a follow up 
rebalance post connector deletions can lead to these lines getting printed:
   
   
https://github.com/apache/kafka/pull/14000/files#diff-e24067b121eb960feebfa099bd9c30382e330eaf5db39302a9d7a50e29b3acb4L459-R462
   
   Ideally nothing should happen if lost assignments are empty. I haven't had 
the chance to take a look at fixing it though. Will file a ticket later when I 
have some time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on a diff in pull request #14004: KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache

2023-07-19 Thread via GitHub


kamalcph commented on code in PR #14004:
URL: https://github.com/apache/kafka/pull/14004#discussion_r1268399339


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java:
##
@@ -100,17 +100,29 @@ void 
handleSegmentWithCopySegmentStartedState(RemoteLogSegmentId remoteLogSegmen
 
 void handleSegmentWithCopySegmentFinishedState(Long startOffset, 
RemoteLogSegmentId remoteLogSegmentId,
Long leaderEpochEndOffset) {
+// If there are duplicate segments uploaded due to leader-election, 
then mark them as unreferenced.
+// Duplicate segments can be uploaded when the previous leader had 
tier-lags and the next leader uploads the
+// segment for the same leader-epoch which is a super-set of 
previously uploaded segments.
+// (eg)
+// case-1: Duplicate segment
+//  L0 uploaded segment S0 with offsets 0-100 and L1 uploaded 
segment S1 with offsets 0-200.
+//  We will mark the segment S0 as duplicate and add it to 
unreferencedSegmentIds.
+// case-2: Overlapping segments
+//  L0 uploaded segment S0 with offsets 10-90 and L1 uploaded 
segment S1 with offsets 5-100, S2-101-200,
+//  and so on. When the consumer request for segment with offset 
95, it should get the segment S1 and not S0.
+Map.Entry lastEntry = offsetToId.lastEntry();
+while (lastEntry != null && lastEntry.getKey() >= startOffset && 
highestLogOffset <= leaderEpochEndOffset) {

Review Comment:
   @satishd 
   The logic to find the `copiedOffset`  from remoteStorage doesn't take in 
account of the current-leader-epoch checkpoint file. In the above test, when B2 
becomes leader, it's leader-epoch-checkpoint file will look like:
   
   (The case mentioned in the test can happen when acks is set to 1)
   
   ```
   0
   2
   0 0
   1 151
   ```
   
   The logic to find the copied offset traverses the checkpoint file from 
latest-epoch. So, when B2 tries to find the copied offset:
   
   For epoch(1), there won't any uploaded segments, so it returns empty.
   For epoch(0), the highest copied offset will be 200
   
   So, B2 will skip the segment S2 (101-190) which means there is a data loss 
from [151-190]
   
   @Nickstery This can be fixed if we update the  logic to find the 
`copiedOffset`:
   ```
   find-highest-remote-offset = min(end-offset-for-epoch-in-the-checkpoint, 
highest-remote-offset-for-epoch)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 merged pull request #14036: MINOR: Fix Javadocs for SourceTaskContext::transactionContext and SinkTaskContext::errantRecordReporter to use NoSuchMethodError instead of NoS

2023-07-19 Thread via GitHub


gharris1727 merged PR #14036:
URL: https://github.com/apache/kafka/pull/14036


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #14004: KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache

2023-07-19 Thread via GitHub


satishd commented on code in PR #14004:
URL: https://github.com/apache/kafka/pull/14004#discussion_r1268373435


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java:
##
@@ -100,17 +100,29 @@ void 
handleSegmentWithCopySegmentStartedState(RemoteLogSegmentId remoteLogSegmen
 
 void handleSegmentWithCopySegmentFinishedState(Long startOffset, 
RemoteLogSegmentId remoteLogSegmentId,
Long leaderEpochEndOffset) {
+// If there are duplicate segments uploaded due to leader-election, 
then mark them as unreferenced.
+// Duplicate segments can be uploaded when the previous leader had 
tier-lags and the next leader uploads the
+// segment for the same leader-epoch which is a super-set of 
previously uploaded segments.
+// (eg)
+// case-1: Duplicate segment
+//  L0 uploaded segment S0 with offsets 0-100 and L1 uploaded 
segment S1 with offsets 0-200.
+//  We will mark the segment S0 as duplicate and add it to 
unreferencedSegmentIds.
+// case-2: Overlapping segments
+//  L0 uploaded segment S0 with offsets 10-90 and L1 uploaded 
segment S1 with offsets 5-100, S2-101-200,
+//  and so on. When the consumer request for segment with offset 
95, it should get the segment S1 and not S0.
+Map.Entry lastEntry = offsetToId.lastEntry();
+while (lastEntry != null && lastEntry.getKey() >= startOffset && 
highestLogOffset <= leaderEpochEndOffset) {

Review Comment:
   Why would segment data for [151L,190L] is lost? This will be part of the 
next leader epoch state(viz 1) which should contain this segment. @kamalcph 
@Nickstery  Am I missing anything here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #14046: KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new StaleMemberEpochException error

2023-07-19 Thread via GitHub


jolshan commented on PR #14046:
URL: https://github.com/apache/kafka/pull/14046#issuecomment-1642437560

   Something strange is going on with AuthorizerIntegrationTest in the build, 
but that might be unrelated. I will look into that.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] splett2 opened a new pull request, #14050: KAFKA-15220: Do not returned fenced brokers from getAliveBrokerNode

2023-07-19 Thread via GitHub


splett2 opened a new pull request, #14050:
URL: https://github.com/apache/kafka/pull/14050

   `getAliveBrokerNode` returns fenced brokers as alive which is inconsistent 
with methods like getAliveBrokerNodes.
   Add a filter to not return fenced brokers and adds a test to validate that 
`getAliveBrokerNode` is consistent with `getAliveBrokerNodes`
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #14000: [MINOR] Fixing comment with IncrementalCooperativeAssignor#handleLostAssignments

2023-07-19 Thread via GitHub


gharris1727 commented on PR #14000:
URL: https://github.com/apache/kafka/pull/14000#issuecomment-1642429559

   Hey @vamossagar12 I don't think that this comment is incorrect or confusing 
enough to warrant stand-alone PR. If you have other substantive changes in this 
area, then we can re-examine this comment at that time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15220) KRaftMetadataCache returns fenced brokers from getAliveBrokerNode

2023-07-19 Thread David Mao (Jira)
David Mao created KAFKA-15220:
-

 Summary: KRaftMetadataCache returns fenced brokers from 
getAliveBrokerNode
 Key: KAFKA-15220
 URL: https://issues.apache.org/jira/browse/KAFKA-15220
 Project: Kafka
  Issue Type: Bug
Reporter: David Mao
Assignee: David Mao






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on a diff in pull request #14046: KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new StaleMemberEpochException error

2023-07-19 Thread via GitHub


jolshan commented on code in PR #14046:
URL: https://github.com/apache/kafka/pull/14046#discussion_r1268350437


##
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java:
##
@@ -89,33 +88,29 @@ public Map offsets() {
 return offsets;
 }
 
-public static List getErrorResponseTopics(
-List requestTopics,

Review Comment:
   Did we always just pass in all the topics here? I see the usage below that 
was removed, but curious if we would ever want to make a response for just some 
topics.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #14046: KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new StaleMemberEpochException error

2023-07-19 Thread via GitHub


jolshan commented on PR #14046:
URL: https://github.com/apache/kafka/pull/14046#issuecomment-1642424251

   > Note the use of "latestVersionUnstable": true in the request schema. This 
means that this new version is not available yet unless activated.
   
   This also leaves us room to add the topic ID to this version if we so choose?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #14046: KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new StaleMemberEpochException error

2023-07-19 Thread via GitHub


dajac commented on PR #14046:
URL: https://github.com/apache/kafka/pull/14046#issuecomment-1642425069

   > > Note the use of "latestVersionUnstable": true in the request schema. 
This means that this new version is not available yet unless activated.
   > 
   > This also leaves us room to add the topic ID to this version if we so 
choose?
   
   Yeah, that's right. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #14020: KAFKA-10579: Make Reflections thread safe to resolve flaky NPE scanning failure

2023-07-19 Thread via GitHub


gharris1727 commented on PR #14020:
URL: https://github.com/apache/kafka/pull/14020#issuecomment-1642410512

   Closing this in favor of upgrading the library.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 closed pull request #14020: KAFKA-10579: Make Reflections thread safe to resolve flaky NPE scanning failure

2023-07-19 Thread via GitHub


gharris1727 closed pull request #14020: KAFKA-10579: Make Reflections thread 
safe to resolve flaky NPE scanning failure
URL: https://github.com/apache/kafka/pull/14020


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15219) Support delegation tokens in KRaft

2023-07-19 Thread Proven Provenzano (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744700#comment-17744700
 ] 

Proven Provenzano commented on KAFKA-15219:
---

I'm sorry, I seem to have forgotten to create the Jira. I am almost done with 
this work. 
My WIP PR is 
https://github.com/apache/kafka/pull/13916/files#diff-70391f7b23b5528f11808d38481254c5e697e531e1d962f6f03bf759a2cca5fc


> Support delegation tokens in KRaft
> --
>
> Key: KAFKA-15219
> URL: https://issues.apache.org/jira/browse/KAFKA-15219
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.6.0
>Reporter: Viktor Somogyi-Vass
>Assignee: Viktor Somogyi-Vass
>Priority: Critical
>
> Delegation tokens have been created in KIP-48 and improved in KIP-373. KRaft 
> enabled the way to supporting them in KIP-900 by adding SCRAM support but 
> delegation tokens still don't support KRaft.
> There are multiple issues:
> - TokenManager still would try to create tokens in Zookeeper. Instead of this 
> we should forward admin requests to the controller that would store them in 
> the metadata similarly to SCRAM. We probably won't need new protocols just 
> enveloping similarly to other existing controller requests.
> - TokenManager should run on Controller nodes only (or in mixed mode).
> - Integration tests will need to be adapted as well and parameterize them 
> with Zookeeper/KRaft.
> - Documentation needs to be improved to factor in KRaft.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15211) DistributedConfigTest#shouldFailWithInvalidKeySize fails when run after TestSslUtils#generate

2023-07-19 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-15211.
-
Fix Version/s: 3.6.0
   Resolution: Fixed

> DistributedConfigTest#shouldFailWithInvalidKeySize fails when run after 
> TestSslUtils#generate
> -
>
> Key: KAFKA-15211
> URL: https://issues.apache.org/jira/browse/KAFKA-15211
> Project: Kafka
>  Issue Type: Test
>  Components: clients, KafkaConnect
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Minor
> Fix For: 3.6.0
>
>
> The DistributedConfigTest#shouldFailWithInvalidKeySize attempts to configure 
> a hashing algorithm with a key size of 0. When run alone, this test passes, 
> as the default Java hashing algorithm used rejects the key size.
> However, when TestSslUtils#generate runs first, such as via the 
> RestForwardingIntegrationTest, the BouncyCastleProvider is loaded, which 
> provides an alternative hashing algorithm. This implementation does _not_ 
> reject the key size, causing the test to fail.
> We should ether prevent TestSslUtils#generate from leaving the 
> BouncyCastleProvider loaded after use, or adjust the test to pass when the 
> BouncyCastleProvider is loaded.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] nizhikov commented on pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools

2023-07-19 Thread via GitHub


nizhikov commented on PR #13278:
URL: https://github.com/apache/kafka/pull/13278#issuecomment-1642362522

   @mimaison all your comments addressed except the one with the Tuple. Please, 
review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 merged pull request #14039: KAFKA-15211: Mock InvalidParameterException in DistributedConfigTest

2023-07-19 Thread via GitHub


gharris1727 merged PR #14039:
URL: https://github.com/apache/kafka/pull/14039


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] nizhikov commented on a diff in pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools

2023-07-19 Thread via GitHub


nizhikov commented on code in PR #13278:
URL: https://github.com/apache/kafka/pull/13278#discussion_r1268293020


##
tools/src/main/java/org/apache/kafka/tools/DeleteRecordsCommand.java:
##
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DeleteRecordsResult;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.StringJoiner;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * A command for delete records of the given partitions down to the specified 
offset.
+ */
+public class DeleteRecordsCommand {
+private static final int EARLIEST_VERSION = 1;
+
+private static final DecodeJson.DecodeInteger INT = new 
DecodeJson.DecodeInteger();
+
+private static final DecodeJson.DecodeLong LONG = new 
DecodeJson.DecodeLong();
+
+private static final DecodeJson.DecodeString STRING = new 
DecodeJson.DecodeString();
+
+public static void main(String[] args) throws Exception {
+execute(args, System.out);
+}
+
+static Collection> 
parseOffsetJsonStringWithoutDedup(String jsonData) throws 
JsonProcessingException {
+JsonValue js = Json.parseFull(jsonData)
+.orElseThrow(() -> new AdminOperationException("The input string 
is not a valid JSON"));
+
+Optional version = js.asJsonObject().get("version");
+
+return parseJsonData(version.isPresent() ? version.get().to(INT) : 
EARLIEST_VERSION, js);
+}
+
+private static Collection> parseJsonData(int 
version, JsonValue js) throws JsonMappingException {
+if (version == 1) {
+JsonValue partitions = js.asJsonObject().get("partitions")
+.orElseThrow(() -> new AdminOperationException("Missing 
partitions field"));
+
+Collection> res = new ArrayList<>();
+
+Iterator iterator = partitions.asJsonArray().iterator();
+
+while (iterator.hasNext()) {
+JsonObject partitionJs = iterator.next().asJsonObject();
+
+String topic = partitionJs.apply("topic").to(STRING);
+int partition = partitionJs.apply("partition").to(INT);
+long offset = partitionJs.apply("offset").to(LONG);
+
+res.add(new Tuple<>(new TopicPartition(topic, partition), 
offset));
+}
+
+return res;
+}
+
+throw new AdminOperationException("Not supported version field value " 
+ version);
+}
+
+public static void execute(String[] args, PrintStream out) throws 
IOException {
+DeleteRecordsCommandOptions opts = new 
DeleteRecordsCommandOptions(args);
+
+try (Admin adminClient = createAdminClient(opts)) {
+execute(adminClient, 
Utils.readFileAsString(opts.options.valueOf(opts.offsetJsonFileOpt)), out);
+}
+}
+
+static void execute(Admin adminClient, String offsetJsonString, 
PrintStream out) throws JsonProcessingException {
+Collection> offsetSeq = 
parseOffsetJsonStringWithoutDedup(offsetJsonString);
+
+Set 

[GitHub] [kafka] gharris1727 commented on pull request #14039: KAFKA-15211: Mock InvalidParameterException in DistributedConfigTest

2023-07-19 Thread via GitHub


gharris1727 commented on PR #14039:
URL: https://github.com/apache/kafka/pull/14039#issuecomment-1642357913

   Test failures look unrelated, and tests pass locally.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] nizhikov commented on a diff in pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools

2023-07-19 Thread via GitHub


nizhikov commented on code in PR #13278:
URL: https://github.com/apache/kafka/pull/13278#discussion_r1268286710


##
tools/src/main/java/org/apache/kafka/tools/DeleteRecordsCommand.java:
##
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DeleteRecordsResult;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.StringJoiner;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * A command for delete records of the given partitions down to the specified 
offset.
+ */
+public class DeleteRecordsCommand {
+private static final int EARLIEST_VERSION = 1;
+
+private static final DecodeJson.DecodeInteger INT = new 
DecodeJson.DecodeInteger();
+
+private static final DecodeJson.DecodeLong LONG = new 
DecodeJson.DecodeLong();
+
+private static final DecodeJson.DecodeString STRING = new 
DecodeJson.DecodeString();
+
+public static void main(String[] args) throws Exception {
+execute(args, System.out);
+}
+
+static Collection> 
parseOffsetJsonStringWithoutDedup(String jsonData) throws 
JsonProcessingException {
+JsonValue js = Json.parseFull(jsonData)
+.orElseThrow(() -> new AdminOperationException("The input string 
is not a valid JSON"));
+
+Optional version = js.asJsonObject().get("version");
+
+return parseJsonData(version.isPresent() ? version.get().to(INT) : 
EARLIEST_VERSION, js);
+}
+
+private static Collection> parseJsonData(int 
version, JsonValue js) throws JsonMappingException {
+if (version == 1) {
+JsonValue partitions = js.asJsonObject().get("partitions")
+.orElseThrow(() -> new AdminOperationException("Missing 
partitions field"));
+
+Collection> res = new ArrayList<>();
+
+Iterator iterator = partitions.asJsonArray().iterator();
+
+while (iterator.hasNext()) {
+JsonObject partitionJs = iterator.next().asJsonObject();
+
+String topic = partitionJs.apply("topic").to(STRING);
+int partition = partitionJs.apply("partition").to(INT);
+long offset = partitionJs.apply("offset").to(LONG);
+
+res.add(new Tuple<>(new TopicPartition(topic, partition), 
offset));
+}
+
+return res;
+}
+
+throw new AdminOperationException("Not supported version field value " 
+ version);
+}
+
+public static void execute(String[] args, PrintStream out) throws 
IOException {
+DeleteRecordsCommandOptions opts = new 
DeleteRecordsCommandOptions(args);
+
+try (Admin adminClient = createAdminClient(opts)) {
+execute(adminClient, 
Utils.readFileAsString(opts.options.valueOf(opts.offsetJsonFileOpt)), out);
+}
+}
+
+static void execute(Admin adminClient, String offsetJsonString, 
PrintStream out) throws JsonProcessingException {
+Collection> offsetSeq = 
parseOffsetJsonStringWithoutDedup(offsetJsonString);
+
+Set 

[jira] [Updated] (KAFKA-14094) KIP-853: KRaft controller memebership changes

2023-07-19 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-14094:
---
Summary: KIP-853: KRaft controller memebership changes  (was: KIP-853: 
KRaft Voters Change)

> KIP-853: KRaft controller memebership changes
> -
>
> Key: KAFKA-14094
> URL: https://issues.apache.org/jira/browse/KAFKA-14094
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14094) KIP-853: KRaft Voters Change

2023-07-19 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-14094:
---
Fix Version/s: (was: 3.6.0)

> KIP-853: KRaft Voters Change
> 
>
> Key: KAFKA-14094
> URL: https://issues.apache.org/jira/browse/KAFKA-14094
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15219) Support delegation tokens in KRaft

2023-07-19 Thread Viktor Somogyi-Vass (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi-Vass updated KAFKA-15219:

Description: 
Delegation tokens have been created in KIP-48 and improved in KIP-373. KRaft 
enabled the way to supporting them in KIP-900 by adding SCRAM support but 
delegation tokens still don't support KRaft.

There are multiple issues:
- TokenManager still would try to create tokens in Zookeeper. Instead of this 
we should forward admin requests to the controller that would store them in the 
metadata similarly to SCRAM. We probably won't need new protocols just 
enveloping similarly to other existing controller requests.
- TokenManager should run on Controller nodes only (or in mixed mode).
- Integration tests will need to be adapted as well and parameterize them with 
Zookeeper/KRaft.
- Documentation needs to be improved to factor in KRaft.

  was:
Delegation tokens have been created in KIP-48 and improved in KIP-373. KRaft 
enabled the way to supporting them in KIP-900 by adding SCRAM support but 
delegation tokens still don't support KRaft.

There are multiple issues:
- TokenManager still would try to create tokens in Zookeeper. Instead of this 
we should forward admin requests to the controller that would store them in the 
metadata similarly to SCRAM.
- TokenManager should run on Controller nodes only (or in mixed mode).
- Integration tests will need to be adapted as well and parameterize them with 
Zookeeper/KRaft.
- Documentation needs to be improved to factor in KRaft.


> Support delegation tokens in KRaft
> --
>
> Key: KAFKA-15219
> URL: https://issues.apache.org/jira/browse/KAFKA-15219
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.6.0
>Reporter: Viktor Somogyi-Vass
>Assignee: Viktor Somogyi-Vass
>Priority: Critical
>
> Delegation tokens have been created in KIP-48 and improved in KIP-373. KRaft 
> enabled the way to supporting them in KIP-900 by adding SCRAM support but 
> delegation tokens still don't support KRaft.
> There are multiple issues:
> - TokenManager still would try to create tokens in Zookeeper. Instead of this 
> we should forward admin requests to the controller that would store them in 
> the metadata similarly to SCRAM. We probably won't need new protocols just 
> enveloping similarly to other existing controller requests.
> - TokenManager should run on Controller nodes only (or in mixed mode).
> - Integration tests will need to be adapted as well and parameterize them 
> with Zookeeper/KRaft.
> - Documentation needs to be improved to factor in KRaft.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15219) Support delegation tokens in KRaft

2023-07-19 Thread Viktor Somogyi-Vass (Jira)
Viktor Somogyi-Vass created KAFKA-15219:
---

 Summary: Support delegation tokens in KRaft
 Key: KAFKA-15219
 URL: https://issues.apache.org/jira/browse/KAFKA-15219
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 3.6.0
Reporter: Viktor Somogyi-Vass
Assignee: Viktor Somogyi-Vass


Delegation tokens have been created in KIP-48 and improved in KIP-373. KRaft 
enabled the way to supporting them in KIP-900 by adding SCRAM support but 
delegation tokens still don't support KRaft.

There are multiple issues:
- TokenManager still would try to create tokens in Zookeeper. Instead of this 
we should forward admin requests to the controller that would store them in the 
metadata similarly to SCRAM.
- TokenManager should run on Controller nodes only (or in mixed mode).
- Integration tests will need to be adapted as well and parameterize them with 
Zookeeper/KRaft.
- Documentation needs to be improved to factor in KRaft.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] nizhikov commented on a diff in pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools

2023-07-19 Thread via GitHub


nizhikov commented on code in PR #13278:
URL: https://github.com/apache/kafka/pull/13278#discussion_r1268278161


##
tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java:
##
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.io.IOException;
+import java.nio.file.NoSuchFileException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL)
+@Tag("integration")
+public class DeleteRecordsCommandTest {
+
+private final ClusterInstance cluster;
+public DeleteRecordsCommandTest(ClusterInstance cluster) {
+this.cluster = cluster;
+}
+
+@ClusterTest
+public void testCommandZk() throws Exception {
+Properties adminProps = new Properties();
+
+adminProps.put(AdminClientConfig.RETRIES_CONFIG, 1);
+
+try (Admin admin = cluster.createAdminClient(adminProps)) {
+assertThrows(
+AdminCommandFailedException.class,
+() -> DeleteRecordsCommand.execute0(admin, "{\"partitions\":[" 
+
+"{\"topic\":\"t\", \"partition\":0, \"offset\":1}," +
+"{\"topic\":\"t\", \"partition\":0, \"offset\":1}]" +
+"}", System.out),
+"Offset json file contains duplicate topic partitions: t-0"
+);
+
+admin.createTopics(Collections.singleton(new NewTopic("t", 1, 
(short) 1))).all().get();
+
+Properties props = new Properties();
+
+props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers());
+props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
+props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
+
+try (KafkaProducer producer = new 
KafkaProducer<>(props)) {
+producer.send(new ProducerRecord<>("t", "1")).get();
+producer.send(new ProducerRecord<>("t", "2")).get();
+producer.send(new ProducerRecord<>("t", "3")).get();
+}
+
+executeAndAssertOutput(
+"{\"partitions\":[{\"topic\":\"t\", \"partition\":0, 
\"offset\":1}]}",
+"partition: t-0\tlow_watermark: 1",
+admin
+);
+
+executeAndAssertOutput(
+"{\"partitions\":[{\"topic\":\"t\", \"partition\":42, 
\"offset\":42}]}",
+"partition: t-42\terror",
+admin
+);
+}
+}
+
+private static void executeAndAssertOutput(String json, String expOut, 
Admin admin) {
+String output =
+ToolsTestUtils.captureStandardOut(() -> 
DeleteRecordsCommand.execute0(admin, json, System.out));
+assertTrue(output.contains(expOut));
+}
+}
+
+/**
+ * Unit test of {@link DeleteRecordsCommand} tool.
+ */
+class DeleteRecordsCommandUnitTest {
+@Test
+public void 

[GitHub] [kafka] nizhikov commented on a diff in pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools

2023-07-19 Thread via GitHub


nizhikov commented on code in PR #13278:
URL: https://github.com/apache/kafka/pull/13278#discussion_r1268270865


##
tools/src/test/java/org/apache/kafka/tools/CoreUtilsTest.java:
##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertIterableEquals;
+
+public class CoreUtilsTest {
+@Test
+public void testDuplicates() {
+assertIterableEquals(

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] nizhikov commented on a diff in pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools

2023-07-19 Thread via GitHub


nizhikov commented on code in PR #13278:
URL: https://github.com/apache/kafka/pull/13278#discussion_r1268252459


##
tools/src/main/java/org/apache/kafka/tools/CoreUtils.java:
##
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * General helper functions!
+ *
+ * This is for general helper functions that aren't specific to Kafka logic. 
Things that should have been included in
+ * the standard library etc.
+ *
+ * If you are making a new helper function and want to add it to this class 
please ensure the following:
+ * 1. It has documentation
+ * 2. It is the most general possible utility, not just the thing you needed 
in one particular place
+ * 3. You have tests for it if it is nontrivial in any way
+ */
+public class CoreUtils {

Review Comment:
   Renamed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #13260: KAFKA-14661: Upgrade Zookeeper to 3.8.1

2023-07-19 Thread via GitHub


mimaison commented on PR #13260:
URL: https://github.com/apache/kafka/pull/13260#issuecomment-1642274928

   My point is that this really contradicts the KIP. In the KIP we say "it does 
not work" but then we have a test that relies on the upgrade working. This is a 
bit counter intuitive. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #13853: KAFKA-15088: Fixing Incorrect Reference Usage in Connector State Changes

2023-07-19 Thread via GitHub


C0urante commented on PR #13853:
URL: https://github.com/apache/kafka/pull/13853#issuecomment-1642271686

   The refactor that @gharris1727 is suggesting here would also be nice since 
it would allow us to establish states that are only applicable to `Connector` 
or `Task` instances. There's a use case for this that exists today: the 
`STOPPED` state will only ever be visible for `Connector` instances, and never 
for `Tasks`, so it doesn't really make sense to use the same state enum for the 
two.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #14044: KAFKA-15216: InternalSinkRecord::newRecord should not ignore new headers

2023-07-19 Thread via GitHub


C0urante commented on PR #14044:
URL: https://github.com/apache/kafka/pull/14044#issuecomment-1642230322

   The change from `private` to `protected` technically counts as a change to 
public interface, so we'd need a KIP for that. I'm also a little hesitant to 
upgrade the visibility of these members regardless since that would limit the 
compatibility of plugins that rely on them (most likely by subclassing 
`ConnectRecord`, `SinkRecord`, etc.), since that would render them [binary 
incompatible](https://docs.oracle.com/javase/specs/jls/se7/html/jls-13.html#jls-13.4.7)
 with older versions of Connect where the fields were still private.
   
   Can we reduce the scope here to use fields instead of methods wherever 
possible, but without altering the visibility of any parts of our public API?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on a diff in pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools

2023-07-19 Thread via GitHub


fvaleri commented on code in PR #13278:
URL: https://github.com/apache/kafka/pull/13278#discussion_r1268183773


##
tools/src/main/java/org/apache/kafka/tools/DeleteRecordsCommand.java:
##
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DeleteRecordsResult;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.StringJoiner;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * A command for delete records of the given partitions down to the specified 
offset.
+ */
+public class DeleteRecordsCommand {
+private static final int EARLIEST_VERSION = 1;
+
+private static final DecodeJson.DecodeInteger INT = new 
DecodeJson.DecodeInteger();
+
+private static final DecodeJson.DecodeLong LONG = new 
DecodeJson.DecodeLong();
+
+private static final DecodeJson.DecodeString STRING = new 
DecodeJson.DecodeString();
+
+public static void main(String[] args) throws Exception {
+execute(args, System.out);
+}
+
+static Collection> 
parseOffsetJsonStringWithoutDedup(String jsonData) throws 
JsonProcessingException {
+JsonValue js = Json.parseFull(jsonData)
+.orElseThrow(() -> new AdminOperationException("The input string 
is not a valid JSON"));
+
+Optional version = js.asJsonObject().get("version");
+
+return parseJsonData(version.isPresent() ? version.get().to(INT) : 
EARLIEST_VERSION, js);
+}
+
+private static Collection> parseJsonData(int 
version, JsonValue js) throws JsonMappingException {
+if (version == 1) {
+JsonValue partitions = js.asJsonObject().get("partitions")
+.orElseThrow(() -> new AdminOperationException("Missing 
partitions field"));
+
+Collection> res = new ArrayList<>();
+
+Iterator iterator = partitions.asJsonArray().iterator();
+
+while (iterator.hasNext()) {
+JsonObject partitionJs = iterator.next().asJsonObject();
+
+String topic = partitionJs.apply("topic").to(STRING);
+int partition = partitionJs.apply("partition").to(INT);
+long offset = partitionJs.apply("offset").to(LONG);
+
+res.add(new Tuple<>(new TopicPartition(topic, partition), 
offset));
+}
+
+return res;
+}
+
+throw new AdminOperationException("Not supported version field value " 
+ version);
+}
+
+public static void execute(String[] args, PrintStream out) throws 
IOException {
+DeleteRecordsCommandOptions opts = new 
DeleteRecordsCommandOptions(args);
+
+try (Admin adminClient = createAdminClient(opts)) {
+execute(adminClient, 
Utils.readFileAsString(opts.options.valueOf(opts.offsetJsonFileOpt)), out);
+}
+}
+
+static void execute(Admin adminClient, String offsetJsonString, 
PrintStream out) throws JsonProcessingException {
+Collection> offsetSeq = 
parseOffsetJsonStringWithoutDedup(offsetJsonString);
+
+Set duplicatePartitions 

[GitHub] [kafka] fvaleri commented on a diff in pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools

2023-07-19 Thread via GitHub


fvaleri commented on code in PR #13278:
URL: https://github.com/apache/kafka/pull/13278#discussion_r1268183325


##
tools/src/main/java/org/apache/kafka/tools/CoreUtils.java:
##
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * General helper functions!
+ *
+ * This is for general helper functions that aren't specific to Kafka logic. 
Things that should have been included in
+ * the standard library etc.
+ *
+ * If you are making a new helper function and want to add it to this class 
please ensure the following:
+ * 1. It has documentation
+ * 2. It is the most general possible utility, not just the thing you needed 
in one particular place
+ * 3. You have tests for it if it is nontrivial in any way
+ */
+public class CoreUtils {

Review Comment:
   We already have ToolsUtils in server-common, and maybe we should think about 
moving it to the tools module in a separate PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-8128) Dynamic delegation token change possibility for consumer/producer

2023-07-19 Thread Viktor Somogyi-Vass (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744640#comment-17744640
 ] 

Viktor Somogyi-Vass commented on KAFKA-8128:


So after 4 years (yes) I think I'll pick this up. The problem came up more 
recently with OAuth but I believe the core problem here that JAAS contexts 
won't get reloaded. Making it a dynamic configuration would solve it but that 
probably requires a KIP too. I'll see what's up.
[~gsomogyi] does this seem correct? Did you have problems because you had to 
change the JAAS config or were there any other problems that you experienced?

> Dynamic delegation token change possibility for consumer/producer
> -
>
> Key: KAFKA-8128
> URL: https://issues.apache.org/jira/browse/KAFKA-8128
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.2.0
>Reporter: Gabor Somogyi
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>
> Re-authentication feature on broker side is under implementation which will 
> enforce consumer/producer instances to re-authenticate time to time. It would 
> be good to set the latest delegation token dynamically and not re-creating 
> consumer/producer instances.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-8128) Dynamic delegation token change possibility for consumer/producer

2023-07-19 Thread Viktor Somogyi-Vass (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi-Vass reassigned KAFKA-8128:
--

Assignee: Viktor Somogyi-Vass

> Dynamic delegation token change possibility for consumer/producer
> -
>
> Key: KAFKA-8128
> URL: https://issues.apache.org/jira/browse/KAFKA-8128
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.2.0
>Reporter: Gabor Somogyi
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>
> Re-authentication feature on broker side is under implementation which will 
> enforce consumer/producer instances to re-authenticate time to time. It would 
> be good to set the latest delegation token dynamically and not re-creating 
> consumer/producer instances.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mimaison commented on a diff in pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools

2023-07-19 Thread via GitHub


mimaison commented on code in PR #13278:
URL: https://github.com/apache/kafka/pull/13278#discussion_r1268090551


##
tools/src/main/java/org/apache/kafka/tools/CoreUtils.java:
##
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * General helper functions!
+ *
+ * This is for general helper functions that aren't specific to Kafka logic. 
Things that should have been included in
+ * the standard library etc.
+ *
+ * If you are making a new helper function and want to add it to this class 
please ensure the following:
+ * 1. It has documentation
+ * 2. It is the most general possible utility, not just the thing you needed 
in one particular place
+ * 3. You have tests for it if it is nontrivial in any way
+ */
+public class CoreUtils {

Review Comment:
   Maybe `ToolsUtils` would be a better name?



##
tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java:
##
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.io.IOException;
+import java.nio.file.NoSuchFileException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL)
+@Tag("integration")
+public class DeleteRecordsCommandTest {
+
+private final ClusterInstance cluster;
+public DeleteRecordsCommandTest(ClusterInstance cluster) {
+this.cluster = cluster;
+}
+
+@ClusterTest
+public void testCommandZk() throws Exception {
+Properties adminProps = new Properties();
+
+adminProps.put(AdminClientConfig.RETRIES_CONFIG, 1);
+
+try (Admin admin = cluster.createAdminClient(adminProps)) {
+assertThrows(
+AdminCommandFailedException.class,
+() -> DeleteRecordsCommand.execute0(admin, "{\"partitions\":[" 
+
+"{\"topic\":\"t\", \"partition\":0, \"offset\":1}," +
+"{\"topic\":\"t\", \"partition\":0, \"offset\":1}]" +
+"}", System.out),
+"Offset json file contains 

[GitHub] [kafka] viktorsomogyi commented on pull request #13975: KAFKA-15161: Fix InvalidReplicationFactorException at connect startup

2023-07-19 Thread via GitHub


viktorsomogyi commented on PR #13975:
URL: https://github.com/apache/kafka/pull/13975#issuecomment-1642093548

   Captured two screen recordings to demonstrate the reproduction of the 
problem.
   This is the bug: 
https://drive.google.com/file/d/1Itq_Fv9bwNtRtPsD9KLUnD9I-6BX6Tki/view?usp=sharing
   This is how the fix would help: 
https://drive.google.com/file/d/10Otvj880EF--BOARmhULhvixwWsAhIt9/view?usp=sharing


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov opened a new pull request, #14049: KAFKA-14038: Optimise calculation of size for log in remote tier

2023-07-19 Thread via GitHub


clolov opened a new pull request, #14049:
URL: https://github.com/apache/kafka/pull/14049

   ### Summary
   This pull request aims to address sub-points 1 and 2 from 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-852%3A+Optimize+calculation+of+size+for+log+in+remote+tier#KIP852:Optimizecalculationofsizeforloginremotetier-Codechanges.
   
   Add a new method - remoteLogSize - to the interface RemoteLogMetadataManager.
   
   Implement remoteLogSize in TopicBasedRemoteLogMetadataManager.
   
   Extend TopicBasedRemoteLogMetadataManagerTest to cover two scenarios (size 
calculation of no remote log; size calculation of a remote log).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15129) Clean up all metrics that were forgotten to be closed

2023-07-19 Thread hudeqi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hudeqi updated KAFKA-15129:
---
Description: 
In the current kafka code, there are still many module metrics that are 
forgotten to be closed when they stop, although some of them have been fixed, 
such as kafka-14866 and kafka-14868. et.

These metric leaks may lead to potential OOM risks, and, in the unit tests and 
integration tests in the code, there are also a large number of `closes` 
without removing the metric, which will also cause CI test instability. By 
cleaning up these leaked indicators, these risks can be eliminated, and the 
security and stability of the code can be enhanced.

Here I will find all the metrics that are forgotten and closed in the current 
version, and submit them according to the subtasks in order to fix them.

  was:
In the current kafka code, there are still many module metrics that are 
forgotten to be closed when they stop, although some of them have been fixed, 
such as kafka-14866 and kafka-14868. et.
Here I will find all the metrics that are forgotten and closed in the current 
version, and submit them according to the subtasks in order to fix them.


> Clean up all metrics that were forgotten to be closed
> -
>
> Key: KAFKA-15129
> URL: https://issues.apache.org/jira/browse/KAFKA-15129
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, core, log
>Affects Versions: 3.5.0
>Reporter: hudeqi
>Assignee: hudeqi
>Priority: Major
>
> In the current kafka code, there are still many module metrics that are 
> forgotten to be closed when they stop, although some of them have been fixed, 
> such as kafka-14866 and kafka-14868. et.
> These metric leaks may lead to potential OOM risks, and, in the unit tests 
> and integration tests in the code, there are also a large number of `closes` 
> without removing the metric, which will also cause CI test instability. By 
> cleaning up these leaked indicators, these risks can be eliminated, and the 
> security and stability of the code can be enhanced.
> Here I will find all the metrics that are forgotten and closed in the current 
> version, and submit them according to the subtasks in order to fix them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] msn-tldr opened a new pull request, #14048: On metadata-update, mark paritions for immediate retry

2023-07-19 Thread via GitHub


msn-tldr opened a new pull request, #14048:
URL: https://github.com/apache/kafka/pull/14048

   Alternative POC


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related

2023-07-19 Thread via GitHub


dajac commented on code in PR #14047:
URL: https://github.com/apache/kafka/pull/14047#discussion_r1268008501


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -467,6 +471,72 @@ public static Record newEmptyGroupMetadataRecord(
 );
 }
 
+/**
+ * Creates an OffsetCommit record.
+ *
+ * @param groupId   The group id.
+ * @param topic The topic name.
+ * @param partitionId   The partition id.
+ * @param offsetAndMetadata The offset and metadata.
+ * @param metadataVersion   The metadata version.
+ * @return The record.
+ */
+public static Record newOffsetCommitRecord(
+String groupId,
+String topic,
+int partitionId,
+OffsetAndMetadata offsetAndMetadata,
+MetadataVersion metadataVersion
+) {
+short version = offsetAndMetadata.expireTimestampMs.isPresent() ?
+(short) 1 : metadataVersion.offsetCommitValueVersion();

Review Comment:
   This corresponds to the logic 
[here](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1088).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related

2023-07-19 Thread via GitHub


dajac commented on code in PR #14047:
URL: https://github.com/apache/kafka/pull/14047#discussion_r1268008967


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -467,6 +471,72 @@ public static Record newEmptyGroupMetadataRecord(
 );
 }
 
+/**
+ * Creates an OffsetCommit record.
+ *
+ * @param groupId   The group id.
+ * @param topic The topic name.
+ * @param partitionId   The partition id.
+ * @param offsetAndMetadata The offset and metadata.
+ * @param metadataVersion   The metadata version.
+ * @return The record.
+ */
+public static Record newOffsetCommitRecord(
+String groupId,
+String topic,
+int partitionId,
+OffsetAndMetadata offsetAndMetadata,
+MetadataVersion metadataVersion
+) {
+short version = offsetAndMetadata.expireTimestampMs.isPresent() ?
+(short) 1 : metadataVersion.offsetCommitValueVersion();
+
+return new Record(
+new ApiMessageAndVersion(
+new OffsetCommitKey()
+.setGroup(groupId)
+.setTopic(topic)
+.setPartition(partitionId),
+(short) 1
+),
+new ApiMessageAndVersion(
+new OffsetCommitValue()
+.setOffset(offsetAndMetadata.offset)
+
.setLeaderEpoch(offsetAndMetadata.leaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
+.setMetadata(offsetAndMetadata.metadata)
+.setCommitTimestamp(offsetAndMetadata.commitTimestampMs)
+// Version 1 has a non-empty expireTimestamp field
+
.setExpireTimestamp(offsetAndMetadata.expireTimestampMs.orElse(OffsetCommitRequest.DEFAULT_TIMESTAMP)),

Review Comment:
   This comes from 
[here](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1094).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related

2023-07-19 Thread via GitHub


dajac commented on code in PR #14047:
URL: https://github.com/apache/kafka/pull/14047#discussion_r1268008079


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -386,6 +386,18 @@ public short groupMetadataValueVersion() {
 }
 }
 
+public short offsetCommitValueVersion() {
+if (isLessThan(MetadataVersion.IBP_2_1_IV0)) {
+return 1;
+} else if (isLessThan(MetadataVersion.IBP_2_1_IV1)) {
+return 2;
+} else {
+// Serialize with the highest supported non-flexible version
+// until a tagged field is introduced or the version is bumped.
+return  3;
+}

Review Comment:
   This is taken from 
[here](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1088).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac opened a new pull request, #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related

2023-07-19 Thread via GitHub


dajac opened a new pull request, #14047:
URL: https://github.com/apache/kafka/pull/14047

   This patch does a few things:
   1) It introduces the `OffsetAndMetadata` class which hold the committed 
offsets in the group coordinator.
   2) It adds methods to deal with OffsetCommit records to `RecordHelpers`.
   3) It adds `MetadataVersion#offsetCommitValueVersion` to get the version of 
the OffsetCommit value record that should be used.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac opened a new pull request, #14046: KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new StaleMemberEpochException error

2023-07-19 Thread via GitHub


dajac opened a new pull request, #14046:
URL: https://github.com/apache/kafka/pull/14046

   This patch does a few things:
   1) It introduces version 9 of the OffsetCommit API. This new version has no 
schema changes but it can return a StaleMemberEpochException if the new 
consumer group protocol is used.
   2) It renames the `generationId` field in the request to 
`GenerationIdOrMemberEpoch`. This is backward compatible change.
   3) It introduces the new StaleMemberEpochException error.
   4) It does a minor refactoring in OffsetCommitRequest class.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Nickstery commented on a diff in pull request #14004: KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache

2023-07-19 Thread via GitHub


Nickstery commented on code in PR #14004:
URL: https://github.com/apache/kafka/pull/14004#discussion_r1267948092


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java:
##
@@ -100,17 +100,29 @@ void 
handleSegmentWithCopySegmentStartedState(RemoteLogSegmentId remoteLogSegmen
 
 void handleSegmentWithCopySegmentFinishedState(Long startOffset, 
RemoteLogSegmentId remoteLogSegmentId,
Long leaderEpochEndOffset) {
+// If there are duplicate segments uploaded due to leader-election, 
then mark them as unreferenced.
+// Duplicate segments can be uploaded when the previous leader had 
tier-lags and the next leader uploads the
+// segment for the same leader-epoch which is a super-set of 
previously uploaded segments.
+// (eg)
+// case-1: Duplicate segment
+//  L0 uploaded segment S0 with offsets 0-100 and L1 uploaded 
segment S1 with offsets 0-200.
+//  We will mark the segment S0 as duplicate and add it to 
unreferencedSegmentIds.
+// case-2: Overlapping segments
+//  L0 uploaded segment S0 with offsets 10-90 and L1 uploaded 
segment S1 with offsets 5-100, S2-101-200,
+//  and so on. When the consumer request for segment with offset 
95, it should get the segment S1 and not S0.
+Map.Entry lastEntry = offsetToId.lastEntry();
+while (lastEntry != null && lastEntry.getKey() >= startOffset && 
highestLogOffset <= leaderEpochEndOffset) {

Review Comment:
   Hi, thank you for explanation, sounds reasonable.
   
   Sorry for being too questionable. One more point:
   What could happen if we instead:
   ```
   while (lastEntry != null && lastEntry.getKey() >= startOffset && 
highestLogOffset <= leaderEpochEndOffset)
   
   if (highestLogOffset == null || leaderEpochEndOffset > highestLogOffset) {
   highestLogOffset = leaderEpochEndOffset;
   }
   ```
   will do
   ```
   while (lastEntry != null && lastEntry.getKey() >= startOffset)
   
   highestLogOffset = leaderEpochEndOffset;
   ```
   
   How I see it:
   - In case we sent twice same segment [somehow], It will be replaced with the 
same one
   - In case unclean.leader.election set to true, we are going to have up to 
date data, in that case TS will work same as local storage function wise [data 
loss due to dead leader]. Less discrepancy - easier to maintain and understand
   - I do believe we do not send from leader and all the followers same 
segments. The leader is someone who sends data to tiered storage, so we expect 
at the same time send only 1 segment for partition-X, in case second  request 
comes for same partition, it means that new leader is elected and it is in 
charge and its data is more valuable.
   - It is possible that even ISR won't be synced fully 
`replica.lag.time.max.ms` [default= 30s] allows that, it means that it is 
possible that few messages wont be synced and potentially deleted in case 
traffic stops coming few messages after Leader broker died and new leader got 
them. What if Broker 1 uploaded segment [0, 100], Broker-2 was insync by 
replicated [0,90]. Broker-1 died, Broker-2 elected as a Leader and got 5 
messages, Tiered storage won't be in charge of those 5 messages and segment 
could be deleted. After traffic being restored new segment is going to roll and 
its offset starts with 96, when last offset in TS is 100, so we delete segment 
and push new one starting from 96, and we could have a gap afterwards. 
   
   Of course this example is an edge case and configuration should be quite 
agressive for that in terms for retention times etc. But are the any drawbacks 
by removing `highestLogOffset <= leaderEpochEndOffset` from the while? What 
kind of problem we can face and why having this check is more safe rather than 
remove it?
   
   
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Nickstery commented on a diff in pull request #14004: KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache

2023-07-19 Thread via GitHub


Nickstery commented on code in PR #14004:
URL: https://github.com/apache/kafka/pull/14004#discussion_r1267954353


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java:
##
@@ -100,17 +100,29 @@ void 
handleSegmentWithCopySegmentStartedState(RemoteLogSegmentId remoteLogSegmen
 
 void handleSegmentWithCopySegmentFinishedState(Long startOffset, 
RemoteLogSegmentId remoteLogSegmentId,
Long leaderEpochEndOffset) {
+// If there are duplicate segments uploaded due to leader-election, 
then mark them as unreferenced.
+// Duplicate segments can be uploaded when the previous leader had 
tier-lags and the next leader uploads the
+// segment for the same leader-epoch which is a super-set of 
previously uploaded segments.
+// (eg)
+// case-1: Duplicate segment
+//  L0 uploaded segment S0 with offsets 0-100 and L1 uploaded 
segment S1 with offsets 0-200.
+//  We will mark the segment S0 as duplicate and add it to 
unreferencedSegmentIds.
+// case-2: Overlapping segments
+//  L0 uploaded segment S0 with offsets 10-90 and L1 uploaded 
segment S1 with offsets 5-100, S2-101-200,
+//  and so on. When the consumer request for segment with offset 
95, it should get the segment S1 and not S0.
+Map.Entry lastEntry = offsetToId.lastEntry();
+while (lastEntry != null && lastEntry.getKey() >= startOffset && 
highestLogOffset <= leaderEpochEndOffset) {

Review Comment:
   Test example
   ```
   @Test
   void 
handleSegmentWithCopySegmentFinishedStateForDuplicateSegmentsWithSecondSegmentEarlierEndOffset()
 {
   // Broker-1 is a Leader for Partition-0, Broker-2 is follower
   RemoteLogSegmentId segmentId1BRoker1 = new RemoteLogSegmentId(tpId, 
Uuid.randomUuid());
  
   //Leader and follower are in sync
   epochState.handleSegmentWithCopySegmentFinishedState(10L, 
segmentId1BRoker1, 100L);
   
   //Broker-1 received some messages and part of them were replicated 
by follower
   // Broker-2 was not able to replicate all the messages because 
Broker-1 is died. 
   // But it was able to upload segment before death [101, 200]
   RemoteLogSegmentId segmentId2Broker1 = new RemoteLogSegmentId(tpId, 
Uuid.randomUuid());
   epochState.handleSegmentWithCopySegmentFinishedState(101L, 
segmentId2Broker1, 200L);
   
   // Broker-2 still in-sync according to `replica.lag.time.max.ms`
   // Last offset in sync is 150L
   // Broker-2 gets the leadership and fills the segment 101L-190L and 
uploads to Tiered storage
   RemoteLogSegmentId segmentId2Broker2 = new RemoteLogSegmentId(tpId, 
Uuid.randomUuid());
   epochState.handleSegmentWithCopySegmentFinishedState(101L, 
segmentId2Broker2, 190L);
   
   //Traffic stops
   
   //Since LeaderEpoch was changed, I expect that more fresh data will 
be on the TS
   assertEquals(190L, epochState.highestLogOffset());
   //segment 2 uploaded by Broker-1 is expected to be unreferenced
   
assertTrue(epochState.unreferencedSegmentIds().contains(segmentId2Broker1));
   assertEquals(1, epochState.unreferencedSegmentIds().size());
   }
   ```
   Than it could be part of integration test, we can upload new segment 
[191L, 300L] and consume from the beginning from TS. With current logic we have 
data loss from segment 2 where BROKER-2 was leader [151L,190L], but in case of 
Kafka local storage functionality, it would not happen



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Nickstery commented on a diff in pull request #14004: KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache

2023-07-19 Thread via GitHub


Nickstery commented on code in PR #14004:
URL: https://github.com/apache/kafka/pull/14004#discussion_r1267954353


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java:
##
@@ -100,17 +100,29 @@ void 
handleSegmentWithCopySegmentStartedState(RemoteLogSegmentId remoteLogSegmen
 
 void handleSegmentWithCopySegmentFinishedState(Long startOffset, 
RemoteLogSegmentId remoteLogSegmentId,
Long leaderEpochEndOffset) {
+// If there are duplicate segments uploaded due to leader-election, 
then mark them as unreferenced.
+// Duplicate segments can be uploaded when the previous leader had 
tier-lags and the next leader uploads the
+// segment for the same leader-epoch which is a super-set of 
previously uploaded segments.
+// (eg)
+// case-1: Duplicate segment
+//  L0 uploaded segment S0 with offsets 0-100 and L1 uploaded 
segment S1 with offsets 0-200.
+//  We will mark the segment S0 as duplicate and add it to 
unreferencedSegmentIds.
+// case-2: Overlapping segments
+//  L0 uploaded segment S0 with offsets 10-90 and L1 uploaded 
segment S1 with offsets 5-100, S2-101-200,
+//  and so on. When the consumer request for segment with offset 
95, it should get the segment S1 and not S0.
+Map.Entry lastEntry = offsetToId.lastEntry();
+while (lastEntry != null && lastEntry.getKey() >= startOffset && 
highestLogOffset <= leaderEpochEndOffset) {

Review Comment:
   Test example
   ```
   @Test
   void 
handleSegmentWithCopySegmentFinishedStateForDuplicateSegmentsWithSecondSegmentEarlierEndOffset()
 {
   // Broker-1 is a Leader for Partition-0, Broker-2 is follower
   RemoteLogSegmentId segmentId1BRoker1 = new RemoteLogSegmentId(tpId, 
Uuid.randomUuid());
  
   //Leader and follower are in sync
   epochState.handleSegmentWithCopySegmentFinishedState(10L, 
segmentId1BRoker1, 100L);
   
   //Broker-1 received some messages and part of them were replicated 
by follower
   // Broker-2 was not able to replicate all the messages because 
Broker-1 is died. 
   // But it was able to upload segment before death [101, 200]
   RemoteLogSegmentId segmentId2Broker1 = new RemoteLogSegmentId(tpId, 
Uuid.randomUuid());
   epochState.handleSegmentWithCopySegmentFinishedState(101L, 
segmentId2Broker1, 200L);
   
   // Broker-2 still in-sync according to `replica.lag.time.max.ms`
   // Last offset in sync is 150L
   // Broker-2 gets the leadership and fills the segment 101L-190L and 
uploads to Tiered storage
   RemoteLogSegmentId segmentId2Broker2 = new RemoteLogSegmentId(tpId, 
Uuid.randomUuid());
   epochState.handleSegmentWithCopySegmentFinishedState(101L, 
segmentId2Broker2, 190L);
   
   //Traffic stops
   
   //Since LeaderEpoch was changed, I expect that more fresh data will 
be on the TS
   assertEquals(190L, epochState.highestLogOffset());
   //segment 2 uploaded by Broker-1 is expected to be unreferenced
   
assertTrue(epochState.unreferencedSegmentIds().contains(segmentId2Broker1));
   assertEquals(1, epochState.unreferencedSegmentIds().size());
   }```
   
   Than it could be part of integration test, we can upload new segment [191L, 
300L] and consume from the beginning from TS. With current logic we have data 
loss from segment 2 where BROKER-2 was leader [151L,190L], but in case of Kafka 
local storage functionality, it would not happen



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Nickstery commented on a diff in pull request #14004: KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache

2023-07-19 Thread via GitHub


Nickstery commented on code in PR #14004:
URL: https://github.com/apache/kafka/pull/14004#discussion_r1267954353


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java:
##
@@ -100,17 +100,29 @@ void 
handleSegmentWithCopySegmentStartedState(RemoteLogSegmentId remoteLogSegmen
 
 void handleSegmentWithCopySegmentFinishedState(Long startOffset, 
RemoteLogSegmentId remoteLogSegmentId,
Long leaderEpochEndOffset) {
+// If there are duplicate segments uploaded due to leader-election, 
then mark them as unreferenced.
+// Duplicate segments can be uploaded when the previous leader had 
tier-lags and the next leader uploads the
+// segment for the same leader-epoch which is a super-set of 
previously uploaded segments.
+// (eg)
+// case-1: Duplicate segment
+//  L0 uploaded segment S0 with offsets 0-100 and L1 uploaded 
segment S1 with offsets 0-200.
+//  We will mark the segment S0 as duplicate and add it to 
unreferencedSegmentIds.
+// case-2: Overlapping segments
+//  L0 uploaded segment S0 with offsets 10-90 and L1 uploaded 
segment S1 with offsets 5-100, S2-101-200,
+//  and so on. When the consumer request for segment with offset 
95, it should get the segment S1 and not S0.
+Map.Entry lastEntry = offsetToId.lastEntry();
+while (lastEntry != null && lastEntry.getKey() >= startOffset && 
highestLogOffset <= leaderEpochEndOffset) {

Review Comment:
   Test example
   ```
   @Test
   void 
handleSegmentWithCopySegmentFinishedStateForDuplicateSegmentsWithSecondSegmentEarlierEndOffset()
 {
   // Broker-1 is a Leader for Partition-0, Broker-2 is follower
   RemoteLogSegmentId segmentId1BRoker1 = new RemoteLogSegmentId(tpId, 
Uuid.randomUuid());
  
   //Leader and follower are in sync
   epochState.handleSegmentWithCopySegmentFinishedState(10L, 
segmentId1BRoker1, 100L);
   
   //Broker-1 received some messages and part of them were replicated 
by follower
   // Broker-2 was not able to replicate all the messages because 
Broker-1 is died. 
   // But it was able to upload segment before death [101, 200]
   RemoteLogSegmentId segmentId2Broker1 = new RemoteLogSegmentId(tpId, 
Uuid.randomUuid());
   epochState.handleSegmentWithCopySegmentFinishedState(101L, 
segmentId2Broker1, 200L);
   
   // Broker-2 still in-sync according to `replica.lag.time.max.ms`
   // Last offset in sync is 150L
   // Broker-2 gets the leadership and fills the segment 101L-190L and 
uploads to Tiered storage
   RemoteLogSegmentId segmentId2Broker2 = new RemoteLogSegmentId(tpId, 
Uuid.randomUuid());
   epochState.handleSegmentWithCopySegmentFinishedState(101L, 
segmentId2Broker2, 190L);
   
   //Traffic stops
   
   //Since LeaderEpoch was changed, I expect that more fresh data will 
be on the TS
   assertEquals(190L, epochState.highestLogOffset());
   //segment 2 uploaded by Broker-1 is expected to be unreferenced
   
assertTrue(epochState.unreferencedSegmentIds().contains(segmentId2Broker1));
   assertEquals(1, epochState.unreferencedSegmentIds().size());
   }
   
   Than it could be part of integration test, we can upload new segment 
[191L, 300L] and consume from the beginning



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on pull request #13260: KAFKA-14661: Upgrade Zookeeper to 3.8.1

2023-07-19 Thread via GitHub


clolov commented on PR #13260:
URL: https://github.com/apache/kafka/pull/13260#issuecomment-1641957263

   Hey @forlack! Thank you for the suggestion, I have upgraded the dependency 
so that tests can run.
   
   Heya @mimaison, I have addressed your comment about adding the latest 
version to the LICENSE-binary file and I would be grateful for any last review 
as this has been open for sometime.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



  1   2   >