mumrah commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1150733624
##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -869,19 +906,30 @@ public CompletableFuture<?>
acceptBatch(List<ApiMessageAndVersion> recordBatch)
return future;
}
ControllerWriteEvent<Void> batchEvent = new
ControllerWriteEvent<>("ZK Migration Batch",
- new MigrationWriteOperation(recordBatch));
+ new MigrationWriteOperation(recordBatch),
EnumSet.of(RUNS_IN_PREMIGRATION));
queue.append(batchEvent);
return batchEvent.future;
}
@Override
- public OffsetAndEpoch completeMigration() {
- // TODO write migration record, use KIP-868 transaction
- return highestMigrationRecordOffset;
+ public CompletableFuture<OffsetAndEpoch> completeMigration() {
+ log.info("Completing ZK Migration");
+ // TODO use KIP-868 transaction
+ ControllerWriteEvent<Void> event = new
ControllerWriteEvent<>("Complete ZK Migration",
+ new MigrationWriteOperation(
+ Collections.singletonList(
+ new ApiMessageAndVersion(
+ new
ZkMigrationStateRecord().setZkMigrationState(ZkMigrationState.MIGRATION.value()),
+ ZkMigrationStateRecord.LOWEST_SUPPORTED_VERSION)
+ )),
+ EnumSet.of(RUNS_IN_PREMIGRATION));
+ queue.append(event);
+ return event.future.thenApply(__ -> highestMigrationRecordOffset);
}
@Override
public void abortMigration() {
+ log.error("Aborting ZK Migration");
Review Comment:
At some point, we had discussed implementing crude transaction support here
where we would rewind the log to the prior to the migration records. Since we
expect metadata transactions to land in 3.5, I opted to omit the temporary
solution.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]