mumrah commented on code in PR #14062: URL: https://github.com/apache/kafka/pull/14062#discussion_r1272701730
########## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ########## @@ -756,4 +745,109 @@ static KRaftMigrationOperationConsumer countingOperationConsumer( operationConsumer.accept(logMsg, operation); }; } + + public static class Builder { + private Integer nodeId; + private ZkRecordConsumer zkRecordConsumer; + private MigrationClient zkMigrationClient; + private LegacyPropagator propagator; + private Consumer<MetadataPublisher> initialZkLoadHandler; + private FaultHandler faultHandler; + private QuorumFeatures quorumFeatures; + private KafkaConfigSchema configSchema; + private QuorumControllerMetrics controllerMetrics; + private Time time; + + public Builder setNodeId(int nodeId) { + this.nodeId = nodeId; + return this; + } + + public Builder setZkRecordConsumer(ZkRecordConsumer zkRecordConsumer) { + this.zkRecordConsumer = zkRecordConsumer; + return this; + } + + public Builder setZkMigrationClient(MigrationClient zkMigrationClient) { + this.zkMigrationClient = zkMigrationClient; + return this; + } + + public Builder setPropagator(LegacyPropagator propagator) { + this.propagator = propagator; + return this; + } + + public Builder setInitialZkLoadHandler(Consumer<MetadataPublisher> initialZkLoadHandler) { + this.initialZkLoadHandler = initialZkLoadHandler; + return this; + } + + public Builder setFaultHandler(FaultHandler faultHandler) { + this.faultHandler = faultHandler; + return this; + } + + public Builder setQuorumFeatures(QuorumFeatures quorumFeatures) { + this.quorumFeatures = quorumFeatures; + return this; + } + + public Builder setConfigSchema(KafkaConfigSchema configSchema) { + this.configSchema = configSchema; + return this; + } + + public Builder setControllerMetrics(QuorumControllerMetrics controllerMetrics) { + this.controllerMetrics = controllerMetrics; + return this; + } + + public Builder setTime(Time time) { + this.time = time; + return this; + } + + public KRaftMigrationDriver build() { + if (nodeId == 
null) { + throw new IllegalStateException("You must specify the node ID of this controller."); + } + if (zkRecordConsumer == null) { + throw new IllegalStateException("You must specify the ZkRecordConsumer."); + } + if (zkMigrationClient == null) { + throw new IllegalStateException("You must specify the MigrationClient."); + } + if (propagator == null) { + throw new IllegalStateException("You must specify the MetadataPropagator."); + } + if (initialZkLoadHandler == null) { + throw new IllegalStateException("You must specify the initial ZK load callback."); + } + if (faultHandler == null) { + throw new IllegalStateException("You must specify the FaultHandler."); + } + if (configSchema == null) { + throw new IllegalStateException("You must specify the KafkaConfigSchema."); + } + if (controllerMetrics == null) { + throw new IllegalStateException("You must specify the QuorumControllerMetrics."); + } + if (time == null) { Review Comment: Hm, yeah, I figured this one was fine to have a default for, but actually we only have two usages of the builder, so I'll just make it required. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org