[
https://issues.apache.org/jira/browse/KAFKA-18877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17935801#comment-17935801
]
Chia-Ping Tsai commented on KAFKA-18877:
----------------------------------------
I do love [~davidarthur] suggestion. Maybe we can add the unsafe interface into
ControllerWriteOperation#generateRecordsAndResult. for example:
{code:java}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 56e3848ed3..1055a350a8 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -742,7 +742,7 @@ public final class QuorumController implements Controller {
*
* @return A result containing a list of records, and the RPC result.
*/
- ControllerResult<T> generateRecordsAndResult() throws Exception;
+ ControllerResult<T> generateRecordsAndResult(Unsafer unsafer) throws
Exception;
/**
* Once we've passed the records to the Raft layer, we will invoke
this function
@@ -753,6 +753,10 @@ public final class QuorumController implements Controller {
}
}
+ interface Unsafer {
+ FeatureControlManager featureControlManager();
+ }
+
/**
* A controller event that modifies the controller state.
*/
@@ -764,6 +768,7 @@ public final class QuorumController implements Controller {
private final EnumSet<ControllerOperationFlag> flags;
private OptionalLong startProcessingTimeNs = OptionalLong.empty();
private ControllerResultAndOffset<T> resultAndOffset;
+ private final Unsafer unsafer;
ControllerWriteEvent(
String name,
@@ -775,6 +780,12 @@ public final class QuorumController implements Controller {
this.op = op;
this.flags = flags;
this.resultAndOffset = null;
+ this.unsafer = new Unsafer() {
+ @Override
+ public FeatureControlManager featureControlManager() {
+ return featureControl();
+ }
+ };
}
CompletableFuture<T> future() {
@@ -792,7 +803,7 @@ public final class QuorumController implements Controller {
if (!isActiveController(controllerEpoch)) {
throw
ControllerExceptions.newWrongControllerException(latestController());
}
- ControllerResult<T> result = op.generateRecordsAndResult();
+ ControllerResult<T> result = op.generateRecordsAndResult(unsafer);
if (result.records().isEmpty()) {
op.processBatchEndOffset(offsetControl.nextWriteOffset() - 1);
// If the operation did not return any records, then it was
actually just
@@ -1993,9 +2004,9 @@ public final class QuorumController implements Controller
{
BrokerRegistrationRequestData request
) {
return appendWriteEvent("registerBroker", context.deadlineNs(),
- () -> {
+ unsafer -> {
// Read and write data in the controller event handling thread
to avoid stale information.
- Map<String, Short> controllerFeatures = new
HashMap<>(featureControl.finalizedFeatures(Long.MAX_VALUE).featureMap());
+ Map<String, Short> controllerFeatures = new
HashMap<>(unsafer.featureControlManager().finalizedFeatures(Long.MAX_VALUE).featureMap());
// Populate finalized features map with latest known kraft
version for validation.
controllerFeatures.put(KRaftVersion.FEATURE_NAME,
raftClient.kraftVersion().featureLevel());
return clusterControl.
{code}
> an mechanism to find cases where we accessed variables from the wrong thread
> ----------------------------------------------------------------------------
>
> Key: KAFKA-18877
> URL: https://issues.apache.org/jira/browse/KAFKA-18877
> Project: Kafka
> Issue Type: Improvement
> Reporter: Chia-Ping Tsai
> Assignee: TengYao Chi
> Priority: Major
>
> from [https://github.com/apache/kafka/pull/18997#pullrequestreview-2645589959]
> There are some _non-thread safe_ classes storing the important information,
> and so they are expected to be access by specific thread. Otherwise, it may
> cause unexpected behavior
--
This message was sent by Atlassian Jira
(v8.20.10#820010)