linyiqun commented on a change in pull request #1456: URL: https://github.com/apache/hadoop-ozone/pull/1456#discussion_r499129873
########## File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizer.java ########## @@ -0,0 +1,303 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.upgrade; + +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; +import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST; +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.PERSIST_UPGRADE_TO_LAYOUT_VERSION_FAILED; +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.REMOVE_UPGRADE_TO_LAYOUT_VERSION_FAILED; +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.UPDATE_LAYOUT_VERSION_FAILED; +import static org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.ALREADY_FINALIZED; +import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE; +import static org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS; +import static org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED; + +/** + * UpgradeFinalizer implementation for the Ozone Manager service. + */ +public class OMUpgradeFinalizer implements UpgradeFinalizer<OzoneManager> { + + private Status status = ALREADY_FINALIZED; + private OMLayoutVersionManagerImpl versionManager; + private String clientID; + + private Queue<String> msgs = new ConcurrentLinkedQueue<>(); + private boolean isDone = false; + + private static final OmUpgradeAction NOOP = a -> {}; + + public OMUpgradeFinalizer(OMLayoutVersionManagerImpl versionManager) { + this.versionManager = versionManager; + if (versionManager.needsFinalization()) { + status = FINALIZATION_REQUIRED; + } + } + + @Override + public StatusAndMessages finalize(String upgradeClientID, OzoneManager om) + throws IOException { + if (!versionManager.needsFinalization()) { + return FINALIZED_MSG; + } + clientID = upgradeClientID; + +// This requires some more investigation on how to do it properly while +// requests are on the fly, and post finalize features one by one. +// Until that is done, monitoring is not really doing anything meaningful +// but this is a tradeoff we can take for the first iteration either if needed, +// as the finalization of the first few features should not take that long. 
+// Follow up JIRA is in HDDS-4286 +// String threadName = "OzoneManager-Upgrade-Finalizer"; +// ExecutorService executor = +// Executors.newSingleThreadExecutor(r -> new Thread(threadName)); +// executor.submit(new Worker(om)); + new Worker(om).call(); + return STARTING_MSG; + } + + @Override + public StatusAndMessages reportStatus( + String upgradeClientID, boolean takeover + ) throws IOException { + if (takeover) { + clientID = upgradeClientID; + } + assertClientId(upgradeClientID); + List<String> returningMsgs = new ArrayList<>(msgs.size()+10); + status = isDone ? FINALIZATION_DONE : FINALIZATION_IN_PROGRESS; + while (msgs.size() > 0) { + returningMsgs.add(msgs.poll()); + } + return new StatusAndMessages(status, returningMsgs); + } + + private void assertClientId(String id) throws OMException { + if (!this.clientID.equals(id)) { + throw new OMException("Unknown client tries to get finalization status.\n" + + "The requestor is not the initiating client of the finalization," + + " if you want to take over, and get unsent status messages, check" + + " -takeover option.", INVALID_REQUEST); + } + } + + + + + private class Worker implements Callable<Void> { + private OzoneManager ozoneManager; + + Worker(OzoneManager om) { + ozoneManager = om; + } + + @Override + public Void call() throws OMException { + try { + emitStartingMsg(); + + for (OMLayoutFeature f : versionManager.unfinalizedFeatures()) { + finalizeFeature(f); + updateLayoutVersionInVersionFile(f); + versionManager.finalized(f); + } + + emitFinishedMsg(); + } finally { + isDone = true; + } + return null; + } + + private void finalizeFeature(OMLayoutFeature feature) + throws OMException { + OmUpgradeAction action = feature.onFinalizeAction().orElse(NOOP); + + if (action == NOOP) { + emitNOOPMsg(feature.name()); + return; + } + + putFinalizationMarkIntoVersionFile(feature); + + emitStartingFinalizationActionMsg(feature.name()); + action.executeAction(ozoneManager); + 
emitFinishFinalizationActionMsg(feature.name()); + + removeFinalizationMarkFromVersionFile(feature); + } + + private void updateLayoutVersionInVersionFile(OMLayoutFeature feature) + throws OMException { + int prevLayoutVersion = currentStoredLayoutVersion(); + + updateStorageLayoutVersion(feature.layoutVersion()); + try { + persistStorage(); + } catch (IOException e) { + updateStorageLayoutVersion(prevLayoutVersion); + logLayoutVersionUpdateFailureAndThrow(e); + } + } + + private void putFinalizationMarkIntoVersionFile(OMLayoutFeature feature) + throws OMException { + try { + emitUpgradeToLayoutVersionPersistingMsg(feature.name()); + + setUpgradeToLayoutVersionInStorage(feature.layoutVersion()); + persistStorage(); Review comment: Comment makes sense to me. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: ozone-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: ozone-issues-h...@hadoop.apache.org