mumrah commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1296220658
########## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ########## @@ -892,48 +915,73 @@ class MigrationWriteOperation implements ControllerWriteOperation<Void> { } @Override public ControllerResult<Void> generateRecordsAndResult() { - return ControllerResult.atomicOf(batch, null); + return ControllerResult.of(batch, null); } public void processBatchEndOffset(long offset) { highestMigrationRecordOffset = new OffsetAndEpoch(offset, curClaimEpoch); } } @Override - public void beginMigration() { - log.info("Starting ZK Migration"); - // TODO use KIP-868 transaction + public CompletableFuture<?> beginMigration() { + if (featureControl.metadataVersion().isMetadataTransactionSupported()) { + log.info("Starting ZK Migration"); + ControllerWriteEvent<Void> batchEvent = new ControllerWriteEvent<>( + "Begin ZK Migration Transaction", + new MigrationWriteOperation(Collections.singletonList( + new ApiMessageAndVersion( + new BeginTransactionRecord().setName("ZK Migration"), (short) 0)) + ), eventFlags); + queue.append(batchEvent); + return batchEvent.future; + } else { + log.warn("Starting ZK Migration without metadata transactions enabled. This is not safe since " + + "a controller failover or processing error may lead to partially migrated metadata."); + return CompletableFuture.completedFuture(null); + } } @Override public CompletableFuture<?> acceptBatch(List<ApiMessageAndVersion> recordBatch) { - if (queue.size() > 100) { // TODO configure this - CompletableFuture<Void> future = new CompletableFuture<>(); - future.completeExceptionally(new NotControllerException("Cannot accept migration record batch. Controller queue is too large")); - return future; - } - ControllerWriteEvent<Void> batchEvent = new ControllerWriteEvent<>("ZK Migration Batch", - new MigrationWriteOperation(recordBatch), EnumSet.of(RUNS_IN_PREMIGRATION)); + ControllerWriteEvent<Void> batchEvent = new ControllerWriteEvent<>( + "ZK Migration Batch", + new MigrationWriteOperation(recordBatch), eventFlags); queue.append(batchEvent); return batchEvent.future; } @Override public CompletableFuture<OffsetAndEpoch> completeMigration() { log.info("Completing ZK Migration"); - // TODO use KIP-868 transaction - ControllerWriteEvent<Void> event = new ControllerWriteEvent<>("Complete ZK Migration", - new MigrationWriteOperation( - Collections.singletonList(ZkMigrationState.MIGRATION.toRecord())), - EnumSet.of(RUNS_IN_PREMIGRATION)); + List<ApiMessageAndVersion> records = new ArrayList<>(2); + records.add(ZkMigrationState.MIGRATION.toRecord()); + if (featureControl.metadataVersion().isMetadataTransactionSupported()) { + records.add(new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0)); + } + ControllerWriteEvent<Void> event = new ControllerWriteEvent<>( + "Complete ZK Migration", + new MigrationWriteOperation(records), + eventFlags); queue.append(event); return event.future.thenApply(__ -> highestMigrationRecordOffset); } @Override - public void abortMigration() { + public CompletableFuture<?> abortMigration() { fatalFaultHandler.handleFault("Aborting the ZK migration"); Review Comment: Hm, yea good call out. I think killing the process is reasonable here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org