Re: [PR] MINOR: Ensure that DisplayName is set in all parameterized tests [kafka]

2023-12-04 Thread via GitHub


dajac merged PR #14850:
URL: https://github.com/apache/kafka/pull/14850





[jira] [Resolved] (KAFKA-15061) CoordinatorPartitionWriter should reuse buffer

2023-12-04 Thread David Jacot (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-15061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Jacot resolved KAFKA-15061.
---------------------------------
    Fix Version/s: 3.7.0
        Resolution: Fixed

> CoordinatorPartitionWriter should reuse buffer
> ----------------------------------------------
>
>        Key: KAFKA-15061
>        URL: https://issues.apache.org/jira/browse/KAFKA-15061
>    Project: Kafka
> Issue Type: Sub-task
>   Reporter: David Jacot
>   Assignee: David Jacot
>   Priority: Major
>     Labels: kip-848-preview
>    Fix For: 3.7.0
>
>






Re: [PR] KAFKA-15061; CoordinatorPartitionWriter should reuse buffer [kafka]

2023-12-04 Thread via GitHub


dajac merged PR #14885:
URL: https://github.com/apache/kafka/pull/14885





Re: [PR] MINOR: Add @Timeout annotation to consumer integration tests [kafka]

2023-12-04 Thread via GitHub


dajac merged PR #14896:
URL: https://github.com/apache/kafka/pull/14896





Re: [PR] MINOR: Enable auto-tick in AsyncKafkaConsumerTest [kafka]

2023-12-04 Thread via GitHub


dajac merged PR #14915:
URL: https://github.com/apache/kafka/pull/14915





Re: [PR] KAFKA-15972: Add support to exclude labels for telemetry metrics (KIP-714) [kafka]

2023-12-04 Thread via GitHub


apoorvmittal10 commented on PR #14924:
URL: https://github.com/apache/kafka/pull/14924#issuecomment-1840118739

   @AndrewJSchofield @mjsax @wcarlson5 @philipnee - Minor PR to exclude the client's `client_id` label in metrics.





[PR] KAFKA-15972: Add support to exclude labels for telemetry metrics (KIP-714) [kafka]

2023-12-04 Thread via GitHub


apoorvmittal10 opened a new pull request, #14924:
URL: https://github.com/apache/kafka/pull/14924

   Changes in the PR add support for excluding the `client_id` label when sending telemetry metrics.
   
   Some of the labels/tags present in a metric should be skipped when collecting telemetry, because the data may already be known to the broker and we want to minimize data transfer. One such label is `client_id`, which is already present in the RequestContext; the broker can append that label itself before emitting metrics to the telemetry backend.
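   For illustration only, here is a minimal sketch of the idea above; the class name, method, and exclusion set are assumptions for this example, not the PR's actual KIP-714 code:
   
   ```java
   import java.util.Map;
   import java.util.Set;
   import java.util.stream.Collectors;
   
   // Sketch: drop labels the broker can reconstruct itself (e.g. client_id from the
   // RequestContext) before a metric is serialized into a telemetry push.
   final class TelemetryLabelFilterSketch {
   
       private static final Set<String> EXCLUDED_LABELS = Set.of("client_id");
   
       static Map<String, String> excludeLabels(Map<String, String> metricLabels) {
           return metricLabels.entrySet().stream()
                   .filter(e -> !EXCLUDED_LABELS.contains(e.getKey()))
                   .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
       }
   }
   ```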
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-15972) Add support to exclude labels in client telemetry

2023-12-04 Thread Apoorv Mittal (Jira)
Apoorv Mittal created KAFKA-15972:
----------------------------------

 Summary: Add support to exclude labels in client telemetry
 Key: KAFKA-15972
 URL: https://issues.apache.org/jira/browse/KAFKA-15972
 Project: Kafka
  Issue Type: Sub-task
Reporter: Apoorv Mittal
Assignee: Apoorv Mittal


Some of the labels/tags present in a metric should be skipped when collecting telemetry, because the data may already be known to the broker and we want to minimize data transfer. One such label is client_id, which is already present in the RequestContext; the broker can append that label itself before emitting metrics to the telemetry backend.





Re: [PR] MINOR: fix bug where we weren't registering SnapshotEmitterMetrics [kafka]

2023-12-04 Thread via GitHub


cmccabe merged PR #14918:
URL: https://github.com/apache/kafka/pull/14918





Re: [PR] KAFKA-9545: Fix Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted` [kafka]

2023-12-04 Thread via GitHub


ashwinpankaj commented on PR #14910:
URL: https://github.com/apache/kafka/pull/14910#issuecomment-1840012911

   > LGTM
   
   Thanks @wcarlson5! Can you please merge this commit?





Re: [PR] KAFKA-15426: Process and persist directory assignments [kafka]

2023-12-04 Thread via GitHub


pprovenzano commented on code in PR #14863:
URL: https://github.com/apache/kafka/pull/14863#discussion_r1414875644


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -2016,6 +2021,70 @@ ListPartitionReassignmentsResponseData 
listPartitionReassignments(
 return response;
 }
 
+ControllerResult 
handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) {

Review Comment:
   We should check if the Metadata version supports this request and return an 
error if not.






[PR] KAFKA-15866:Refactor OffsetFetchRequestState Error handling [kafka]

2023-12-04 Thread via GitHub


DL1231 opened a new pull request, #14923:
URL: https://github.com/apache/kafka/pull/14923

   This PR resolves [KAFKA-15866](https://issues.apache.org/jira/browse/KAFKA-15866). The current OffsetFetchRequestState error handling uses nested if-else, which is quite different, stylistically, from the OffsetCommitRequestState, which uses a switch statement. The latter is a bit more readable, so we should refactor the error handling to use the same style and improve readability.
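   For illustration, a minimal sketch of the switch-based style referred to above; the handler shape, retry hook, and class name are assumptions for this example, not the actual OffsetFetchRequestState code:
   
   ```java
   import org.apache.kafka.common.KafkaException;
   import org.apache.kafka.common.protocol.Errors;
   
   import java.util.concurrent.CompletableFuture;
   
   // Sketch: one switch per response error, instead of nested if-else.
   final class OffsetFetchErrorHandlingSketch {
   
       static void handleError(Errors error, CompletableFuture<Void> result, Runnable retry) {
           switch (error) {
               case COORDINATOR_LOAD_IN_PROGRESS:
               case COORDINATOR_NOT_AVAILABLE:
               case NOT_COORDINATOR:
                   // Retriable: re-discover the coordinator and retry on the next poll.
                   retry.run();
                   break;
               default:
                   // Everything else is treated as fatal for this request.
                   result.completeExceptionally(new KafkaException(error.message()));
           }
       }
   }
   ```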
   
   





Re: [PR] MINOR: Explain need to escape backslashes in rules [kafka]

2023-12-04 Thread via GitHub


github-actions[bot] commented on PR #14333:
URL: https://github.com/apache/kafka/pull/14333#issuecomment-1839944197

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.





[jira] [Assigned] (KAFKA-15866) Refactor OffsetFetchRequestState Error handling to be more consistent with OffsetCommitRequestState

2023-12-04 Thread Lan Ding (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-15866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lan Ding reassigned KAFKA-15866:


Assignee: Lan Ding

> Refactor OffsetFetchRequestState Error handling to be more consistent with 
> OffsetCommitRequestState
> ---------------------------------------------------------------------------
>
>        Key: KAFKA-15866
>        URL: https://issues.apache.org/jira/browse/KAFKA-15866
>    Project: Kafka
> Issue Type: Sub-task
>   Reporter: Philip Nee
>   Assignee: Lan Ding
>   Priority: Minor
>
> The current OffsetFetchRequestState error handling uses nested if-else, which 
> is quite different, stylistically, from the OffsetCommitRequestState, which 
> uses a switch statement. The latter is a bit more readable, so we should 
> refactor the error handling to use the same style and improve readability.
>  
> A minor point: some of the error handling seems inconsistent with the commit 
> path. The logic came from the current implementation, so we should also review 
> all of the error handling. For example, the current logic does not mark the 
> coordinator unavailable when receiving COORDINATOR_NOT_AVAILABLE.





Re: [PR] KAFKA-15883: Implement RemoteCopyLagBytes [kafka]

2023-12-04 Thread via GitHub


showuon commented on code in PR #14832:
URL: https://github.com/apache/kafka/pull/14832#discussion_r1414765630


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -283,8 +283,38 @@ class BrokerTopicMetrics(name: Option[String], configOpt: 
java.util.Optional[Kaf
   meter()
   }
 
+  case class GaugeWrapper(metricType: String, brokerTopicAggregatedMetric: BrokerTopicAggregatedMetric) {
+    @volatile private var lazyGauge: Gauge[Long] = _
+    private val gaugeLock = new Object
+
+    def gauge(): Gauge[Long] = {
+      var gauge = lazyGauge
+      if (gauge == null) {
+        gaugeLock synchronized {
+          gauge = lazyGauge
+          if (gauge == null) {
+            gauge = metricsGroup.newGauge(metricType, () => brokerTopicAggregatedMetric.value())
+            lazyGauge = gauge
+          }
+        }
+      }
+      gauge
+    }
+
+    def close(): Unit = gaugeLock synchronized {
+      if (lazyGauge != null) {
+        metricsGroup.removeMetric(metricType)
+        brokerTopicAggregatedMetric.close()
+        lazyGauge = null
+      }
+    }
+
+    gauge() // greedily initialize the general topic metrics

Review Comment:
   nit: I don't think we need comment here.



##
core/src/test/scala/integration/kafka/api/MetricsTest.scala:
##
@@ -320,16 +320,28 @@ class MetricsTest extends IntegrationTestHarness with 
SaslSetup {
 assertTrue(metrics.isEmpty, s"$errorMessage: ${metrics.keys}")
   }
 
+  private def fromNameToBrokerTopicStatsMBean(name: String): String = {
+s"kafka.server:type=BrokerTopicMetrics,name=$name"
+  }
+
   private def verifyRemoteStorageMetrics(shouldContainMetrics: Boolean): Unit 
= {
 val metrics = RemoteStorageMetrics.allMetrics().asScala.filter(name =>
   KafkaYammerMetrics.defaultRegistry.allMetrics.asScala.find(metric => {
 metric._1.getMBeanName().equals(name.getMBeanName)
   }).isDefined
 ).toList
+val aggregatedBrokerTopicStats = Set(BrokerTopicStats.RemoteCopyLagBytes)

Review Comment:
   Why use a set here? I guess we plan to add more metrics to this set in the future, is that right?






Re: [PR] KAFKA-15901: Client changes for registering telemetry and API calls (KIP-714) [kafka]

2023-12-04 Thread via GitHub


apoorvmittal10 commented on PR #14843:
URL: https://github.com/apache/kafka/pull/14843#issuecomment-1839895007

   @mjsax Build passed with known flaky tests, not related to PR changes.





[PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-04 Thread via GitHub


mjsax opened a new pull request, #14922:
URL: https://github.com/apache/kafka/pull/14922

   This PR is on top of https://github.com/apache/kafka/pull/14908
   
   Only the second commit needs to be reviewed.





Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]

2023-12-04 Thread via GitHub


kirktrue commented on code in PR #14640:
URL: https://github.com/apache/kafka/pull/14640#discussion_r1414752935


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -183,6 +183,10 @@ public NetworkClientDelegate.PollResult poll(long 
currentTimeMs) {
 return new 
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, 
Collections.singletonList(request));
 }
 
+public MembershipManager membershipManager() {
+return membershipManager;
+}

Review Comment:
   It's for access by the `ApplicationEventProcessor` to process its events 
that relate to the state machine.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java:
##
@@ -66,15 +66,15 @@ Exception invokePartitionsAssigned(final 
SortedSet assignedParti
 throw e;
 } catch (Exception e) {
 log.error("User provided listener {} failed on invocation of 
onPartitionsAssigned for partitions {}",
-listener.getClass().getName(), assignedPartitions, e);
+listener.get().getClass().getName(), 
assignedPartitions, e);

Review Comment:
   Will do.






Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]

2023-12-04 Thread via GitHub


kirktrue commented on code in PR #14640:
URL: https://github.com/apache/kafka/pull/14640#discussion_r1414752641


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1376,6 +1425,66 @@ private void subscribeInternal(Collection 
topics, Optional
+ * Process background events, if any
+ * Briefly wait for {@link CompletableApplicationEvent an 
event} to complete
+ * 
+ *
+ * 
+ *
+ * Each iteration gives the application thread an opportunity to process 
background events, which may be
+ * necessary to complete the overall processing.
+ *
+ * 
+ *
+ * As an example, take {@link #unsubscribe()}. To start unsubscribing, the 
application thread enqueues an
+ * {@link UnsubscribeApplicationEvent} on the application event queue. 
That event will eventually trigger the
+ * rebalancing logic in the background thread. Critically, as part of this 
rebalancing work, the
+ * {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)} 
callback needs to be invoked. However,
+ * this callback must be executed on the application thread. To achieve 
this, the background thread enqueues a
+ * {@link ConsumerRebalanceListenerCallbackNeededEvent} on its background 
event queue. That event queue is
+ * periodically queried by the application thread to see if there's work 
to be done. When the application thread
+ * sees {@link ConsumerRebalanceListenerCallbackNeededEvent}, it is 
processed, and then a
+ * {@link ConsumerRebalanceListenerCallbackCompletedEvent} is then 
enqueued by the application thread on the
+ * background event queue. Moments later, the background thread will see 
that event, process it, and continue
+ * execution of the rebalancing logic. The rebalancing logic cannot 
complete until the
+ * {@link ConsumerRebalanceListener} callback is performed.
+ *
+ * @param event Event that contains a {@link CompletableFuture}; it is on 
this future that the application thread
+ *  will wait for completion
+ * @param timer Overall timer that bounds how long the application thread 
will wait for the event to complete
+ * @return {@code true} if the event completed within the timeout, {@code 
false} otherwise
+ */
+private boolean processBackgroundEvents(CompletableApplicationEvent 
event, Timer timer) {
+log.trace("Enqueuing event {} for processing; will wait up to {} ms to 
complete", event, timer.remainingMs());
+
+do {
+backgroundEventProcessor.process();

Review Comment:
   The `process()` method on `BackgroundEventProcessor` (and `ApplicationEventProcessor`) processes _all_ the events in the queue.
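   For illustration, a rough sketch of the wait loop described in the Javadoc above (a simplified stand-in, not the actual AsyncKafkaConsumer code): the application thread alternates between draining background events, which may run rebalance-listener callbacks, and briefly waiting on the event's future.
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   
   final class BackgroundWaitLoopSketch {
   
       /** Stand-in for BackgroundEventProcessor.process(), which drains the whole queue. */
       interface EventDrainer {
           void processAll();
       }
   
       /** Returns true if the future completed (normally or exceptionally) before the deadline. */
       static boolean waitForEvent(CompletableFuture<?> future, long timeoutMs, EventDrainer drainer) {
           long deadlineMs = System.currentTimeMillis() + timeoutMs;
           do {
               drainer.processAll();                       // may invoke ConsumerRebalanceListener callbacks
               try {
                   future.get(100, TimeUnit.MILLISECONDS); // brief wait before draining again
                   return true;
               } catch (TimeoutException e) {
                   // Not done yet: loop around and drain background events again.
               } catch (Exception e) {
                   return true;                            // completed exceptionally; caller inspects the future
               }
           } while (System.currentTimeMillis() < deadlineMs);
           return false;
       }
   }
   ```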






Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-04 Thread via GitHub


soarez commented on PR #14903:
URL: https://github.com/apache/kafka/pull/14903#issuecomment-1839871090

   @cmccabe thanks for having a look. 
   
   I've made the following changes:
   
   * Replaced the use of `prepend` with `append` in `propagateDirectoryFailure`
   * Moved `communicationInFlight = true` after `_channelManager.sendRequest()`
   * Always set `nextSchedulingShouldBeImmediate = false` in 
`scheduleNextCommunicationAfterFailure`
   * Moved checking if `communicationInFlight = true` to 
`CommunicationEvent.run()`
   
   @junrao @cmccabe: please take another look





Re: [PR] improve TopicCommandIntegrationTest to be less flaky [kafka]

2023-12-04 Thread via GitHub


Owen-CH-Leung commented on PR #14891:
URL: https://github.com/apache/kafka/pull/14891#issuecomment-1839853774

   > Hm.. given this is the TopicCommandIntegrationTest I think we should keep the parts testing the topic command tool.
   
   Thanks for your feedback! Do you mean we should keep the existing API calls like `createAndWaitTopic(buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "2", "--replication-factor", "1", "--topic", testTopicName));`?





Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-04 Thread via GitHub


soarez commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1414728861


##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -197,11 +197,14 @@ class BrokerLifecycleManagerTest {
 result
   }
 
-  def poll[T](context: RegistrationTestContext, manager: 
BrokerLifecycleManager, future: Future[T]): T = {
-while (!future.isDone || context.mockClient.hasInFlightRequests) {
-  context.poll()
+  def poll[T](ctx: RegistrationTestContext, manager: BrokerLifecycleManager, 
future: Future[T]): T = {
+while (ctx.mockChannelManager.unsentQueue.isEmpty) {
+  if (manager.eventQueue.isEmpty)

Review Comment:
   You're right, this is incorrect. We must only advance the time if the 
eventQueue has an event scheduled at a future time, unless the next event is a 
timeout event. 






Re: [PR] KAFKA-15022: introduce interface to control graph constructor [kafka]

2023-12-04 Thread via GitHub


lihaosky commented on code in PR #14714:
URL: https://github.com/apache/kafka/pull/14714#discussion_r1414713123


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/GraphTest.java:
##
@@ -394,6 +394,106 @@ public void testDeterministic() {
 }
 }
 
+@Test
+public void testMaxFlowOnlySourceAndSink() {
+final Graph graph1 = new Graph<>();

Review Comment:
   `graph` was used in setup



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/BalanceSubtopologyGraphConstructor.java:
##
@@ -88,55 +93,20 @@ public Graph constructTaskGraph(
 }
 }
 
-// TODO: validate tasks in tasksForTopicGroup and taskIdList
-final SortedMap> sortedTasksForTopicGroup = 
new TreeMap<>(tasksForTopicGroup);

Review Comment:
   It's put in `validateTasks`






Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]

2023-12-04 Thread via GitHub


jolshan commented on code in PR #14774:
URL: https://github.com/apache/kafka/pull/14774#discussion_r1414713239


##
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##
@@ -349,146 +468,68 @@ class GroupMetadataManager(brokerId: Int,
consumerId: String,
offsetMetadata: immutable.Map[TopicIdPartition, 
OffsetAndMetadata],
responseCallback: immutable.Map[TopicIdPartition, Errors] 
=> Unit,
-   transactionalId: String = null,
producerId: Long = RecordBatch.NO_PRODUCER_ID,
producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
requestLocal: RequestLocal = RequestLocal.NoCaching): Unit 
= {
-// first filter out partitions with offset metadata size exceeding limit
-val filteredOffsetMetadata = offsetMetadata.filter { case (_, 
offsetAndMetadata) =>
-  validateOffsetMetadataLength(offsetAndMetadata.metadata)
-}
-
 group.inLock {
   if (!group.hasReceivedConsistentOffsetCommits)
 warn(s"group: ${group.groupId} with leader: ${group.leaderOrNull} has 
received offset commits from consumers as well " +
   s"as transactional producers. Mixing both types of offset commits 
will generally result in surprises and " +
   s"should be avoided.")
 }
 
-val isTxnOffsetCommit = producerId != RecordBatch.NO_PRODUCER_ID
-// construct the message set to append
+val filteredOffsetMetadata = offsetMetadata.filter { case (_, 
offsetAndMetadata) =>
+  validateOffsetMetadataLength(offsetAndMetadata.metadata)
+}
 if (filteredOffsetMetadata.isEmpty) {
   // compute the final error codes for the commit response
   val commitStatus = offsetMetadata.map { case (k, _) => k -> 
Errors.OFFSET_METADATA_TOO_LARGE }
   responseCallback(commitStatus)
-} else {
-  getMagic(partitionFor(group.groupId)) match {
-case Some(magicValue) =>
-  // We always use CREATE_TIME, like the producer. The conversion to 
LOG_APPEND_TIME (if necessary) happens automatically.
-  val timestampType = TimestampType.CREATE_TIME
-  val timestamp = time.milliseconds()
-
-  val records = filteredOffsetMetadata.map { case (topicIdPartition, 
offsetAndMetadata) =>
-val key = GroupMetadataManager.offsetCommitKey(group.groupId, 
topicIdPartition.topicPartition)
-val value = 
GroupMetadataManager.offsetCommitValue(offsetAndMetadata, 
interBrokerProtocolVersion)
-new SimpleRecord(timestamp, key, value)
-  }
-  val offsetTopicPartition = new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
-  val buffer = 
ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, 
compressionType, records.asJava))
-
-  if (isTxnOffsetCommit && magicValue < RecordBatch.MAGIC_VALUE_V2)
-throw Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT.exception("Attempting 
to make a transaction offset commit with an invalid magic: " + magicValue)
-
-  val builder = MemoryRecords.builder(buffer, magicValue, 
compressionType, timestampType, 0L, time.milliseconds(),
-producerId, producerEpoch, 0, isTxnOffsetCommit, 
RecordBatch.NO_PARTITION_LEADER_EPOCH)
-
-  records.foreach(builder.append)
-  val entries = Map(offsetTopicPartition -> builder.build())
-
-  // set the callback function to insert offsets into cache after log 
append completed
-  def putCacheCallback(responseStatus: Map[TopicPartition, 
PartitionResponse]): Unit = {
-// the append response should only contain the topics partition
-if (responseStatus.size != 1 || 
!responseStatus.contains(offsetTopicPartition))
-  throw new IllegalStateException("Append status %s should only 
have one partition %s"
-.format(responseStatus, offsetTopicPartition))
-
-// construct the commit response status and insert
-// the offset and metadata to cache if the append status has no 
error
-val status = responseStatus(offsetTopicPartition)
-
-val responseError = group.inLock {
-  if (status.error == Errors.NONE) {
-if (!group.is(Dead)) {
-  filteredOffsetMetadata.forKeyValue { (topicIdPartition, 
offsetAndMetadata) =>
-if (isTxnOffsetCommit)
-  group.onTxnOffsetCommitAppend(producerId, 
topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), 
offsetAndMetadata))
-else
-  group.onOffsetCommitAppend(topicIdPartition, 
CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
-  }
-}
-
-// Record the number of offsets committed to the log
-offsetCommitsSensor.record(records.size)
-
-   

Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-12-04 Thread via GitHub


cmccabe commented on code in PR #14838:
URL: https://github.com/apache/kafka/pull/14838#discussion_r1414713000


##
metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java:
##
@@ -347,7 +347,7 @@ private void maybePublishMetadata(MetadataDelta delta, 
MetadataImage image, Load
 }
 metrics.updateLastAppliedImageProvenance(image.provenance());
 metrics.setCurrentMetadataVersion(image.features().metadataVersion());
-if (uninitializedPublishers.isEmpty()) {
+if (!uninitializedPublishers.isEmpty()) {

Review Comment:
   Sigh. Looks like this was introduced in KAFKA-14538, which I reviewed. It 
really only bites in scenarios where you're running a pre-NO_OP record MV, 
which is rare these days.
   
   Thanks for the fix.






Re: [PR] KAFKA-15760: Disable flaky test testTaskRequestWithOldStartMsGetsUpdated [kafka]

2023-12-04 Thread via GitHub


gharris1727 commented on PR #14917:
URL: https://github.com/apache/kafka/pull/14917#issuecomment-1839818405

   I tested out adding a MiniTrogdorCluster start/stop in a `@BeforeAll` method in the test, and I'm getting reliable test passes at about 20% CPU, where before there were consistent failures.
   It appears that the first start of MiniTrogdorCluster takes ~110 seconds, 
while subsequent starts take ~2 seconds. That would explain the trend I was 
seeing in gradle enterprise, where one test has >30s runtime, while every other 
test has ~2s runtime.
   
   I would probably recommend increasing the timeout in this test to 4 minutes, 
or adding in the `@BeforeAll` warm-up method I mentioned, in lieu of disabling 
this test.
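   For illustration, a sketch of the `@BeforeAll` warm-up idea (JUnit 5). The MiniTrogdorCluster builder calls below are assumptions for this example and may not match the Trogdor test utilities exactly:
   
   ```java
   import org.apache.kafka.trogdor.common.MiniTrogdorCluster;
   import org.junit.jupiter.api.BeforeAll;
   
   class CoordinatorClientWarmUpSketch {
   
       @BeforeAll
       static void warmUpMiniTrogdorCluster() throws Exception {
           // Start and immediately stop a throwaway cluster so the one-time
           // initialization cost (~110s observed above) is paid here, outside
           // the per-test timeout.
           try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder()
                   .addCoordinator("warmup-coordinator")
                   .addAgent("warmup-agent")
                   .build()) {
               // Nothing else to do; building and closing the cluster is the warm-up.
           }
       }
   }
   ```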





Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-04 Thread via GitHub


junrao commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1414711941


##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -366,8 +379,27 @@ class BrokerLifecycleManager(
   new BrokerRegistrationResponseHandler())
   }
 
+  // the response handler is not invoked from the event handler thread,
+  // so it is not safe to update state here, instead, schedule an event
+  // to continue handling the response on the event handler thread
   private class BrokerRegistrationResponseHandler extends 
ControllerRequestCompletionHandler {
 override def onComplete(response: ClientResponse): Unit = {
+  eventQueue.prepend(new BrokerRegistrationResponseEvent(response, false))
+}
+
+override def onTimeout(): Unit = {
+  info(s"Unable to register the broker because the RPC got timed out 
before it could be sent.")
+  eventQueue.prepend(new BrokerRegistrationResponseEvent(null, true))
+}
+  }
+
+  private class BrokerRegistrationResponseEvent(response: ClientResponse, 
timedOut: Boolean) extends EventQueue.Event {
+override def run(): Unit = {
+  communicationInFlight = false

Review Comment:
   Ok. I wasn't sure that communicationInFlight covers both requests.
   
   Since `NodeToControllerChannelManagerImpl` uses 1 for `maxInFlightRequestsPerConnection`, another option is to add a `canSendRequest` api in `NodeToControllerChannelManager`, instead of maintaining `communicationInFlight` here.
   
   The `canSendRequest` api can be implemented through `networkClient.ready(node, now)`.
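   For illustration, a rough sketch of that alternative; the class and method names are assumptions for this example, and only `KafkaClient.ready(Node, long)` is an existing client API:
   
   ```java
   import org.apache.kafka.clients.KafkaClient;
   import org.apache.kafka.common.Node;
   
   // Sketch: delegate the "is a request already in flight?" check to the underlying
   // NetworkClient instead of tracking a communicationInFlight flag separately.
   final class ControllerChannelReadinessSketch {
   
       private final KafkaClient networkClient;
   
       ControllerChannelReadinessSketch(KafkaClient networkClient) {
           this.networkClient = networkClient;
       }
   
       boolean canSendRequest(Node controllerNode, long nowMs) {
           // With max.in.flight.requests.per.connection = 1, ready() returns false
           // while a previous request to the controller is still outstanding.
           return networkClient.ready(controllerNode, nowMs);
       }
   }
   ```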






Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-12-04 Thread via GitHub


cmccabe commented on code in PR #14838:
URL: https://github.com/apache/kafka/pull/14838#discussion_r1414710963


##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -354,6 +356,25 @@ public ControllerResult 
registerBroker(
 throw new BrokerIdNotRegisteredException("Controller is in 
pre-migration mode and cannot register KRaft " +
 "brokers until the metadata migration is complete.");
 }
+
+if (featureControl.metadataVersion().isDirectoryAssignmentSupported()) 
{
+Set set = new HashSet<>(request.logDirs());
+if (set.stream().anyMatch(DirectoryId::reserved)) {
+throw new DuplicateBrokerRegistrationException("Reserved 
directory ID in request");

Review Comment:
   Hmm. `DuplicateBrokerRegistrationException` is certainly the wrong thing, 
but I don't think `BrokerIdNotRegisteredException` is quite right either. That 
exception was intended for when you made an RPC that required broker N to be 
registered (like heartbeat) but broker N was not in fact registered. It wasn't 
intended for an error returned from the registration RPC itself. After all, it 
provides no information. OK the broker is not registered, but why?
   
   I can see that there are some lines above which are using 
`BrokerIdNotRegisteredException` as a generic "didn't register" error, but that 
doesn't seem quite right...
   ```
   if (request.isMigratingZkBroker() && !zkRegistrationAllowed()) {
   throw new BrokerIdNotRegisteredException("Controller does not 
support registering ZK brokers.");
   }
   
   if (!request.isMigratingZkBroker() && 
featureControl.inPreMigrationMode()) {
   throw new BrokerIdNotRegisteredException("Controller is in 
pre-migration mode and cannot register KRaft " +
   "brokers until the metadata migration is complete.");
   }
   ```
   
   The "pre-migration" thing is just a straightforward logic error that should 
never happen. Maybe `RuntimeException` (and consequent controller failover) is 
OK for that.
   
   The `request.isMigratingZkBroker()` check probably deserves its own error 
code since it's a weird scenario that should have been given its own error code 
by the migration KIP. Not sure if it's worth doing now but... probably
   
   Your thing is more like INVALID_REQUEST, since someone is sending obvious 
nonsense which superficially resembles the schema.






Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-12-04 Thread via GitHub


cmccabe commented on code in PR #14838:
URL: https://github.com/apache/kafka/pull/14838#discussion_r1414710963


##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -354,6 +356,25 @@ public ControllerResult 
registerBroker(
 throw new BrokerIdNotRegisteredException("Controller is in 
pre-migration mode and cannot register KRaft " +
 "brokers until the metadata migration is complete.");
 }
+
+if (featureControl.metadataVersion().isDirectoryAssignmentSupported()) 
{
+Set set = new HashSet<>(request.logDirs());
+if (set.stream().anyMatch(DirectoryId::reserved)) {
+throw new DuplicateBrokerRegistrationException("Reserved 
directory ID in request");

Review Comment:
   Hmm. `DuplicateBrokerRegistrationException` is certainly the wrong thing, 
but I don't think `BrokerIdNotRegisteredException` is quite right either. That 
exception was intended for when you made an RPC that required broker N to be 
registered (like heartbeat) but broker N was not in fact registered. It wasn't 
intended for an error returned from the registration RPC itself. After all, it 
provides no information. OK the broker is not registered, but why?
   
   I can see that there are some lines above which are using 
`BrokerIdNotRegisteredException` as a generic "didn't register" error, but that 
doesn't seem quite right...
   ```
   if (request.isMigratingZkBroker() && !zkRegistrationAllowed()) {
   throw new BrokerIdNotRegisteredException("Controller does not 
support registering ZK brokers.");
   }
   
   if (!request.isMigratingZkBroker() && 
featureControl.inPreMigrationMode()) {
   throw new BrokerIdNotRegisteredException("Controller is in 
pre-migration mode and cannot register KRaft " +
   "brokers until the metadata migration is complete.");
   }
   ```
   
   The "pre-migration" thing is just a straightforward logic error that should 
never happen. The `request.isMigratingZkBroker()` check probably deserves its 
own error code since it's a weird scenario that should have been spelled out in 
the KIP.
   
   Your thing is more like INVALID_REQUEST, since someone is sending obvious 
nonsense which superficially resembles the schema.






Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]

2023-12-04 Thread via GitHub


jolshan commented on code in PR #14774:
URL: https://github.com/apache/kafka/pull/14774#discussion_r1414710936


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -941,39 +857,129 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
-  private def partitionEntriesForVerification(verificationGuards: 
mutable.Map[TopicPartition, VerificationGuard],
-  entriesPerPartition: 
Map[TopicPartition, MemoryRecords],
-  verifiedEntries: 
mutable.Map[TopicPartition, MemoryRecords],
-  unverifiedEntries: 
mutable.Map[TopicPartition, MemoryRecords],
-  errorEntries: 
mutable.Map[TopicPartition, Errors]): Unit= {
+  private def sendInvalidRequiredAcksResponse(entries: Map[TopicPartition, 
MemoryRecords],
+ responseCallback: 
Map[TopicPartition, PartitionResponse] => Unit): Unit = {
+// If required.acks is outside accepted range, something is wrong with the 
client
+// Just return an error and don't handle the request at all
+val responseStatus = entries.map { case (topicPartition, _) =>
+  topicPartition -> new PartitionResponse(
+Errors.INVALID_REQUIRED_ACKS,
+LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset,
+RecordBatch.NO_TIMESTAMP,
+LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset
+  )
+}
+responseCallback(responseStatus)
+  }
+
+  /**
+   * Apply the postVerificationCallback asynchronously only after verifying 
the partitions have been added to the transaction.
+   * The postVerificationCallback takes the arguments of the requestLocal for 
the thread that will be doing the append as
+   * well as a mapping of topic partitions to LogAppendResult for the 
partitions that saw errors when verifying.
+   *
+   * This method will start the verification process for all the 
topicPartitions in entriesPerPartition and supply the
+   * postVerificationCallback to be run on a request handler thread when the 
response is received.
+   *
+   * @param entriesPerPartitionthe records per partition to be 
appended and therefore need verification
+   * @param transactionVerificationEntries the object that will store the 
entries to verify, the errors, and the verification guards
+   * @param transactionalIdthe id for the transaction
+   * @param requestLocal   container for the stateful 
instances scoped to this request -- this must correspond to the
+   *   thread calling this method
+   * @param postVerificationCallback   the method to be called when 
verification completes and the verification errors
+   *   and the thread's RequestLocal are 
supplied
+   */
+  def appendRecordsWithTransactionVerification(entriesPerPartition: 
Map[TopicPartition, MemoryRecords],
+   transactionVerificationEntries: 
TransactionVerificationEntries,
+   transactionalId: String,
+   requestLocal: RequestLocal,
+   postVerificationCallback: 
RequestLocal => Map[TopicPartition, LogAppendResult] => Unit): Unit = {
+if (transactionalId != null && 
config.transactionPartitionVerificationEnable && 
addPartitionsToTxnManager.isDefined)
+  partitionEntriesForVerification(transactionVerificationEntries, 
entriesPerPartition)
+
+val onVerificationComplete: (RequestLocal, Map[TopicPartition, Errors]) => 
Unit =
+  executePostVerificationCallback(
+transactionVerificationEntries,
+postVerificationCallback,
+  )
+
+if (transactionVerificationEntries.unverified.isEmpty) {
+  onVerificationComplete(requestLocal, 
transactionVerificationEntries.errors.toMap)
+} else {
+  // For unverified entries, send a request to verify. When verified, the 
append process will proceed via the callback.
+  // We verify above that all partitions use the same producer ID.
+  val batchInfo = 
transactionVerificationEntries.unverified.head._2.firstBatch()
+  addPartitionsToTxnManager.foreach(_.verifyTransaction(
+transactionalId = transactionalId,
+producerId = batchInfo.producerId,
+producerEpoch = batchInfo.producerEpoch,
+topicPartitions = 
transactionVerificationEntries.unverified.keySet.toSeq,
+callback = 
KafkaRequestHandler.wrapAsyncCallback(onVerificationComplete, requestLocal)
+  ))
+}
+  }
+
+  /**
+   * A helper method to compile the results from the transaction verification 
and call the postVerificationCallback.
+   *
+   * @param transactionVerificationEntries the object that will store the 
entries to verify, the errors, and the verification guards

Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]

2023-12-04 Thread via GitHub


jolshan commented on code in PR #14774:
URL: https://github.com/apache/kafka/pull/14774#discussion_r1414708109


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -708,23 +708,40 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 }
 
-if (authorizedRequestInfo.isEmpty)
-  sendResponseCallback(Map.empty)
-else {
-  val internalTopicsAllowed = request.header.clientId == 
AdminUtils.ADMIN_CLIENT_ID
+val internalTopicsAllowed = request.header.clientId == 
AdminUtils.ADMIN_CLIENT_ID
+val transactionVerificationEntries = new 
ReplicaManager.TransactionVerificationEntries
 
-  // call the replica manager to append messages to the replicas
+def postVerificationCallback(newRequestLocal: RequestLocal)

Review Comment:
   The problem I had was that I needed to pass the errors and the verification 
guards into the postVerificationCallback, but wanted to minimize methods with a 
ton of arguments that do similar things. The naming starts to get confusing as 
well. An alternative I can think of is to create another method in 
ReplicaManager for the transaction produce path, but it will have all the 
append arguments and just call this same callback under the hood. 
   
   I'm not sure I fully agree that the code was good before. I think it is 
better to keep append records clean and not incorporate transaction 
verification. That's why the transaction verification has the pre-verify (add 
to the txn manager) and post verification (tidy the results and put them in a 
form for the post verification callback).
   
   If we want to move this code out of KafkaApis, I think it only makes sense 
to make a separate method. The other stages need to remain generic for the 
group coordinator usage.






Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-04 Thread via GitHub


junrao commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1414708239


##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -366,8 +379,27 @@ class BrokerLifecycleManager(
   new BrokerRegistrationResponseHandler())
   }
 
+  // the response handler is not invoked from the event handler thread,
+  // so it is not safe to update state here, instead, schedule an event
+  // to continue handling the response on the event handler thread
   private class BrokerRegistrationResponseHandler extends 
ControllerRequestCompletionHandler {
 override def onComplete(response: ClientResponse): Unit = {
+  eventQueue.prepend(new BrokerRegistrationResponseEvent(response, false))
+}
+
+override def onTimeout(): Unit = {
+  info(s"Unable to register the broker because the RPC got timed out 
before it could be sent.")
+  eventQueue.prepend(new BrokerRegistrationResponseEvent(null, true))
+}
+  }
+
+  private class BrokerRegistrationResponseEvent(response: ClientResponse, 
timedOut: Boolean) extends EventQueue.Event {
+override def run(): Unit = {
+  communicationInFlight = false

Review Comment:
   Since `NodeToControllerChannelManagerImpl` uses 1 for `maxInFlightRequestsPerConnection`, we could also add a `sendRequest` api in `NodeToControllerChannelManager` instead, implemented through `networkClient.ready(node, now)`.









Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]

2023-12-04 Thread via GitHub


jolshan commented on code in PR #14774:
URL: https://github.com/apache/kafka/pull/14774#discussion_r1414708109


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -708,23 +708,40 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 }
 
-if (authorizedRequestInfo.isEmpty)
-  sendResponseCallback(Map.empty)
-else {
-  val internalTopicsAllowed = request.header.clientId == 
AdminUtils.ADMIN_CLIENT_ID
+val internalTopicsAllowed = request.header.clientId == 
AdminUtils.ADMIN_CLIENT_ID
+val transactionVerificationEntries = new 
ReplicaManager.TransactionVerificationEntries
 
-  // call the replica manager to append messages to the replicas
+def postVerificationCallback(newRequestLocal: RequestLocal)

Review Comment:
   The problem I had was that I needed to pass the errors and the verification 
guards into the postVerificationCallback.
   
   The alternative is to create yet another method in ReplicaManager for the 
transaction produce path. I was hoping to avoid many methods with 7+ arguments 
that look similar but if we really want to move out of Kafka Apis, we can do 
that.
   
   I'm not sure I fully agree that the code was good before. I think it is 
better to keep append records clean and not incorporate transaction 
verification. That's why the transaction verification has the pre-verify (add 
to the txn manager) and post verification (tidy the results and put them in a 
form for the post verification callback).
   
   If we want to move this code out of KafkaApis, I think it only makes sense 
to make a separate method. The other stages need to remain generic for the 
group coordinator usage.






Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-12-04 Thread via GitHub


cmccabe commented on code in PR #14838:
URL: https://github.com/apache/kafka/pull/14838#discussion_r1414705968


##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -284,7 +286,7 @@ ReplicaPlacer replicaPlacer() {
 public void activate() {
 heartbeatManager = new BrokerHeartbeatManager(logContext, time, 
sessionTimeoutNs);
 for (BrokerRegistration registration : brokerRegistrations.values()) {
-heartbeatManager.touch(registration.id(), registration.fenced(), 
-1);
+heartbeatManager.register(registration.id(), 
registration.fenced(), registration.directories());

Review Comment:
   I don't see why we need to handle directories in `HeartbeatManager`. I made 
this comment in the other PR too. HBM is about timing out brokers. It's not 
about JBOD stuff really.
   
   Directories are hard state so they should be kept in 
`ClusterControlManager`. And we need to issue a new `BrokerRegistrationRecord` 
when they change.






Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-12-04 Thread via GitHub


cmccabe commented on code in PR #14838:
URL: https://github.com/apache/kafka/pull/14838#discussion_r1414704931


##
server-common/src/main/java/org/apache/kafka/common/DirectoryId.java:
##
@@ -121,8 +122,38 @@ public static Map createAssignmentMap(int[] 
replicas, Uuid[] dire
  * Create an array with the specified number of entries set to {@link 
#UNASSIGNED}.
  */
 public static Uuid[] unassignedArray(int length) {
+return array(length, UNASSIGNED);
+}
+
+/**
+ * Create an array with the specified number of entries set to {@link 
#MIGRATING}.
+ */
+public static Uuid[] migratingArray(int length) {
+return array(length, MIGRATING);
+}
+
+/**
+ * Create an array with the specified number of entries set to the 
specified value.
+ */
+private static Uuid[] array(int length, Uuid value) {
 Uuid[] array = new Uuid[length];
-Arrays.fill(array, UNASSIGNED);
+Arrays.fill(array, value);
 return array;
 }
+
+/**
+ * Check if a directory is online, given a sorted list of online 
directories.
+ * @param dir  The directory to check
+ * @param sortedOnlineDirs The sorted list of online directories
+ * @return true if the directory is considered online, 
false otherwise
+ */
+public static boolean isOnline(Uuid dir, List sortedOnlineDirs) {
+if (UNASSIGNED.equals(dir) || MIGRATING.equals(dir)) {

Review Comment:
   > If you mean zk->kraft migration, then I don't think so. Metadata records 
may have been persisted in an older version, without directory assignment
   
   yes, agreed. @pprovenzano , `MIGRATING` state can happen outside ZK 
migration.
   
   Honestly, I think `MIGRATING` isn't a great name. Someone who sticks on 
metadata version 15 won't ever "migrate" to having directory IDs. Maybe calling 
this `UNKNOWN` would be more descriptive? But that's another discussion I guess 
:)






Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-04 Thread via GitHub


junrao commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1414704903


##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -166,6 +166,19 @@ class BrokerLifecycleManager(
*/
   private var registered = false
 
+  /**
+   * True if a request has been sent and a response or timeout has not yet 
been processed.

Review Comment:
   Hmm, if that's the case, should we set communicationInFlight to true when 
sending the very first Registration request?






Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-04 Thread via GitHub


jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1414703024


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -180,7 +185,78 @@ public OffsetMetadataManager build() {
 /**
  * The offsets keyed by group id, topic name and partition id.
  */
-private final TimelineHashMap>> offsetsByGroup;
+private final Offsets offsets;
+
+/**
+ * The offsets keyed by producer id, group id, topic name and partition 
id. This

Review Comment:
   Sorry for so many comments here -- but is it correct that we don't yet have the code to add/remove them on commit/abort?






Re: [PR] KAFKA-15364: Handle log directory failure in the Controller [kafka]

2023-12-04 Thread via GitHub


cmccabe commented on PR #14902:
URL: https://github.com/apache/kafka/pull/14902#issuecomment-1839805443

   I don't see why `BrokerHeartbeatManager` needs to know about directories at 
all. Its function is to identify brokers that haven't heartbeated in a while, so 
they can be fenced. But that doesn't intersect with what we care about here 
(directory lists).
   
   Directory lists should be handled by `ClusterControlManager`. They are hard 
state, after all, not soft state.





Re: [PR] KAFKA-15364: Handle log directory failure in the Controller [kafka]

2023-12-04 Thread via GitHub


cmccabe commented on code in PR #14902:
URL: https://github.com/apache/kafka/pull/14902#discussion_r1414698623


##
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java:
##
@@ -91,8 +102,15 @@ static class BrokerHeartbeatState {
  */
 private BrokerHeartbeatState next;
 
-BrokerHeartbeatState(int id) {
+BrokerHeartbeatState(int id, List directories) {

Review Comment:
   I think we should just require people to pass a list here. It's easy enough 
to pass `Collections.emptyList` if needed.






Re: [PR] KAFKA-15364: Handle log directory failure in the Controller [kafka]

2023-12-04 Thread via GitHub


cmccabe commented on code in PR #14902:
URL: https://github.com/apache/kafka/pull/14902#discussion_r1414699020


##
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java:
##
@@ -107,6 +125,13 @@ int id() {
 return id;
 }
 
+/**
+ * Returns the sorted directories which the broker currently has 
available
+ */
+List directories() {

Review Comment:
   Seems a bit awkward to have the field named differently than its accessor. 
Can we just call them both `directories` ?






Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-04 Thread via GitHub


jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1414697270


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -187,8 +190,6 @@ public CoordinatorResult 
commitOffset(
 short version,
 OffsetCommitRequestData request
 ) {
-snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);

Review Comment:
   What's the context behind removing this?






Re: [PR] KAFKA-15364: Handle log directory failure in the Controller [kafka]

2023-12-04 Thread via GitHub


cmccabe commented on code in PR #14902:
URL: https://github.com/apache/kafka/pull/14902#discussion_r1414696645


##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -1070,11 +1070,27 @@ class ControllerApis(
   }
 
   def handleAssignReplicasToDirs(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
+authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
 val assignReplicasToDirsRequest = request.body[AssignReplicasToDirsRequest]
-
-// TODO KAFKA-15426
-requestHelper.sendMaybeThrottle(request,
-  
assignReplicasToDirsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-CompletableFuture.completedFuture[Unit](())
+val context = new ControllerRequestContext(request.context.header.data, 
request.context.principal,
+  OptionalLong.empty())
+controller.assignReplicasToDirs(context, 
assignReplicasToDirsRequest.data).handle[Unit] { (reply, e) =>

Review Comment:
   It's not necessary to use `handle` here. You can use `thenApply`. If it 
returns a future which was completed exceptionally, it will be handled in 
`ControllerApis.handle`
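   
   For readers less familiar with the distinction, here is a minimal standalone sketch (plain Java, not Kafka code) of how an exceptionally completed future flows through `thenApply` versus being consumed by `handle`:
   
   ```java
   import java.util.concurrent.CompletableFuture;
   
   public class ThenApplyVsHandle {
       public static void main(String[] args) {
           CompletableFuture<Integer> failed = CompletableFuture.failedFuture(new RuntimeException("boom"));
   
           // thenApply is skipped when the source future failed; the exception simply
           // propagates to the resulting future, so a central handler can deal with it.
           CompletableFuture<String> viaThenApply = failed
               .thenApply(v -> "value=" + v)
               .exceptionally(t -> "handled downstream: " + t);
   
           // handle is always invoked and has to inspect the throwable itself.
           CompletableFuture<String> viaHandle = failed
               .handle((v, t) -> t != null ? "handled inline: " + t : "value=" + v);
   
           System.out.println(viaThenApply.join());
           System.out.println(viaHandle.join());
       }
   }
   ```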






Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-04 Thread via GitHub


jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1414694924


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##
@@ -675,25 +675,25 @@ public void testValidateOffsetCommit() {
 
 // Simulate a call from the admin client without member id and member 
epoch.
 // This should pass only if the group is empty.
-group.validateOffsetCommit("", "", -1);
+group.validateOffsetCommit("", "", -1, false);

Review Comment:
   Do we want to have a test where this argument is true?
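   
   A sketch of what such a case might look like (reusing the existing test's style; the exact expected behavior for the transactional path should be confirmed against the new validation logic):
   
   ```java
   // Transactional offset commits from older clients carry no member id or
   // generation id, so validation is expected not to require an empty group.
   assertDoesNotThrow(() -> group.validateOffsetCommit("", "", -1, true));
   ```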






Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-04 Thread via GitHub


jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1414694054


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##
@@ -293,7 +326,11 @@ public void testReplayOffsetCommit() {
 new ApiMessageAndVersion(value, (short) 0)
 ));
 
-verify(offsetMetadataManager, times(2)).replay(key, value);
+verify(offsetMetadataManager, times(2)).replay(
+RecordBatch.NO_PRODUCER_ID,

Review Comment:
   should we verify something with a producer ID here?



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##
@@ -293,7 +326,11 @@ public void testReplayOffsetCommit() {
 new ApiMessageAndVersion(value, (short) 0)
 ));
 
-verify(offsetMetadataManager, times(2)).replay(key, value);
+verify(offsetMetadataManager, times(2)).replay(
+RecordBatch.NO_PRODUCER_ID,

Review Comment:
   should we verify something with a producer ID here in this file?






Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]

2023-12-04 Thread via GitHub


ableegoldman commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1414691022


##
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+enum SuspendReason {
+MIGRATED,
+PROMOTED
+}
+
+/**
+ * A callback that will be invoked after registering the changelogs for 
each state store in a standby
+ * task. It is guaranteed to always be invoked before any records are 
loaded into the standby store.
+ *
+ * @param topicPartition the changelog TopicPartition for this standby task
+ * @param storeName the name of the store being loaded
+ * @param startingOffset the offset from which the standby task begins 
consuming from the changelog
+ */
+void onUpdateStart(final TopicPartition topicPartition,
+   final String storeName,
+   final long startingOffset);
+
+/**
+ * Method called after loading a batch of records. In this case the 
maximum size of the batch is whatever
+ * the value of the MAX_POLL_RECORDS is set to.
+ * 
+ * This method is called after loading each batch and it is advised to 
keep processing to a minimum.
+ * Any heavy processing will block the state updater thread and slow down 
the rate of standby task 
+ * loading. Therefore, if you need to do any extended processing or 
connect to an external service,
+ * consider doing so asynchronously.
+ *
+ * @param topicPartition the TopicPartition containing the values to 
restore
+ * @param storeName the name of the store undergoing restoration
+ * @param batchEndOffset batchEndOffset the changelog end offset 
(inclusive) of the batch that was just loaded
+ * @param batchSize the total number of records in the batch that was just 
loaded
+ * @param currentEndOffset the current end offset of the changelog topic 
partition.
+ */
+void onBatchLoaded(final TopicPartition topicPartition,
+   final String storeName,
+   final TaskId taskId,
+   final long batchEndOffset,
+   final long batchSize,
+   final long currentEndOffset);
+
+/**
+ * This method is called when the corresponding standby task stops 
updating, for the provided reason.
+ * 
+ * If the task was {@code MIGRATED} to another instance, this callback 
will be invoked after this
+ * state store (and the task itself) are closed (in which case the data 
will be cleaned up after 
+ * state.cleanup.delay.ms).
+ * If the task was {@code PROMOTED} to an active task, the state store 
will not be closed, and the 
+ * callback will be invoked after unregistering it as a standby task but 
before re-registering it as an active task 
+ * and beginning restoration. In other words, this will always called 
before the corresponding 
+ * {@link StateRestoreListener#onRestoreStart} call is made.
+ *
+ * @param topicPartition the TopicPartition containing the values to 
restore
+ * @param storeName the name of the store undergoing restoration

Review Comment:
   ```suggestion
* @param topicPartition the changelog TopicPartition for this standby 
task
* @param storeName the name of the store being loaded
   ```



##
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable 

Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-04 Thread via GitHub


jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1414690648


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##
@@ -1615,4 +1618,127 @@ public void testDeleteGroupsWhenNotStarted() throws 
ExecutionException, Interrup
 future.get()
 );
 }
+
+@Test
+public void testCommitTransactionalOffsetsWhenNotStarted() throws 
ExecutionException, InterruptedException {
+CoordinatorRuntime runtime = 
mockRuntime();
+GroupCoordinatorService service = new GroupCoordinatorService(
+new LogContext(),
+createConfig(),
+runtime,
+new GroupCoordinatorMetrics()
+);
+
+TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
+.setGroupId("foo")
+.setTransactionalId("transactional-id")
+.setMemberId("member-id")
+.setGenerationId(10)
+.setTopics(Collections.singletonList(new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+.setName("topic")
+.setPartitions(Collections.singletonList(new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+.setPartitionIndex(0)
+.setCommittedOffset(100);
+
+CompletableFuture future = 
service.commitTransactionalOffsets(
+requestContext(ApiKeys.TXN_OFFSET_COMMIT),
+request,
+BufferSupplier.NO_CACHING
+);
+
+assertEquals(
+new TxnOffsetCommitResponseData()
+.setTopics(Collections.singletonList(new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+.setName("topic")
+.setPartitions(Collections.singletonList(new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+.setPartitionIndex(0)
+
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()),
+future.get()
+);
+}
+
+@ParameterizedTest
+@NullSource

Review Comment:
   Does this mean we test null string and empty string?
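   
   For reference, JUnit 5's `@NullSource` supplies only a single `null` argument; covering the empty string as well would need `@EmptySource` or the combined annotation, roughly:
   
   ```java
   // @NullSource         -> one invocation with null
   // @EmptySource        -> one invocation with ""
   // @NullAndEmptySource -> both of the above
   @ParameterizedTest
   @NullAndEmptySource
   void exampleTest(final String transactionalId) {
       // ...
   }
   ```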






Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-04 Thread via GitHub


jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1414690195


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -833,13 +835,15 @@ public void validateOffsetCommit(
 if (generationId != this.generationId) {
 throw Errors.ILLEGAL_GENERATION.exception();
 }
-} else if (!isInState(EMPTY)) {
+} else if (!isTransactional && !isInState(EMPTY)) {
 // If the request does not contain the member id and the 
generation id (version 0),
 // offset commits are only accepted when the group is empty.
+// This does not apply to transactional offset commits, since the 
older versions
+// of this protocol do not require member id and generation id.
 throw Errors.UNKNOWN_MEMBER_ID.exception();
 }
 
-if (isInState(COMPLETING_REBALANCE)) {
+if (!isTransactional && isInState(COMPLETING_REBALANCE)) {

Review Comment:
   Does the comment above also apply to this case with respect to transactional 
offset commits?






Re: [PR] KAFKA-15022: introduce interface to control graph constructor [kafka]

2023-12-04 Thread via GitHub


mjsax commented on code in PR #14714:
URL: https://github.com/apache/kafka/pull/14714#discussion_r1414683411


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorTest.java:
##
@@ -97,6 +98,17 @@ private int getCost(final TaskId taskId, final UUID 
processId, final boolean inC
 return 1;
 }
 
+@Test
+public void testSubtopicShouldContainAllTasks() {

Review Comment:
   `Subtopology` instead of `Subtopic` ?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/BalanceSubtopologyGraphConstructor.java:
##
@@ -88,55 +93,20 @@ public Graph constructTaskGraph(
 }
 }
 
-// TODO: validate tasks in tasksForTopicGroup and taskIdList
-final SortedMap> sortedTasksForTopicGroup = 
new TreeMap<>(tasksForTopicGroup);

Review Comment:
   Was this code only moved, or altered?



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java:
##
@@ -676,18 +676,28 @@ static SortedMap> 
getTaskTopicPartitionMap(final int
 return taskTopicPartitionMap;
 }
 
-static Map configProps(final boolean 
enableRackAwareAssignor) {
-return configProps(enableRackAwareAssignor, 0);
+static Map> getTasksForTopicGroup(final int 
tpSize, final int partitionSize) {
+final Map> tasksForTopicGroup = new 
HashMap<>();
+for (int i = 0; i < tpSize; i++) {
+for (int j = 0; j < partitionSize; j++) {
+final Subtopology subtopology = new Subtopology(i, null);
+tasksForTopicGroup.computeIfAbsent(subtopology, k -> new 
HashSet<>()).add(new TaskId(i, j));
+}
+}
+return tasksForTopicGroup;
+}
+
+static Map configProps(final String rackAwareConfig) {
+return configProps(rackAwareConfig, 0);
 }
 
-static Map configProps(final boolean 
enableRackAwareAssignor, final int replicaNum) {
+static Map configProps(final String rackAwareConfig, final 
int replicaNum) {
 final Map configurationMap = new HashMap<>();
 configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, 
APPLICATION_ID);
 configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
USER_END_POINT);
 configurationMap.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 
replicaNum);
-if (enableRackAwareAssignor) {
-
configurationMap.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, 
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC);
-}
+// 
configurationMap.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, 
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC);

Review Comment:
   Needs some cleanup



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/GraphTest.java:
##
@@ -394,6 +394,106 @@ public void testDeterministic() {
 }
 }
 
+@Test
+public void testMaxFlowOnlySourceAndSink() {
+final Graph graph1 = new Graph<>();

Review Comment:
   Nit: Why `graph1` but not `graph`? (similar below)



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorTest.java:
##
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptySet;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedTasks;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertValidAssignment;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getRandomClientState;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTaskTopicPartitionMap;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTasksForTopicGroup;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.ArrayList;
+import 

Re: [PR] KAFKA-15831: KIP-1000 protocol and admin client [kafka]

2023-12-04 Thread via GitHub


junrao commented on PR #14811:
URL: https://github.com/apache/kafka/pull/14811#issuecomment-1839774186

   Just merged a related PR https://github.com/apache/kafka/pull/14767. 
Re-triggering the tests to make sure there are no new issues.





Re: [PR] KAFKA-15831: KIP-1000 protocol and admin client [kafka]

2023-12-04 Thread via GitHub


junrao closed pull request #14811: KAFKA-15831: KIP-1000 protocol and admin 
client
URL: https://github.com/apache/kafka/pull/14811





Re: [PR] KAFKA-15784: Ensure atomicity of in memory update and write when transactionally committing offsets [kafka]

2023-12-04 Thread via GitHub


artemlivshits commented on code in PR #14774:
URL: https://github.com/apache/kafka/pull/14774#discussion_r141457


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -708,23 +708,40 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 }
 
-if (authorizedRequestInfo.isEmpty)
-  sendResponseCallback(Map.empty)
-else {
-  val internalTopicsAllowed = request.header.clientId == 
AdminUtils.ADMIN_CLIENT_ID
+val internalTopicsAllowed = request.header.clientId == 
AdminUtils.ADMIN_CLIENT_ID
+val transactionVerificationEntries = new 
ReplicaManager.TransactionVerificationEntries
 
-  // call the replica manager to append messages to the replicas
+def postVerificationCallback(newRequestLocal: RequestLocal)

Review Comment:
   Do we need to change this file?  I think we could preserve the abstraction 
and keep the verification guts encapsulated in the async call (as it is now), 
and only let the group coordinator use the explicit stages.
   
   I.e. replicaManager.appendRecords would do what it does now (calling the stages under the covers), so we don't have to bring the complexity into the code that works well with the abstraction.



##
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##
@@ -349,146 +468,68 @@ class GroupMetadataManager(brokerId: Int,
consumerId: String,
offsetMetadata: immutable.Map[TopicIdPartition, 
OffsetAndMetadata],
responseCallback: immutable.Map[TopicIdPartition, Errors] 
=> Unit,
-   transactionalId: String = null,
producerId: Long = RecordBatch.NO_PRODUCER_ID,
producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
requestLocal: RequestLocal = RequestLocal.NoCaching): Unit 
= {
-// first filter out partitions with offset metadata size exceeding limit
-val filteredOffsetMetadata = offsetMetadata.filter { case (_, 
offsetAndMetadata) =>
-  validateOffsetMetadataLength(offsetAndMetadata.metadata)
-}
-
 group.inLock {
   if (!group.hasReceivedConsistentOffsetCommits)
 warn(s"group: ${group.groupId} with leader: ${group.leaderOrNull} has 
received offset commits from consumers as well " +
   s"as transactional producers. Mixing both types of offset commits 
will generally result in surprises and " +
   s"should be avoided.")
 }
 
-val isTxnOffsetCommit = producerId != RecordBatch.NO_PRODUCER_ID
-// construct the message set to append
+val filteredOffsetMetadata = offsetMetadata.filter { case (_, 
offsetAndMetadata) =>
+  validateOffsetMetadataLength(offsetAndMetadata.metadata)
+}
 if (filteredOffsetMetadata.isEmpty) {
   // compute the final error codes for the commit response
   val commitStatus = offsetMetadata.map { case (k, _) => k -> 
Errors.OFFSET_METADATA_TOO_LARGE }
   responseCallback(commitStatus)
-} else {
-  getMagic(partitionFor(group.groupId)) match {
-case Some(magicValue) =>
-  // We always use CREATE_TIME, like the producer. The conversion to 
LOG_APPEND_TIME (if necessary) happens automatically.
-  val timestampType = TimestampType.CREATE_TIME
-  val timestamp = time.milliseconds()
-
-  val records = filteredOffsetMetadata.map { case (topicIdPartition, 
offsetAndMetadata) =>
-val key = GroupMetadataManager.offsetCommitKey(group.groupId, 
topicIdPartition.topicPartition)
-val value = 
GroupMetadataManager.offsetCommitValue(offsetAndMetadata, 
interBrokerProtocolVersion)
-new SimpleRecord(timestamp, key, value)
-  }
-  val offsetTopicPartition = new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
-  val buffer = 
ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, 
compressionType, records.asJava))
-
-  if (isTxnOffsetCommit && magicValue < RecordBatch.MAGIC_VALUE_V2)
-throw Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT.exception("Attempting 
to make a transaction offset commit with an invalid magic: " + magicValue)
-
-  val builder = MemoryRecords.builder(buffer, magicValue, 
compressionType, timestampType, 0L, time.milliseconds(),
-producerId, producerEpoch, 0, isTxnOffsetCommit, 
RecordBatch.NO_PARTITION_LEADER_EPOCH)
-
-  records.foreach(builder.append)
-  val entries = Map(offsetTopicPartition -> builder.build())
-
-  // set the callback function to insert offsets into cache after log 
append completed
-  def putCacheCallback(responseStatus: Map[TopicPartition, 
PartitionResponse]): Unit = {
-// the append response should only contain the topics partition
-if (responseStatus.size != 1 || 
!responseStatus.contains(offsetTopicPartition))
-  throw new IllegalStateException("Append status %s 

Re: [PR] KAFKA-15830: Add telemetry API handling (KIP-714) [kafka]

2023-12-04 Thread via GitHub


junrao merged PR #14767:
URL: https://github.com/apache/kafka/pull/14767





Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-04 Thread via GitHub


cmccabe commented on PR #14903:
URL: https://github.com/apache/kafka/pull/14903#issuecomment-1839768073

   > Instead, CommunicationEvent.run() should gracefully handle running when communicationInFlight = true, probably by doing nothing.
   
   This also implies that `scheduleNextCommunicationAfterSuccess` should be 
checking `nextSchedulingShouldBeImmediate` (not sure why it doesn't already)
   
   Probably `nextSchedulingShouldBeImmediate` should be renamed to something 
like "dirty" since it doesn't actually mean the next scheduling should always 
be immediate. Like if there has been a communication error 
(`scheduleNextCommunicationAfterFailure`) we do not want to immediately 
reschedule, no matter what.





Re: [PR] KAFKA-15830: Add telemetry API handling (KIP-714) [kafka]

2023-12-04 Thread via GitHub


junrao commented on PR #14767:
URL: https://github.com/apache/kafka/pull/14767#issuecomment-1839767052

   @apoorvmittal10 : Thanks for triaging the tests. Merging the PR.





Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]

2023-12-04 Thread via GitHub


mjsax commented on code in PR #14626:
URL: https://github.com/apache/kafka/pull/14626#discussion_r1414568376


##
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.query.ResultOrder;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.VersionedRecordIterator;
+import org.rocksdb.Snapshot;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+
+public class LogicalSegmentIterator implements VersionedRecordIterator {
+protected final ListIterator segmentIterator;
+private final Bytes key;
+private final Long fromTime;
+private final Long toTime;
+private final ResultOrder order;
+private ListIterator> iterator;
+private volatile boolean open = true;
+
+// defined for creating/releasing the snapshot. 
+private LogicalKeyValueSegment snapshotOwner = null;
+private Snapshot snapshot = null;
+
+
+
+public LogicalSegmentIterator(final ListIterator 
segmentIterator,
+  final Bytes key,
+  final Long fromTime,
+  final Long toTime,
+  final ResultOrder order) {
+
+this.segmentIterator = segmentIterator;
+this.key = key;
+this.fromTime = fromTime;
+this.toTime = toTime;
+this.iterator = Collections.emptyListIterator();
+this.order = order;
+}
+
+@Override
+public void close() {
+open = false;
+// user may refuse consuming all returned records, so release the 
snapshot when closing the iterator if it is not released yet!
+releaseSnapshot();
+}
+
+@Override
+public boolean hasNext() {
+if (!open) {
+throw new IllegalStateException("The iterator is out of scope.");
+}
+final boolean hasStillLoad = order.equals(ResultOrder.ASCENDING) ? 
iterator.hasPrevious() : iterator.hasNext();

Review Comment:
   We might want to add a comment about why `ASCENDING` is using `hasPrevious()` -- it's not intuitive and only makes sense if one knows that data is stored in descending order inside a segment.
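   
   A possible wording for such a comment (a sketch only; the exact storage-order details should be double-checked against the segment implementation):
   
   ```java
   // Within a segment, record versions are stored in descending timestamp order.
   // An ASCENDING query therefore walks the per-segment list backwards via
   // hasPrevious()/previous(), while DESCENDING walks it forwards via hasNext()/next().
   final boolean hasStillLoad = order.equals(ResultOrder.ASCENDING) ? iterator.hasPrevious() : iterator.hasNext();
   ```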



##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##
@@ -263,6 +268,31 @@ public VersionedRecord get(final Bytes key, final 
long asOfTimestamp) {
 return null;
 }
 
+@SuppressWarnings("unchecked")
+VersionedRecordIterator get(final Bytes key, final long 
fromTimestamp, final long toTimestamp, final ResultOrder order) {
+validateStoreOpen();
+
+if (toTimestamp < observedStreamTime - historyRetention) {
+// history retention exceeded. we still check the latest value 
store in case the
+// latest record version satisfies the timestamp bound, in which 
case it should
+// still be returned (i.e., the latest record version per key 
never expires).
+return new 
LogicalSegmentIterator(Collections.singletonList(latestValueStore).listIterator(),
 key, fromTimestamp, toTimestamp, order);
+} else {
+final List segments = new ArrayList<>();
+// add segment stores
+// consider the search lower bound as -INF (LONG.MIN_VALUE) to 
find the record that has been inserted before the {@code fromTimestamp}
+// but is still valid in query specified time interval.
+if (order.equals(ResultOrder.ASCENDING)) {
+segments.addAll(segmentStores.segments(Long.MIN_VALUE, 
toTimestamp, true));

Review Comment:
   Given we pass in `MIN_VALUE` twice, could we omit the first parameter?



##
streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java:
##
@@ -153,14 +192,189 @@ private void shouldHandleVersionedKeyQuery(final Integer 
key,
  

Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-04 Thread via GitHub


cmccabe commented on PR #14903:
URL: https://github.com/apache/kafka/pull/14903#issuecomment-1839747217

   Thanks for looking at this, @soarez .
   
   I don't think `nextSchedulingShouldBeImmediate` should be reset to false 
until the `CommunicationEvent` is run.
   
   I also think we should be a bit more careful about setting 
`communicationInFlight` ... it should only be set after 
`_channelManager.sendRequest` has been called. And even that call should 
probably be wrapped in a `try...catch`. If `sendRequest` throws, we don't want 
to be stuck never sending another request.
   
   (I don't think sendRequest is supposed to throw, but we should be careful.)
   
   I don't think prepend is needed anywhere. To be honest, the presence of 
`prepend` usually indicates a bug... :) Instead, `CommunicationEvent.run()` 
should gracefully handle running when `communicationInFlight = true` Probably 
by doing nothing.





Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-04 Thread via GitHub


soarez commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1414647339


##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -166,6 +166,19 @@ class BrokerLifecycleManager(
*/
   private var registered = false
 
+  /**
+   * True if a request has been sent and a response or timeout has not yet 
been processed.

Review Comment:
   It applies to both Heartbeat and Registration requests, which are the only requests this manager sends.
   
   I think we're also fixing a bug where a second registration request could have been scheduled after a call to `propagateDirectoryFailure` and before the registration response is received.






Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-04 Thread via GitHub


soarez commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1414633539


##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -453,79 +490,73 @@ class BrokerLifecycleManager(
 val message = 
response.responseBody().asInstanceOf[BrokerHeartbeatResponse]
 val errorCode = Errors.forCode(message.data().errorCode())
 if (errorCode == Errors.NONE) {
-  // this response handler is not invoked from the event handler 
thread,
-  // and processing a successful heartbeat response requires updating
-  // state, so to continue we need to schedule an event
-  eventQueue.prepend(new BrokerHeartbeatResponseEvent(message.data()))
+  val responseData = message.data()
+  failedAttempts = 0
+  _state match {
+case BrokerState.STARTING =>
+  if (responseData.isCaughtUp) {
+info(s"The broker has caught up. Transitioning from STARTING 
to RECOVERY.")
+_state = BrokerState.RECOVERY
+initialCatchUpFuture.complete(null)
+  } else {
+debug(s"The broker is STARTING. Still waiting to catch up with 
cluster metadata.")
+  }
+  // Schedule the heartbeat after only 10 ms so that in the case 
where
+  // there is no recovery work to be done, we start up a bit 
quicker.
+  scheduleNextCommunication(NANOSECONDS.convert(10, MILLISECONDS))
+case BrokerState.RECOVERY =>
+  if (!responseData.isFenced) {
+info(s"The broker has been unfenced. Transitioning from 
RECOVERY to RUNNING.")
+initialUnfenceFuture.complete(null)
+_state = BrokerState.RUNNING
+  } else {
+info(s"The broker is in RECOVERY.")
+  }
+  scheduleNextCommunicationAfterSuccess()
+case BrokerState.RUNNING =>
+  debug(s"The broker is RUNNING. Processing heartbeat response.")
+  scheduleNextCommunicationAfterSuccess()
+case BrokerState.PENDING_CONTROLLED_SHUTDOWN =>
+  if (!responseData.shouldShutDown()) {
+info(s"The broker is in PENDING_CONTROLLED_SHUTDOWN state, 
still waiting " +
+  "for the active controller.")
+if (!gotControlledShutdownResponse) {
+  // If this is the first pending controlled shutdown response 
we got,
+  // schedule our next heartbeat a little bit sooner than we 
usually would.
+  // In the case where controlled shutdown completes quickly, 
this will
+  // speed things up a little bit.
+  scheduleNextCommunication(NANOSECONDS.convert(50, 
MILLISECONDS))
+} else {
+  scheduleNextCommunicationAfterSuccess()
+}
+  } else {
+info(s"The controller has asked us to exit controlled 
shutdown.")
+beginShutdown()
+  }
+  gotControlledShutdownResponse = true
+case BrokerState.SHUTTING_DOWN =>
+  info(s"The broker is SHUTTING_DOWN. Ignoring heartbeat 
response.")
+case _ =>
+  error(s"Unexpected broker state ${_state}")
+  scheduleNextCommunicationAfterSuccess()
+  }
 } else {
   warn(s"Broker $nodeId sent a heartbeat request but received error 
$errorCode.")
   scheduleNextCommunicationAfterFailure()
 }
   }
 }
-
-override def onTimeout(): Unit = {
-  info("Unable to send a heartbeat because the RPC got timed out before it 
could be sent.")
-  scheduleNextCommunicationAfterFailure()
-}
   }
 
-  private class BrokerHeartbeatResponseEvent(response: 
BrokerHeartbeatResponseData) extends EventQueue.Event {
-override def run(): Unit = {
-  failedAttempts = 0
-  _state match {
-case BrokerState.STARTING =>
-  if (response.isCaughtUp) {
-info(s"The broker has caught up. Transitioning from STARTING to 
RECOVERY.")
-_state = BrokerState.RECOVERY
-initialCatchUpFuture.complete(null)
-  } else {
-debug(s"The broker is STARTING. Still waiting to catch up with 
cluster metadata.")
-  }
-  // Schedule the heartbeat after only 10 ms so that in the case where
-  // there is no recovery work to be done, we start up a bit quicker.
-  scheduleNextCommunication(NANOSECONDS.convert(10, MILLISECONDS))
-case BrokerState.RECOVERY =>
-  if (!response.isFenced) {
-info(s"The broker has been unfenced. Transitioning from RECOVERY 
to RUNNING.")
-initialUnfenceFuture.complete(null)
-_state = BrokerState.RUNNING
-  } else {
-info(s"The broker is in RECOVERY.")
-  }

Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-04 Thread via GitHub


jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1414627867


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -180,7 +185,78 @@ public OffsetMetadataManager build() {
 /**
  * The offsets keyed by group id, topic name and partition id.
  */
-private final TimelineHashMap>> offsetsByGroup;
+private final Offsets offsets;
+
+/**
+ * The offsets keyed by producer id, group id, topic name and partition 
id. This

Review Comment:
   This comment also confused me because it seems we also use the same three nested data structures to store the committed offsets. So we have:
   
   offsetsByGroup -> all committed offsets, where we don't care about the producer ID
   
   pendingTransactionalOffsets -> a mapping of producer id -> offsetsByGroup (for only that producer) for all uncommitted txn offsets
   
   On abort we remove the producer id's entries, and on commit we add them to offsetsByGroup.
   
   Is this correct?
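   
   A minimal sketch of that mental model (simplified to plain `HashMap`s with hypothetical method names, not the actual timeline data structures):
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   // Simplified model: groupId -> topic -> partition -> offset
   class OffsetsModelSketch {
       final Map<String, Map<String, Map<Integer, Long>>> offsetsByGroup = new HashMap<>();
       // producerId -> (groupId -> topic -> partition -> offset), pending until the transaction ends
       final Map<Long, Map<String, Map<String, Map<Integer, Long>>>> pendingTransactionalOffsets = new HashMap<>();
   
       void onTransactionCommit(long producerId) {
           Map<String, Map<String, Map<Integer, Long>>> pending = pendingTransactionalOffsets.remove(producerId);
           if (pending == null) return;
           // Fold the pending offsets into the committed view.
           pending.forEach((group, topics) ->
               topics.forEach((topic, partitions) ->
                   partitions.forEach((partition, offset) ->
                       offsetsByGroup
                           .computeIfAbsent(group, g -> new HashMap<>())
                           .computeIfAbsent(topic, t -> new HashMap<>())
                           .put(partition, offset))));
       }
   
       void onTransactionAbort(long producerId) {
           // Aborted transactions simply drop their pending offsets.
           pendingTransactionalOffsets.remove(producerId);
       }
   }
   ```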






Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]

2023-12-04 Thread via GitHub


kirktrue commented on code in PR #14640:
URL: https://github.com/apache/kafka/pull/14640#discussion_r1414627626


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1376,6 +1425,66 @@ private void subscribeInternal(Collection 
topics, Optional
+ * Process background events, if any
+ * Briefly wait for {@link CompletableApplicationEvent an 
event} to complete
+ * 
+ *
+ * 
+ *
+ * Each iteration gives the application thread an opportunity to process 
background events, which may be
+ * necessary to complete the overall processing.
+ *
+ * 
+ *
+ * As an example, take {@link #unsubscribe()}. To start unsubscribing, the 
application thread enqueues an
+ * {@link UnsubscribeApplicationEvent} on the application event queue. 
That event will eventually trigger the
+ * rebalancing logic in the background thread. Critically, as part of this 
rebalancing work, the
+ * {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)} 
callback needs to be invoked. However,
+ * this callback must be executed on the application thread. To achieve 
this, the background thread enqueues a
+ * {@link ConsumerRebalanceListenerCallbackNeededEvent} on its background 
event queue. That event queue is
+ * periodically queried by the application thread to see if there's work 
to be done. When the application thread
+ * sees {@link ConsumerRebalanceListenerCallbackNeededEvent}, it is 
processed, and then a
+ * {@link ConsumerRebalanceListenerCallbackCompletedEvent} is then 
enqueued by the application thread on the
+ * background event queue. Moments later, the background thread will see 
that event, process it, and continue
+ * execution of the rebalancing logic. The rebalancing logic cannot 
complete until the
+ * {@link ConsumerRebalanceListener} callback is performed.
+ *
+ * @param event Event that contains a {@link CompletableFuture}; it is on 
this future that the application thread
+ *  will wait for completion
+ * @param timer Overall timer that bounds how long the application thread 
will wait for the event to complete
+ * @return {@code true} if the event completed within the timeout, {@code 
false} otherwise
+ */
+private boolean processBackgroundEvents(CompletableApplicationEvent 
event, Timer timer) {
+log.trace("Enqueuing event {} for processing; will wait up to {} ms to 
complete", event, timer.remainingMs());
+
+do {
+backgroundEventProcessor.process();
+
+try {
+Timer pollInterval = time.timer(100L);
+log.trace("Waiting {} ms for event {} to complete", event, 
pollInterval.remainingMs());
+ConsumerUtils.getResult(event.future(), pollInterval);
+log.trace("Event {} completed successfully", event);
+return true;
+} catch (TimeoutException e) {
+// Ignore this as we will retry the event until the timeout 
expires.
+} finally {
+timer.update(time.milliseconds());

Review Comment:
   Done.






Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-04 Thread via GitHub


soarez commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1414626881


##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -366,8 +379,27 @@ class BrokerLifecycleManager(
   new BrokerRegistrationResponseHandler())
   }
 
+  // the response handler is not invoked from the event handler thread,
+  // so it is not safe to update state here, instead, schedule an event
+  // to continue handling the response on the event handler thread
   private class BrokerRegistrationResponseHandler extends 
ControllerRequestCompletionHandler {
 override def onComplete(response: ClientResponse): Unit = {
+  eventQueue.prepend(new BrokerRegistrationResponseEvent(response, false))
+}
+
+override def onTimeout(): Unit = {
+  info(s"Unable to register the broker because the RPC got timed out 
before it could be sent.")
+  eventQueue.prepend(new BrokerRegistrationResponseEvent(null, true))
+}
+  }
+
+  private class BrokerRegistrationResponseEvent(response: ClientResponse, 
timedOut: Boolean) extends EventQueue.Event {
+override def run(): Unit = {
+  communicationInFlight = false

Review Comment:
   I don't think so. We set `communicationInFlight = true` in `CommunicationEvent.run`, which sends both heartbeats and registration requests.
   
   Maybe I'm missing something?






Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]

2023-12-04 Thread via GitHub


kirktrue commented on code in PR #14640:
URL: https://github.com/apache/kafka/pull/14640#discussion_r1414626549


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerMethodName.java:
##
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+
+/**
+ * This class just provides a static name for the methods in the {@link 
ConsumerRebalanceListener} interface
+ * for a bit more compile time assurance.
+ */
+public enum ConsumerRebalanceListenerMethodName {
+
+onPartitionsRevoked, onPartitionsAssigned, onPartitionsLost;

Review Comment:
   Done.






Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]

2023-12-04 Thread via GitHub


kirktrue commented on code in PR #14640:
URL: https://github.com/apache/kafka/pull/14640#discussion_r1414625883


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -650,7 +693,7 @@ boolean reconcile() {
 "\tCurrent owned partitions:  {}\n" +
 "\tAdded partitions (assigned - owned):   {}\n" +
 "\tRevoked partitions (owned - assigned): {}\n",
-assignedTopicIdPartitions,
+assignedTopicPartitions,

Review Comment:
   I've reverted the change to use `assignedTopicPartitions` instead of 
`assignedTopicIdPartitions`. We can take that up later.






Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-04 Thread via GitHub


jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1414621182


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -180,7 +185,78 @@ public OffsetMetadataManager build() {
 /**
  * The offsets keyed by group id, topic name and partition id.
  */
-private final TimelineHashMap>> offsetsByGroup;
+private final Offsets offsets;
+
+/**
+ * The offsets keyed by producer id, group id, topic name and partition 
id. This
+ * structure holds all the transactional offsets that are part of ongoing 
transactions.
+ * When the transaction is committed, they are transferred to the 
offsetsByGroup; when
+ * the transaction is aborted, they are removed.
+ */
+private final TimelineHashMap pendingTransactionalOffsets;
+
+private class Offsets {
+private final TimelineHashMap>> offsetsByGroup;
+
+private Offsets() {
+this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
+}
+
+private OffsetAndMetadata get(
+String groupId,
+String topic,
+int partition
+) {
+TimelineHashMap> topicOffsets = offsetsByGroup.get(groupId);
+if (topicOffsets == null) {
+return null;
+} else {
+TimelineHashMap partitionOffsets = 
topicOffsets.get(topic);
+if (partitionOffsets == null) {
+return null;
+} else {
+return partitionOffsets.get(partition);
+}
+}
+}
+
+private OffsetAndMetadata put(
+String groupId,
+String topic,
+int partition,
+OffsetAndMetadata offsetAndMetadata
+) {
+TimelineHashMap> topicOffsets = offsetsByGroup
+.computeIfAbsent(groupId, __ -> new 
TimelineHashMap<>(snapshotRegistry, 0));
+TimelineHashMap partitionOffsets = 
topicOffsets
+.computeIfAbsent(topic, __ -> new 
TimelineHashMap<>(snapshotRegistry, 0));
+return partitionOffsets.put(partition, offsetAndMetadata);
+}
+
+private OffsetAndMetadata remove(
+String groupId,
+String topic,
+int partition
+) {
+TimelineHashMap> topicOffsets = offsetsByGroup.get(groupId);
+if (topicOffsets == null)
+return null;
+
+TimelineHashMap partitionOffsets = 
topicOffsets.get(topic);
+if (partitionOffsets == null)
+return null;
+
+OffsetAndMetadata removedValue = 
partitionOffsets.remove(partition);
+
+if (partitionOffsets.isEmpty())

Review Comment:
   Was about to ask if we remove the nested maps, but now I see we do so here.






Re: [PR] KAFKA-6968: Adds calls to listener on rebalance of MockConsumer [kafka]

2023-12-04 Thread via GitHub


efgpinto closed pull request #7539: KAFKA-6968: Adds calls to listener on 
rebalance of MockConsumer
URL: https://github.com/apache/kafka/pull/7539





[jira] [Assigned] (KAFKA-6968) Call RebalanceListener in MockConsumer

2023-12-04 Thread Eduardo Pinto (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eduardo Pinto reassigned KAFKA-6968:


Assignee: (was: Eduardo Pinto)

> Call RebalanceListener in MockConsumer
> --
>
> Key: KAFKA-6968
> URL: https://issues.apache.org/jira/browse/KAFKA-6968
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 1.1.0
>Reporter: Andras Beni
>Priority: Minor
>
> {{org.apache.kafka.clients.consumer.MockConsumer}} simulates rebalance with 
> method {{public synchronized void rebalance(Collection 
> newAssignment)}}. This method does not call {{ConsumerRebalanceListener}} 
> methods. Calls to {{onPartitionsRevoked(...)}} and 
> {{onPartitionsAssigned(...)}} should be added in appropriate order.





[jira] [Commented] (KAFKA-6968) Call RebalanceListener in MockConsumer

2023-12-04 Thread Eduardo Pinto (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17793049#comment-17793049
 ] 

Eduardo Pinto commented on KAFKA-6968:
--

Unassigning myself due to lack of bandwidth to conclude this. Not sure if this 
is still relevant anyway.

> Call RebalanceListener in MockConsumer
> --
>
> Key: KAFKA-6968
> URL: https://issues.apache.org/jira/browse/KAFKA-6968
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 1.1.0
>Reporter: Andras Beni
>Priority: Minor
>
> {{org.apache.kafka.clients.consumer.MockConsumer}} simulates rebalance with 
> method {{public synchronized void rebalance(Collection 
> newAssignment)}}. This method does not call {{ConsumerRebalanceListener}} 
> methods. Calls to {{onPartitionsRevoked(...)}} and 
> {{onPartitionsAssigned(...)}} should be added in appropriate order.





Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-04 Thread via GitHub


jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1414610034


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -180,7 +185,78 @@ public OffsetMetadataManager build() {
 /**
  * The offsets keyed by group id, topic name and partition id.
  */
-private final TimelineHashMap>> offsetsByGroup;
+private final Offsets offsets;
+
+/**
+ * The offsets keyed by producer id, group id, topic name and partition 
id. This

Review Comment:
   This comment about the keys is a little confusing.
   
   Looking at the code, we have 4 maps that are nested within each other. Maybe the comment could mention that each of these is a separate map? (Or we could include a non-Javadoc comment in offsets to make it clearer.)
   
   








Re: [PR] KAFKA-9631: Fix support for optional fields in MockAdminClient [kafka]

2023-12-04 Thread via GitHub


efgpinto closed pull request #8430: KAFKA-9631: Fix support for optional fields 
in MockAdminClient
URL: https://github.com/apache/kafka/pull/8430





Re: [PR] KAFKA-9631: Fix support for optional fields in MockAdminClient [kafka]

2023-12-04 Thread via GitHub


efgpinto commented on PR #8430:
URL: https://github.com/apache/kafka/pull/8430#issuecomment-1839682362

   No bandwidth to revisit this. Closing. 





[jira] [Commented] (KAFKA-9631) MockAdminClient doesn't handle CreateTopics optional fields

2023-12-04 Thread Eduardo Pinto (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17793048#comment-17793048
 ] 

Eduardo Pinto commented on KAFKA-9631:
--

I don't have the capacity to work on this and don't even know if this is still 
relevant, so I've unassigned myself and will close the PR.

> MockAdminClient doesn't handle CreateTopics optional fields
> ---
>
> Key: KAFKA-9631
> URL: https://issues.apache.org/jira/browse/KAFKA-9631
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Priority: Major
>
> AdminClient's {{createTopics()}} method has a variant with two optional 
> fields. So I'd expect the following code to work correctly:
>  {{admin.createTopics(Collections.singletonList(new NewTopic(TOPIC_NAME, 
> Optional.empty(), Optional.empty(}}
> Indeed it works great, as long as we are using the real KafkaAdminClient. 
> MockKafkaAdminClient tries to get the number of replicas without checking that 
> the values make sense, and therefore it fails with:
> {{java.lang.IllegalArgumentException: Illegal Capacity: -1}}{{at 
> java.base/java.util.ArrayList.<init>(ArrayList.java:158)}}
> {{ at 
> org.apache.kafka.clients.admin.MockAdminClient.createTopics(MockAdminClient.java:183)}}
> {{ at org.apache.kafka.clients.admin.Admin.createTopics(Admin.java:125)}}
> Making a mockery of the mock.
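
For illustration, a hedged sketch of the kind of guard the mock is missing (only NewTopic 
is the real client class here; the helper and the fallback are hypothetical, assuming the 
usual -1 sentinel for an unset replication factor):

{code:java}
import org.apache.kafka.clients.admin.NewTopic;

// Hypothetical guard, not the actual MockAdminClient code: fall back to a default
// instead of sizing a replica list with the -1 sentinel left by Optional.empty().
final class ReplicationFactorGuard {
    static short effectiveReplicationFactor(NewTopic topic, short brokerDefault) {
        short rf = topic.replicationFactor();   // -1 when the topic left it unset
        return rf == -1 ? brokerDefault : rf;
    }
}
{code}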



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-9631) MockAdminClient doesn't handle CreateTopics optional fields

2023-12-04 Thread Eduardo Pinto (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eduardo Pinto reassigned KAFKA-9631:


Assignee: (was: Eduardo Pinto)

> MockAdminClient doesn't handle CreateTopics optional fields
> ---
>
> Key: KAFKA-9631
> URL: https://issues.apache.org/jira/browse/KAFKA-9631
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Priority: Major
>
> AdminClient's {{createTopics()}} method has a variant with two optional 
> fields. So I'd expect the following code to work correctly:
>  {{admin.createTopics(Collections.singletonList(new NewTopic(TOPIC_NAME, 
> Optional.empty(), Optional.empty(}}
> Indeed it works great, as long as we are using the real KafkaAdminClient. 
> MockKafkaAdminClient tries to get the number of replicas without checking that 
> the values make sense, and therefore it fails with:
> {{java.lang.IllegalArgumentException: Illegal Capacity: -1}}{{at 
> java.base/java.util.ArrayList.<init>(ArrayList.java:158)}}
> {{ at 
> org.apache.kafka.clients.admin.MockAdminClient.createTopics(MockAdminClient.java:183)}}
> {{ at org.apache.kafka.clients.admin.Admin.createTopics(Admin.java:125)}}
> Making a mockery of the mock.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15372: Reconfigure dedicated MM2 connectors after leadership change [kafka]

2023-12-04 Thread via GitHub


gharris1727 commented on code in PR #14293:
URL: https://github.com/apache/kafka/pull/14293#discussion_r1414605900


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##
@@ -314,6 +356,23 @@ private  void 
awaitConnectorTasksStart(final MirrorMa
 }, MM_START_UP_TIMEOUT_MS, "Tasks for connector " + 
clazz.getSimpleName() + " for MirrorMaker instances did not transition to 
running in time");
 }
 
+private  void 
awaitConnectorConfiguration(MirrorMaker mm, Class clazz, SourceAndTarget 
sourceAndTarget, Predicate> predicate) throws 
InterruptedException {
+String connName = clazz.getSimpleName();
+waitForCondition(() -> {
+try {
+FutureCallback>> cb = 
new FutureCallback<>();
+herder(mm, sourceAndTarget).tasksConfig(connName, cb);

Review Comment:
   I think I was doing a sort of "end-to-end test" that verified both that the new 
configuration got applied to the connector and that internal forwarding took place 
for the task configurations. I realize now that the method name doesn't match this 
implementation, so I renamed it to match.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: fix bug where we weren't registering SnapshotEmitterMetrics [kafka]

2023-12-04 Thread via GitHub


cmccabe commented on PR #14918:
URL: https://github.com/apache/kafka/pull/14918#issuecomment-1839616178

   > Thanks @cmccabe. Is the impact of this bug that we weren't reporting the 
two metrics LatestSnapshotGeneratedBytes and LatestSnapshotGeneratedAgeMs?
   
   Unfortunately, yes that was the impact.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]

2023-12-04 Thread via GitHub


gharris1727 commented on code in PR #14562:
URL: https://github.com/apache/kafka/pull/14562#discussion_r1414514216


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -394,9 +399,16 @@ public void validateConnectorConfig(Map 
connectorProps, Callback
 
 @Override
 public void validateConnectorConfig(Map connectorProps, 
Callback callback, boolean doLog) {
+callback.recordStage(new Stage(
+"waiting for a new thread to become available for connector 
validation",
+time.milliseconds()
+));
 connectorExecutor.submit(() -> {
+callback.recordStage(null);

Review Comment:
   Could this instead be a call to Stage::complete, or a use for TemporaryStage?



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##
@@ -771,6 +778,104 @@ private Map 
defaultSinkConnectorProps(String topics) {
 return props;
 }
 
+@Test
+public void testRequestTimeouts() throws Exception {
+final String configTopic = "test-request-timeout-configs";
+workerProps.put(CONFIG_TOPIC_CONFIG, configTopic);
+// Workaround for KAFKA-15676, which can cause the scheduled rebalance 
delay to
+// be spuriously triggered after the group coordinator for a Connect 
cluster is bounced
+// Set to 1 instead of 0 as another workaround for KAFKA-15693, which 
can cause
+// connectors and tasks to be unassigned indefinitely if the scheduled 
rebalance delay
+// is set to 0
+workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, "1");
+connect = connectBuilder
+.numBrokers(1)
+.numWorkers(1)
+.build();
+connect.start();
+connect.assertions().assertAtLeastNumWorkersAreUp(1,
+"Worker did not start in time");
+
+Map connectorConfig1 = 
defaultSourceConnectorProps(TOPIC_NAME);
+Map connectorConfig2 = new HashMap<>(connectorConfig1);
+connectorConfig2.put(TASKS_MAX_CONFIG, Integer.toString(NUM_TASKS + 
1));
+
+// Create a connector to ensure that the worker has completed startup
+log.info("Creating initial connector");
+connect.configureConnector(CONNECTOR_NAME, connectorConfig1);
+connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+CONNECTOR_NAME, NUM_TASKS, "connector and tasks did not start 
in time"
+);
+
+// Bring down Kafka, which should cause some REST requests to fail
+log.info("Stopping Kafka cluster");
+connect.kafka().stopOnlyKafka();
+// Allow for the workers to discover that the coordinator is 
unavailable, wait is
+// heartbeat timeout * 2 + 4sec
+Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+
+connect.requestTimeout(5_000);
+// Try to reconfigure the connector, which should fail with a timeout 
error
+log.info("Trying to reconfigure connector while Kafka cluster is 
down");
+ConnectRestException e = assertThrows(
+ConnectRestException.class,
+() -> connect.configureConnector(CONNECTOR_NAME, 
connectorConfig2)
+);
+assertEquals(INTERNAL_SERVER_ERROR.getStatusCode(), e.statusCode());
+assertNotNull(e.getMessage());
+assertTrue(
+"Message '" + e.getMessage() + "' does not match expected 
format",
+e.getMessage().contains("Request timed out. The worker is 
currently flushing updates to the status topic")

Review Comment:
   Is this the only error message possible when shutting down kafka, or the 
most common?
   I would expect that an error about ensuring membership in the cluster could 
appear.



##
connect/runtime/src/main/java/org/apache/kafka/connect/util/Stage.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class Stage {
+private final String description;
+private final long started;
+private final 

Re: [PR] KAFKA-15372: Reconfigure dedicated MM2 connectors after leadership change [kafka]

2023-12-04 Thread via GitHub


C0urante commented on code in PR #14293:
URL: https://github.com/apache/kafka/pull/14293#discussion_r1414541177


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHerder.java:
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.utils.Time;
+import 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
+import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.storage.ConfigBackingStore;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.connect.mirror.MirrorMaker.CONNECTOR_CLASSES;
+
+public class MirrorHerder extends DistributedHerder {
+
+private static final Logger log = 
LoggerFactory.getLogger(MirrorHerder.class);
+
+private final MirrorMakerConfig config;
+private final SourceAndTarget sourceAndTarget;
+private boolean wasLeader;
+
+public MirrorHerder(MirrorMakerConfig mirrorConfig, SourceAndTarget 
sourceAndTarget, DistributedConfig config, Time time, Worker worker, String 
kafkaClusterId, StatusBackingStore statusBackingStore, ConfigBackingStore 
configBackingStore, String restUrl, RestClient restClient, 
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, 
List restNamespace, AutoCloseable... uponShutdown) {
+super(config, time, worker, kafkaClusterId, statusBackingStore, 
configBackingStore, restUrl, restClient, connectorClientConfigOverridePolicy, 
restNamespace, uponShutdown);
+this.config = mirrorConfig;
+this.sourceAndTarget = sourceAndTarget;
+}
+
+@Override
+protected boolean handleRebalanceCompleted() {
+if (!super.handleRebalanceCompleted()) {
+return false;
+}
+if (isLeader()) {
+if (!wasLeader) {
+log.info("This node {} is now a leader for {}. Configuring 
connectors...", this, sourceAndTarget);
+configureConnectors();
+}
+wasLeader = true;
+} else {
+wasLeader = false;
+}
+return true;
+}
+
+private void configureConnectors() {
+CONNECTOR_CLASSES.forEach(this::maybeConfigureConnector);
+}
+
+private void maybeConfigureConnector(Class connectorClass) {
+Map connectorProps = 
config.connectorBaseConfig(sourceAndTarget, connectorClass);
+connectorConfig(connectorClass.getSimpleName(), (e, existingConfig) -> 
{

Review Comment:
   Do we need/want this to execute asynchronously? At first glance it seems 
like it'd be cleaner to read the connector config from the [superclass's 
configState 
field](https://github.com/apache/kafka/blob/a83bc2d977d2af85d4edfc8096854137481001e9/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L193),
 which is already `protected` (though for somewhat dubious reasons...).



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHerder.java:
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the 

[PR] KAFKA-15022: tests for HA assignor [kafka]

2023-12-04 Thread via GitHub


lihaosky opened a new pull request, #14921:
URL: https://github.com/apache/kafka/pull/14921

   Tests for HAAssignor and StickyAssignor.
   
   This PR is on top of https://github.com/apache/kafka/pull/14714
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15061; CoordinatorPartitionWriter should reuse buffer [kafka]

2023-12-04 Thread via GitHub


dajac commented on code in PR #14885:
URL: https://github.com/apache/kafka/pull/14885#discussion_r1414553902


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -126,57 +129,63 @@ class CoordinatorPartitionWriter[T](
 val magic = logConfig.recordVersion.value
 val maxBatchSize = logConfig.maxMessageSize
 val currentTimeMs = time.milliseconds()
-
-val recordsBuilder = MemoryRecords.builder(
-  ByteBuffer.allocate(math.min(16384, maxBatchSize)),
-  magic,
-  compressionType,
-  TimestampType.CREATE_TIME,
-  0L,
-  time.milliseconds(),
-  producerId,
-  producerEpoch,
-  0,
-  producerId != RecordBatch.NO_PRODUCER_ID,
-  RecordBatch.NO_PARTITION_LEADER_EPOCH
-)
-
-records.forEach { record =>
-  val keyBytes = serializer.serializeKey(record)
-  val valueBytes = serializer.serializeValue(record)
-
-  if (recordsBuilder.hasRoomFor(currentTimeMs, keyBytes, valueBytes, 
EMPTY_HEADERS)) recordsBuilder.append(
-currentTimeMs,
-keyBytes,
-valueBytes,
-EMPTY_HEADERS
-  ) else throw new RecordTooLargeException(s"Message batch size is 
${recordsBuilder.estimatedSizeInBytes()} bytes " +
-s"in append to partition $tp which exceeds the maximum configured 
size of $maxBatchSize.")
+val bufferSupplier = threadLocalBufferSupplier.get()
+val buffer = bufferSupplier.get(math.min(16384, maxBatchSize))

Review Comment:
   That’s right. I actually took it from here: 
https://github.com/apache/kafka/blob/a83bc2d977d2af85d4edfc8096854137481001e9/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala#L177
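
   For readers following the diff, a minimal sketch of the reuse pattern under discussion 
(BufferSupplier and GrowableBufferSupplier are the real Kafka utility classes; everything 
else below is illustrative):

   ```java
   import java.nio.ByteBuffer;

   import org.apache.kafka.common.utils.BufferSupplier;

   // Illustrative sketch: one growable supplier per thread; the buffer it hands out is
   // reused across calls on that thread instead of being re-allocated for every batch.
   final class BufferReuseSketch {
       private static final ThreadLocal<BufferSupplier> BUFFER_SUPPLIER =
           ThreadLocal.withInitial(BufferSupplier.GrowableBufferSupplier::new);

       static void writeBatch(int maxBatchSize) {
           BufferSupplier supplier = BUFFER_SUPPLIER.get();
           ByteBuffer buffer = supplier.get(Math.min(16384, maxBatchSize)); // borrow
           try {
               // ... build the batch into 'buffer', e.g. via MemoryRecords.builder(buffer, ...)
           } finally {
               supplier.release(buffer); // hand the buffer back so the next write reuses it
           }
       }
   }
   ```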



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-00000 Handle consumer close [kafka]

2023-12-04 Thread via GitHub


philipnee opened a new pull request, #14920:
URL: https://github.com/apache/kafka/pull/14920

   When closing the consumer we need to perform a few tasks (a rough sketch of the sequence follows below):
   1. If auto-commit is enabled, send an autocommit and block until completed
   2. Invoke all offsetCommitCallbacks and ensure all inflight commits are sent
   3. Invoke partitionsRevoke callbacks
   4. Unsubscribe from all partitions
   5. LeaveGroup
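
   A rough, hypothetical sketch of that ordering (not the actual AsyncKafkaConsumer code; 
all names below are illustrative except ConsumerRebalanceListener and TopicPartition):

   ```java
   import java.time.Duration;
   import java.util.Collection;

   import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
   import org.apache.kafka.common.TopicPartition;

   // Illustrative ordering of the close steps listed above.
   final class CloseSequenceSketch {
       interface Steps {
           void maybeAutoCommitSync(Duration timeout);      // 1. blocking auto-commit if enabled
           void invokeCompletedOffsetCommitCallbacks();     // 2. flush in-flight commit callbacks
           Collection<TopicPartition> assignedPartitions();
           void unsubscribeAll();                           // 4. drop all partitions
           void leaveGroup(Duration timeout);               // 5. send LeaveGroup
       }

       static void close(Steps consumer, ConsumerRebalanceListener listener, Duration timeout) {
           consumer.maybeAutoCommitSync(timeout);
           consumer.invokeCompletedOffsetCommitCallbacks();
           listener.onPartitionsRevoked(consumer.assignedPartitions()); // 3. revoke callback
           consumer.unsubscribeAll();
           consumer.leaveGroup(timeout);
       }
   }
   ```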


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-13988) Mirrormaker 2 auto.offset.reset=latest not working

2023-12-04 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-13988:
--
Fix Version/s: 3.6.2

> Mirrormaker 2 auto.offset.reset=latest not working
> --
>
> Key: KAFKA-13988
> URL: https://issues.apache.org/jira/browse/KAFKA-13988
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.2.0
> Environment: Source Kafka cluster running on Ubuntu 20
> Source Kafka cluster Kafka v0.10
> Target Kafka cluster running in AWS MSK
> Target Kafka cluster Kafka v2.6.2
> Mirrormaker version 3.2.0 running on Ubuntu 20.
>Reporter: Daniel Florek
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.7.0, 3.6.2
>
>
> Hi. 
> I have problem setting up mirroring with MM2 from latest offset between 2 
> clusters. In logs I can see that Consumer that is consuming topics has 
> auto.offset.reset property set to latest. But still topics are read from 
> offset 0. I am using following configuration:
>  
> {code:java}
> clusters = A, B
> A.bootstrap.servers = broker-01A:9092
> B.bootstrap.servers = broker-01B:9092,broker-02B:9092,broker-03B:9092
> replication.policy.class = 
> org.apache.kafka.connect.mirror.IdentityReplicationPolicy
> #Enable replication between clusters and define topics which should be 
> replicated
> A->B.enabled = true
> A->B.topics = .*
> A->B.replication.factor=3
> A->B.emit.heartbeats.enabled = true
> A->B.emit.checkpoints.enabled = true
> auto.offset.reset=latest
> consumer.auto.offset.reset=latest
> A.consumer.auto.offset.reset=latest
> B.consumer.auto.offset.reset=latest
> refresh.topics.enabled=true
> heartbeats.topic.replication.factor=1
> checkpoints.topic.replication.factor=1
> offset-syncs.topic.replication.factor=1
> config.storage.replication.factor = 1
> offset.storage.replication.factor = 1
> status.storage.replication.factor = 1 {code}
> I am using Kafka 3.2.0 for Mirrormaker 2. Source kafka cluster is 1 broker 
> running on EC2 instance in AWS (quite an old version I think 0.10). Target 
> kafka cluster contains 3 brokers running in AWS MSK (version 2.6.2). 
> Could you point me what I am doing wrong? Or is this possibly a bug?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15061; CoordinatorPartitionWriter should reuse buffer [kafka]

2023-12-04 Thread via GitHub


dajac commented on code in PR #14885:
URL: https://github.com/apache/kafka/pull/14885#discussion_r1414550451


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -66,6 +65,10 @@ class CoordinatorPartitionWriter[T](
   compressionType: CompressionType,
   time: Time
 ) extends PartitionWriter[T] {
+  private val threadLocalBufferSupplier = ThreadLocal.withInitial(
+() => new BufferSupplier.GrowableBufferSupplier()
+  )

Review Comment:
   That’s right.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15968 Handle two specific ApiExceptions in EventHandlerExceptionInfo [kafka]

2023-12-04 Thread via GitHub


mumrah opened a new pull request, #14919:
URL: https://github.com/apache/kafka/pull/14919

   This patch adds two cases to EventHandlerExceptionInfo for exceptions thrown 
by the log layer. Prior to this change, the log could throw a 
CorruptRecordException and the QuorumController would not treat it as a fatal 
exception. E.g., 
   
   ```
   INFO [ControllerServer id=9990] handleCommit[baseOffset=192554233]: event 
failed with CorruptRecordException in 234 microseconds.
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-13988: Enable replicating from latest offset with MirrorMaker 2 [kafka]

2023-12-04 Thread via GitHub


C0urante merged PR #14567:
URL: https://github.com/apache/kafka/pull/14567


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-13988: Enable replicating from latest offset with MirrorMaker 2 [kafka]

2023-12-04 Thread via GitHub


C0urante commented on PR #14567:
URL: https://github.com/apache/kafka/pull/14567#issuecomment-1839519217

   Thanks Greg!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: fix bug where we weren't registering SnapshotEmitterMetrics [kafka]

2023-12-04 Thread via GitHub


mumrah commented on PR #14918:
URL: https://github.com/apache/kafka/pull/14918#issuecomment-1839514024

   Thanks @cmccabe. Is the impact of this bug that we weren't reporting the two 
metrics LatestSnapshotGeneratedBytes and LatestSnapshotGeneratedAgeMs? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15280: Implement client support for KIP-848 server-side assignors [kafka]

2023-12-04 Thread via GitHub


cadonna commented on code in PR #14878:
URL: https://github.com/apache/kafka/pull/14878#discussion_r1414503110


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java:
##
@@ -157,6 +157,7 @@ protected RequestManagers create() {
 CommitRequestManager commit = null;
 
 if (groupRebalanceConfig != null && 
groupRebalanceConfig.groupId != null) {
+Optional serverAssignor = 
Optional.ofNullable(config.getString(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));

Review Comment:
   If the protocol is not `consumer` but a group remote assignor is set, do we 
not need to ignore that config as we ignore the auto commit interval when a 
group ID is set?
   
   See 
https://github.com/apache/kafka/blob/ee024966d5283ff95935310e4df039069ae7e695/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L475
   
   If yes, we also need to test that like here:
   
https://github.com/apache/kafka/blob/ee024966d5283ff95935310e4df039069ae7e695/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java#L920
   and here:
   
https://github.com/apache/kafka/blob/ee024966d5283ff95935310e4df039069ae7e695/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java#L904



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15276: Implement event plumbing for ConsumerRebalanceListener callbacks [kafka]

2023-12-04 Thread via GitHub


philipnee commented on code in PR #14640:
URL: https://github.com/apache/kafka/pull/14640#discussion_r1414525826


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1376,6 +1425,66 @@ private void subscribeInternal(Collection 
topics, Optional
+ * Process background events, if any
+ * Briefly wait for {@link CompletableApplicationEvent an 
event} to complete
+ * 
+ *
+ * 
+ *
+ * Each iteration gives the application thread an opportunity to process 
background events, which may be
+ * necessary to complete the overall processing.
+ *
+ * 
+ *
+ * As an example, take {@link #unsubscribe()}. To start unsubscribing, the 
application thread enqueues an
+ * {@link UnsubscribeApplicationEvent} on the application event queue. 
That event will eventually trigger the
+ * rebalancing logic in the background thread. Critically, as part of this 
rebalancing work, the
+ * {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)} 
callback needs to be invoked. However,
+ * this callback must be executed on the application thread. To achieve 
this, the background thread enqueues a
+ * {@link ConsumerRebalanceListenerCallbackNeededEvent} on its background 
event queue. That event queue is
+ * periodically queried by the application thread to see if there's work 
to be done. When the application thread
+ * sees {@link ConsumerRebalanceListenerCallbackNeededEvent}, it is 
processed, and then a
+ * {@link ConsumerRebalanceListenerCallbackCompletedEvent} is then 
enqueued by the application thread on the
+ * background event queue. Moments later, the background thread will see 
that event, process it, and continue
+ * execution of the rebalancing logic. The rebalancing logic cannot 
complete until the
+ * {@link ConsumerRebalanceListener} callback is performed.
+ *
+ * @param event Event that contains a {@link CompletableFuture}; it is on 
this future that the application thread
+ *  will wait for completion
+ * @param timer Overall timer that bounds how long the application thread 
will wait for the event to complete
+ * @return {@code true} if the event completed within the timeout, {@code 
false} otherwise
+ */
+private boolean processBackgroundEvents(CompletableApplicationEvent 
event, Timer timer) {
+log.trace("Enqueuing event {} for processing; will wait up to {} ms to 
complete", event, timer.remainingMs());
+
+do {
+backgroundEventProcessor.process();
+
+try {
+Timer pollInterval = time.timer(100L);
+log.trace("Waiting {} ms for event {} to complete", event, 
pollInterval.remainingMs());
+ConsumerUtils.getResult(event.future(), pollInterval);
+log.trace("Event {} completed successfully", event);
+return true;
+} catch (TimeoutException e) {
+// Ignore this as we will retry the event until the timeout 
expires.
+} finally {
+timer.update(time.milliseconds());

Review Comment:
   timer.update() is sufficient



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15061; CoordinatorPartitionWriter should reuse buffer [kafka]

2023-12-04 Thread via GitHub


jolshan commented on code in PR #14885:
URL: https://github.com/apache/kafka/pull/14885#discussion_r1414519712


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -66,6 +65,10 @@ class CoordinatorPartitionWriter[T](
   compressionType: CompressionType,
   time: Time
 ) extends PartitionWriter[T] {
+  private val threadLocalBufferSupplier = ThreadLocal.withInitial(
+() => new BufferSupplier.GrowableBufferSupplier()
+  )

Review Comment:
   Just for my understanding though -- in the past we relied on RequestLocal 
for this sort of thing. Is the idea to move to ThreadLocal here so it is safer 
and we don't run into things like 
https://issues.apache.org/jira/browse/KAFKA-15653 ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15901: Client changes for registering telemetry and API calls (KIP-714) [kafka]

2023-12-04 Thread via GitHub


apoorvmittal10 commented on PR #14843:
URL: https://github.com/apache/kafka/pull/14843#issuecomment-1839503610

   @mjsax I have resolved the conflicts and the build is triggered. Please let me 
know if anything else needs to be handled or if the PR can be merged once the build 
succeeds.
   cc: @philipnee @AndrewJSchofield 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-04 Thread via GitHub


cadonna commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1414512071


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -292,6 +297,90 @@ public void 
testValidateConsumerGroupHeartbeatRequest(final short version) {
 assertNull(heartbeatRequest.data().subscribedTopicRegex());
 }
 
+@Test
+public void testConsumerGroupMetadataFirstUpdate() {
+resetWithZeroHeartbeatInterval(Optional.empty());
+mockStableMember();
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", )));
+
+NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
+assertEquals(1, result.unsentRequests.size());
+NetworkClientDelegate.UnsentRequest request = 
result.unsentRequests.get(0);
+ClientResponse response = createHeartbeatResponse(request, 
Errors.NONE);
+result.unsentRequests.get(0).handler().onComplete(response);
+assertEquals(1, backgroundEventQueue.size());
+final BackgroundEvent event = backgroundEventQueue.poll();
+assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, event.type());
+final GroupMetadataUpdateEvent groupMetadataUpdateEvent = 
(GroupMetadataUpdateEvent) event;
+final GroupMetadataUpdateEvent expectedGroupMetadataUpdateEvent = new 
GroupMetadataUpdateEvent(
+memberEpoch,
+memberId
+);
+assertEquals(expectedGroupMetadataUpdateEvent, 
groupMetadataUpdateEvent);
+}
+
+@Test
+public void testConsumerGroupMetadataUpdateWithSameUpdate() {
+resetWithZeroHeartbeatInterval(Optional.empty());
+mockStableMember();
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", )));
+NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(1, result.unsentRequests.size());
+NetworkClientDelegate.UnsentRequest request = 
result.unsentRequests.get(0);
+ClientResponse firstResponse = createHeartbeatResponse(request, 
Errors.NONE);
+request.handler().onComplete(firstResponse);
+assertEquals(1, backgroundEventQueue.size());
+final BackgroundEvent firstEvent = backgroundEventQueue.poll();
+assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, 
firstEvent.type());
+
+time.sleep(2000);

Review Comment:
   @kirktrue Although I call  `resetWithZeroHeartbeatInterval()` in the 
beginning of this test method, I need to sleep at least 1000 ms to get a second 
heartbeat response. I thought `resetWithZeroHeartbeatInterval()` sets the 
heartbeat interval to zero, but during debugging I learnt that the heartbeat 
interval is still 1000 ms. Is this intended?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: fix bug where we weren't registering SnapshotEmitterMetrics [kafka]

2023-12-04 Thread via GitHub


cmccabe opened a new pull request, #14918:
URL: https://github.com/apache/kafka/pull/14918

   Fix a bug where we weren't properly exposing SnapshotEmitterMetrics. Add a 
test.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15061; CoordinatorPartitionWriter should reuse buffer [kafka]

2023-12-04 Thread via GitHub


jeffkbkim commented on code in PR #14885:
URL: https://github.com/apache/kafka/pull/14885#discussion_r1414507880


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -66,6 +65,10 @@ class CoordinatorPartitionWriter[T](
   compressionType: CompressionType,
   time: Time
 ) extends PartitionWriter[T] {
+  private val threadLocalBufferSupplier = ThreadLocal.withInitial(
+() => new BufferSupplier.GrowableBufferSupplier()
+  )

Review Comment:
   makes sense. thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15645) Move ReplicationQuotasTestRig to tools

2023-12-04 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-15645:
---
Parent: KAFKA-14525
Issue Type: Sub-task  (was: Task)

> Move ReplicationQuotasTestRig to tools
> --
>
> Key: KAFKA-15645
> URL: https://issues.apache.org/jira/browse/KAFKA-15645
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Nikolay Izhikov
>Assignee: Nikolay Izhikov
>Priority: Minor
>
> The ReplicationQuotasTestRig class is used for measuring performance.
> It contains dependencies on the `ReassignPartitionCommand` API.
> To move all commands to tools, ReplicationQuotasTestRig must also be moved to 
> tools.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15061; CoordinatorPartitionWriter should reuse buffer [kafka]

2023-12-04 Thread via GitHub


jolshan commented on code in PR #14885:
URL: https://github.com/apache/kafka/pull/14885#discussion_r1414500105


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -66,6 +65,10 @@ class CoordinatorPartitionWriter[T](
   compressionType: CompressionType,
   time: Time
 ) extends PartitionWriter[T] {
+  private val threadLocalBufferSupplier = ThreadLocal.withInitial(
+() => new BufferSupplier.GrowableBufferSupplier()
+  )

Review Comment:
   For more context on 4. There are assumptions in the code that a request is 
handled on the same thread so ByteBuffers are not thread safe.
   
   There was a bug in KIP-890 (see 
[here](https://issues.apache.org/jira/browse/KAFKA-15653)) where we used the 
wrong byte buffer and it caused NPEs and other exceptions.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15061; CoordinatorPartitionWriter should reuse buffer [kafka]

2023-12-04 Thread via GitHub


jolshan commented on code in PR #14885:
URL: https://github.com/apache/kafka/pull/14885#discussion_r1414508061


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -126,57 +129,63 @@ class CoordinatorPartitionWriter[T](
 val magic = logConfig.recordVersion.value
 val maxBatchSize = logConfig.maxMessageSize
 val currentTimeMs = time.milliseconds()
-
-val recordsBuilder = MemoryRecords.builder(
-  ByteBuffer.allocate(math.min(16384, maxBatchSize)),
-  magic,
-  compressionType,
-  TimestampType.CREATE_TIME,
-  0L,
-  time.milliseconds(),
-  producerId,
-  producerEpoch,
-  0,
-  producerId != RecordBatch.NO_PRODUCER_ID,
-  RecordBatch.NO_PARTITION_LEADER_EPOCH
-)
-
-records.forEach { record =>
-  val keyBytes = serializer.serializeKey(record)
-  val valueBytes = serializer.serializeValue(record)
-
-  if (recordsBuilder.hasRoomFor(currentTimeMs, keyBytes, valueBytes, 
EMPTY_HEADERS)) recordsBuilder.append(
-currentTimeMs,
-keyBytes,
-valueBytes,
-EMPTY_HEADERS
-  ) else throw new RecordTooLargeException(s"Message batch size is 
${recordsBuilder.estimatedSizeInBytes()} bytes " +
-s"in append to partition $tp which exceeds the maximum configured 
size of $maxBatchSize.")
+val bufferSupplier = threadLocalBufferSupplier.get()
+val buffer = bufferSupplier.get(math.min(16384, maxBatchSize))

Review Comment:
   Is this 16384 just the default batch size?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various fixes in the docs [kafka]

2023-12-04 Thread via GitHub


mimaison merged PR #14914:
URL: https://github.com/apache/kafka/pull/14914


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15061; CoordinatorPartitionWriter should reuse buffer [kafka]

2023-12-04 Thread via GitHub


jolshan commented on code in PR #14885:
URL: https://github.com/apache/kafka/pull/14885#discussion_r1414500105


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -66,6 +65,10 @@ class CoordinatorPartitionWriter[T](
   compressionType: CompressionType,
   time: Time
 ) extends PartitionWriter[T] {
+  private val threadLocalBufferSupplier = ThreadLocal.withInitial(
+() => new BufferSupplier.GrowableBufferSupplier()
+  )

Review Comment:
   For more context on 4. There are assumptions in the code that a request is 
handled on the same thread. 
   There was a bug in KIP-890 (see 
[here](https://issues.apache.org/jira/browse/KAFKA-15653)) where we used the 
wrong byte buffer and it caused NPEs and other exceptions.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various fixes in the docs [kafka]

2023-12-04 Thread via GitHub


mimaison commented on code in PR #14914:
URL: https://github.com/apache/kafka/pull/14914#discussion_r1414498206


##
docs/ops.html:
##
@@ -3570,7 +3572,7 @@ 6.9 ZooKeeper
 
   Stable version
-  The current stable branch is 3.5. Kafka is regularly updated to include the 
latest release in the 3.5 series.
+  The current stable branch is 3.8. Kafka is regularly updated to include the 
latest release in the 3.8 series.

Review Comment:
   This is the ZooKeeper version. We don't have a variable to track that. At 
this point, it's likely it won't change anymore before we remove ZooKeeper, 
hence why I decided to just update the value.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


