cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1296178891


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -892,48 +915,73 @@ class MigrationWriteOperation implements 
ControllerWriteOperation<Void> {
             }
             @Override
             public ControllerResult<Void> generateRecordsAndResult() {
-                return ControllerResult.atomicOf(batch, null);
+                return ControllerResult.of(batch, null);
             }
 
             public void processBatchEndOffset(long offset) {
                 highestMigrationRecordOffset = new OffsetAndEpoch(offset, 
curClaimEpoch);
             }
         }
         @Override
-        public void beginMigration() {
-            log.info("Starting ZK Migration");
-            // TODO use KIP-868 transaction
+        public CompletableFuture<?> beginMigration() {
+            if 
(featureControl.metadataVersion().isMetadataTransactionSupported()) {
+                log.info("Starting ZK Migration");
+                ControllerWriteEvent<Void> batchEvent = new 
ControllerWriteEvent<>(
+                    "Begin ZK Migration Transaction",
+                    new MigrationWriteOperation(Collections.singletonList(
+                        new ApiMessageAndVersion(
+                            new BeginTransactionRecord().setName("ZK 
Migration"), (short) 0))
+                    ), eventFlags);
+                queue.append(batchEvent);
+                return batchEvent.future;
+            } else {
+                log.warn("Starting ZK Migration without metadata transactions 
enabled. This is not safe since " +
+                    "a controller failover or processing error may lead to 
partially migrated metadata.");
+                return CompletableFuture.completedFuture(null);
+            }
         }
 
         @Override
         public CompletableFuture<?> acceptBatch(List<ApiMessageAndVersion> 
recordBatch) {
-            if (queue.size() > 100) { // TODO configure this
-                CompletableFuture<Void> future = new CompletableFuture<>();
-                future.completeExceptionally(new 
NotControllerException("Cannot accept migration record batch. Controller queue 
is too large"));
-                return future;
-            }
-            ControllerWriteEvent<Void> batchEvent = new 
ControllerWriteEvent<>("ZK Migration Batch",
-                new MigrationWriteOperation(recordBatch), 
EnumSet.of(RUNS_IN_PREMIGRATION));
+            ControllerWriteEvent<Void> batchEvent = new ControllerWriteEvent<>(
+                "ZK Migration Batch",
+                new MigrationWriteOperation(recordBatch), eventFlags);
             queue.append(batchEvent);
             return batchEvent.future;
         }
 
         @Override
         public CompletableFuture<OffsetAndEpoch> completeMigration() {
             log.info("Completing ZK Migration");
-            // TODO use KIP-868 transaction
-            ControllerWriteEvent<Void> event = new 
ControllerWriteEvent<>("Complete ZK Migration",
-                new MigrationWriteOperation(
-                    
Collections.singletonList(ZkMigrationState.MIGRATION.toRecord())),
-                EnumSet.of(RUNS_IN_PREMIGRATION));
+            List<ApiMessageAndVersion> records = new ArrayList<>(2);
+            records.add(ZkMigrationState.MIGRATION.toRecord());
+            if 
(featureControl.metadataVersion().isMetadataTransactionSupported()) {
+                records.add(new ApiMessageAndVersion(new 
EndTransactionRecord(), (short) 0));
+            }
+            ControllerWriteEvent<Void> event = new ControllerWriteEvent<>(
+                "Complete ZK Migration",
+                new MigrationWriteOperation(records),
+                eventFlags);
             queue.append(event);
             return event.future.thenApply(__ -> highestMigrationRecordOffset);
         }
 
         @Override
-        public void abortMigration() {
+        public CompletableFuture<?> abortMigration() {
             fatalFaultHandler.handleFault("Aborting the ZK migration");

Review Comment:
   If you abort the process here by calling `fatalFaultHandler.handleFault`, 
the code below that adds an AbortTransaction will never be executed.
   
   One approach would be to just abort the process, and rely on the new 
controller that comes up to put an initial AbortTransaction into the log to 
clear the current pending transaction. We have to be able to handle that case 
anyway, since crashing during initial ZK state load is always possible.
   
   If you take this approach, you can delete the code below that adds an 
AbortTransaction. Maybe add a comment here saying that the new controller 
will abort the transaction.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to