cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295071752


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -892,48 +909,62 @@ class MigrationWriteOperation implements 
ControllerWriteOperation<Void> {
             }
             @Override
             public ControllerResult<Void> generateRecordsAndResult() {
-                return ControllerResult.atomicOf(batch, null);
+                return ControllerResult.of(batch, null);
             }
 
             public void processBatchEndOffset(long offset) {
                 highestMigrationRecordOffset = new OffsetAndEpoch(offset, 
curClaimEpoch);
             }
         }
         @Override
-        public void beginMigration() {
+        public CompletableFuture<?> beginMigration() {
             log.info("Starting ZK Migration");
-            // TODO use KIP-868 transaction
+            ControllerWriteEvent<Void> batchEvent = new ControllerWriteEvent<>(
+                "Begin ZK Migration Transaction",
+                new MigrationWriteOperation(Collections.singletonList(
+                    new ApiMessageAndVersion(
+                        new BeginTransactionRecord().setName("ZK Migration"), 
(short) 0))
+                ), eventFlags);
+            queue.append(batchEvent);
+            return batchEvent.future;
         }
 
         @Override
         public CompletableFuture<?> acceptBatch(List<ApiMessageAndVersion> 
recordBatch) {
-            if (queue.size() > 100) { // TODO configure this
-                CompletableFuture<Void> future = new CompletableFuture<>();
-                future.completeExceptionally(new 
NotControllerException("Cannot accept migration record batch. Controller queue 
is too large"));
-                return future;
-            }
-            ControllerWriteEvent<Void> batchEvent = new 
ControllerWriteEvent<>("ZK Migration Batch",
-                new MigrationWriteOperation(recordBatch), 
EnumSet.of(RUNS_IN_PREMIGRATION));
+            ControllerWriteEvent<Void> batchEvent = new ControllerWriteEvent<>(
+                "ZK Migration Batch",
+                new MigrationWriteOperation(recordBatch), eventFlags);
             queue.append(batchEvent);
             return batchEvent.future;
         }
 
         @Override
         public CompletableFuture<OffsetAndEpoch> completeMigration() {
             log.info("Completing ZK Migration");
-            // TODO use KIP-868 transaction
-            ControllerWriteEvent<Void> event = new 
ControllerWriteEvent<>("Complete ZK Migration",
+            ControllerWriteEvent<Void> event = new ControllerWriteEvent<>(
+                "Complete ZK Migration",
                 new MigrationWriteOperation(
-                    
Collections.singletonList(ZkMigrationState.MIGRATION.toRecord())),
-                EnumSet.of(RUNS_IN_PREMIGRATION));
+                    Arrays.asList(

Review Comment:
   This is another case where we have to check the MV I guess



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to