jolshan commented on code in PR #14774:
URL: https://github.com/apache/kafka/pull/14774#discussion_r1423297237
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -915,64 +975,239 @@ class ReplicaManager(val config: KafkaConfig,
// probably unblock some follower fetch requests since log end
offset has been updated
delayedFetchPurgatory.checkAndComplete(requestKey)
case LeaderHwChange.NONE =>
- // nothing
- }
+ // nothing
}
+ }
}
+ }
- recordConversionStatsCallback(localProduceResults.map { case (k, v) => k
-> v.info.recordValidationStats })
+ /**
+ * Append messages to offsets topic, and wait for them to be replicated to
other replicas;
+ * the callback function will be triggered either when timeout or the
required acks are satisfied;
+ * if the callback function itself is already synchronized on some object
then pass this object to avoid deadlock.
+ * This method should not return until the write to the local log is
completed because updating offsets requires updating
+ * the in-memory and persisted state under a lock together.
+ *
+ * Noted that all pending delayed check operations are stored in a queue.
All callers to ReplicaManager.appendRecords()
+ * are expected to call ActionQueue.tryCompleteActions for all affected
partitions, without holding any conflicting
+ * locks.
+ *
+ * If appending transactional records, call
maybeStartTransactionVerificationForPartition(s) and call this method in the
callback.
+ * For example, the GroupCoordinator will pass `doCommitTxnOffsets` as the
post-verification callback, and that method eventually call this.
+ *
+ * @param timeout maximum time we will wait to append
before returning
+ * @param requiredAcks number of replicas who must
acknowledge the append before sending the response
+ * @param internalTopicsAllowed boolean indicating whether internal
topics can be appended to
+ * @param origin source of the append request (ie,
client, replication, coordinator)
+ * @param entriesPerPartition the records per partition to be
appended
+ * @param responseCallback callback for sending the response
+ * @param delayedProduceLock lock for the delayed actions
+ * @param requestLocal container for the stateful instances
scoped to this request -- this must correspond to the
+ * thread calling this method
+ * @param verificationGuards the mapping from topic partition to
verification guards if transaction verification is used
+ */
+ def appendForGroup(
Review Comment:
sure -- but what is the best error here... 😅 invalid operation?
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -915,64 +975,239 @@ class ReplicaManager(val config: KafkaConfig,
// probably unblock some follower fetch requests since log end
offset has been updated
delayedFetchPurgatory.checkAndComplete(requestKey)
case LeaderHwChange.NONE =>
- // nothing
- }
+ // nothing
}
+ }
}
+ }
- recordConversionStatsCallback(localProduceResults.map { case (k, v) => k
-> v.info.recordValidationStats })
+ /**
+ * Append messages to offsets topic, and wait for them to be replicated to
other replicas;
+ * the callback function will be triggered either when timeout or the
required acks are satisfied;
+ * if the callback function itself is already synchronized on some object
then pass this object to avoid deadlock.
+ * This method should not return until the write to the local log is
completed because updating offsets requires updating
+ * the in-memory and persisted state under a lock together.
+ *
+ * Noted that all pending delayed check operations are stored in a queue.
All callers to ReplicaManager.appendRecords()
+ * are expected to call ActionQueue.tryCompleteActions for all affected
partitions, without holding any conflicting
+ * locks.
+ *
+ * If appending transactional records, call
maybeStartTransactionVerificationForPartition(s) and call this method in the
callback.
+ * For example, the GroupCoordinator will pass `doCommitTxnOffsets` as the
post-verification callback, and that method eventually call this.
+ *
+ * @param timeout maximum time we will wait to append
before returning
+ * @param requiredAcks number of replicas who must
acknowledge the append before sending the response
+ * @param internalTopicsAllowed boolean indicating whether internal
topics can be appended to
+ * @param origin source of the append request (ie,
client, replication, coordinator)
+ * @param entriesPerPartition the records per partition to be
appended
+ * @param responseCallback callback for sending the response
+ * @param delayedProduceLock lock for the delayed actions
+ * @param requestLocal container for the stateful instances
scoped to this request -- this must correspond to the
+ * thread calling this method
+ * @param verificationGuards the mapping from topic partition to
verification guards if transaction verification is used
+ */
+ def appendForGroup(
Review Comment:
sure -- but what is the best error here... 😅 invalid operation?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]