errose28 commented on code in PR #10236:
URL: https://github.com/apache/ozone/pull/10236#discussion_r3222325913
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshot.java:
##########
@@ -234,6 +234,9 @@ private void init() throws Exception {
// Enable filesystem snapshot feature for the test regardless of the
default
conf.setBoolean(OMConfigKeys.OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY, true);
conf.setInt(OMStorage.TESTING_INIT_APPARENT_VERSION_KEY,
OMLayoutFeature.BUCKET_LAYOUT_SUPPORT.layoutVersion());
+ conf.setInt(SCMStorageConfig.TESTING_INIT_LAYOUT_VERSION_KEY,
+ HDDSLayoutFeature.HADOOP_PRC_PORTS_IN_DATANODEDETAILS.layoutVersion());
Review Comment:
Why do we need to change the HDDS version in this and other OM tests?
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizeService.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.upgrade;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ScmClient;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.ratis.protocol.ClientId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A background service that periodically checks whether SCM has completed
finalization of an upgrade and, if so,
+ * finalizes the OM upgrade.
+ */
+public class OMUpgradeFinalizeService extends BackgroundService {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(OMUpgradeFinalizeService.class);
+
+ private static final int THREAD_POOL_SIZE = 1;
+ private static final TimeUnit INTERVAL_UNIT = TimeUnit.MILLISECONDS;
+ private static final long TIMEOUT = 60000;
+ private static final AtomicLong RUN_COUNT = new AtomicLong(0);
+
+ private final OzoneManager ozoneManager;
+ private final OMVersionManager versionManager;
+ private final ScmClient scmClient;
+ private final AtomicBoolean stopInitiated = new AtomicBoolean(false);
+
+ /**
+ * Creates an {@code OMUpgradeFinalizeService} with a custom check interval.
+ * Primarily intended for testing.
Review Comment:
This is the production constructor too, so this line should be removed.
```suggestion
```
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMUpgradeFinalization.java:
##########
@@ -93,12 +99,10 @@ void testOMUpgradeFinalizationWithOneOMDown() throws
Exception {
long prepareIndex = omClient.prepareOzoneManager(120L, 5L);
assertClusterPrepared(prepareIndex, runningOms);
AuditLogTestUtils.verifyAuditLog(OMAction.UPGRADE_PREPARE,
AuditEventStatus.SUCCESS);
-
omClient.cancelOzoneManagerPrepare();
AuditLogTestUtils.verifyAuditLog(OMAction.UPGRADE_CANCEL,
AuditEventStatus.SUCCESS);
- StatusAndMessages response =
- omClient.finalizeUpgrade("finalize-test");
- System.out.println("Finalization Messages : " + response.msgs());
+ // Send the finalize command to SCM which triggers the OM finalize
when SCM reports it is complete.
+ cluster.getStorageContainerLocationClient().finalizeUpgrade();
Review Comment:
The audit log check on the next line should come after we have waited for OM
to finish finalizing. Also, I think we want to switch this to the system audit
log instead of the user audit log, since finalization is triggered by a
background process, not by a user call to the OM.
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSyncUpgrade.java:
##########
@@ -117,7 +112,10 @@ public void init() throws Exception {
conf.setTimeDuration(OZONE_OM_LEASE_HARD_LIMIT,
EXPIRE_THRESHOLD_MS, TimeUnit.MILLISECONDS);
conf.set(OzoneConfigKeys.OZONE_OM_LEASE_SOFT_LIMIT, "0s");
- conf.setInt(OMStorage.TESTING_INIT_APPARENT_VERSION_KEY,
OMLayoutFeature.MULTITENANCY_SCHEMA.layoutVersion());
+ conf.setInt(OMStorage.TESTING_INIT_APPARENT_VERSION_KEY,
OzoneManagerVersion.ATOMIC_REWRITE_KEY.serialize());
+ conf.setInt(SCMStorageConfig.TESTING_INIT_LAYOUT_VERSION_KEY,
+ HDDSLayoutFeature.HADOOP_PRC_PORTS_IN_DATANODEDETAILS.layoutVersion());
+ conf.set(OMConfigKeys.OZONE_OM_UPGRADE_FINALIZATION_CHECK_INTERVAL,
"10ms");
Review Comment:
Why were these versions changed? The OM version used here predates the
unified versioning framework, but it is not an `OMLayoutFeature`, so it is not
meant to be written to the version file.
##########
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMUpgradeFinalizeService.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.upgrade;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.protobuf.ServiceException;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.ozone.OzoneManagerVersion;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ScmClient;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.ratis.protocol.ClientId;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for {@link OMUpgradeFinalizeService}.
+ * Uses {@link
org.apache.hadoop.hdds.utils.BackgroundService#runPeriodicalTaskNow()} to
execute
+ * tasks synchronously on the test thread, avoiding timing-dependent polling.
+ */
+public class TestOMUpgradeFinalizeService {
+
+ // A long interval so the scheduler never fires automatically during tests;
+ // we drive execution manually via runPeriodicalTaskNow().
+ private static final long INTERVAL_MS = 60_000;
+
+ private OzoneManager ozoneManager;
+ private OMVersionManager versionManager;
+ private ScmClient scmClient;
+ private StorageContainerLocationProtocol containerClient;
+ private OzoneManagerRatisServer omRatisServer;
+ private OMUpgradeFinalizeService service;
+
+ @BeforeEach
+ void setUp() throws Exception {
Review Comment:
```suggestion
void setUp() {
```
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizeService.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.upgrade;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ScmClient;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.ratis.protocol.ClientId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A background service that periodically checks whether SCM has completed
finalization of an upgrade and, if so,
+ * finalizes the OM upgrade.
+ */
+public class OMUpgradeFinalizeService extends BackgroundService {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(OMUpgradeFinalizeService.class);
+
+ private static final int THREAD_POOL_SIZE = 1;
+ private static final TimeUnit INTERVAL_UNIT = TimeUnit.MILLISECONDS;
+ private static final long TIMEOUT = 60000;
+ private static final AtomicLong RUN_COUNT = new AtomicLong(0);
+
+ private final OzoneManager ozoneManager;
+ private final OMVersionManager versionManager;
+ private final ScmClient scmClient;
+ private final AtomicBoolean stopInitiated = new AtomicBoolean(false);
+
+ /**
+ * Creates an {@code OMUpgradeFinalizeService} with a custom check interval.
+ * Primarily intended for testing.
+ *
+ * @param ozoneManager the OzoneManager instance
+ * @param versionManager the {@link OMVersionManager} to query the
finalization status
+ * @param scmClient the scmClient instance used to query SCM
+ * @param intervalMs the duration to wait between checks
+ */
+ public OMUpgradeFinalizeService(OzoneManager ozoneManager, OMVersionManager
versionManager, ScmClient scmClient,
+ long intervalMs) {
+ super("OMUpgradeFinalizeService", intervalMs, INTERVAL_UNIT,
THREAD_POOL_SIZE, TIMEOUT,
+ ozoneManager.getThreadNamePrefix());
+ this.ozoneManager = ozoneManager;
+ this.versionManager = versionManager;
+ this.scmClient = scmClient;
+ }
+
+ @Override
+ public BackgroundTaskQueue getTasks() {
+ BackgroundTaskQueue queue = new BackgroundTaskQueue();
+ if (!versionManager.needsFinalization()) {
+ // Finalization is done (or was never needed), so this service can now
shutdown. To avoid deadlocking on the
+ // executor.awaitTermination by calling shutdown directly, spawn a
thread to perform the shutdown which will
+ // block until this task / thread completes in the executor.
+ if (stopInitiated.compareAndSet(false, true)) {
+ LOG.info("OMUpgradeFinalizeService: finalization is no longer needed,
shutting down.");
+ Thread stopper = new Thread(this::shutdown,
"OMUpgradeFinalizeService-stopper");
+ stopper.setDaemon(true);
+ stopper.start();
+ }
+ return queue; // empty — PeriodicalTask.run() will return without
scheduling work
+ }
+ if (ozoneManager.isLeaderReady()) {
+ queue.add(new UpgradeStatusCheckTask());
+ }
+ return queue;
+ }
+
+ /**
+ * Periodic task that checks upgrade finalization status and logs the result.
+ */
+ private class UpgradeStatusCheckTask implements BackgroundTask {
+
+ @Override
+ public BackgroundTaskResult call() {
+ if (!ozoneManager.isLeaderReady()) {
+ LOG.debug("OMUpgradeFinalizeService: skipping check — not the
leader.");
+ return BackgroundTaskResult.EmptyTaskResult.newResult();
+ }
+ if (versionManager.needsFinalization()) {
+ try {
+ HddsProtos.UpgradeStatus upgradeStatus =
scmClient.getContainerClient().queryUpgradeStatus();
+ if (upgradeStatus.getShouldFinalize()) {
+ LOG.info("The SCM Upgrade has been finalized. OM will now
finalize");
+
+ OzoneManagerProtocolProtos.OMRequest omRequest =
OzoneManagerProtocolProtos.OMRequest.newBuilder()
+ .setCmdType(OzoneManagerProtocolProtos.Type.FinalizeUpgrade)
+ .setClientId("todo")
Review Comment:
We should decide whether we want to make new protos for the finalize request.
Since we are removing the ability for the client to call it, and we can still
make breaking changes without versioning before ZDU is released, I'm inclined
to think we should. It doesn't have to be in this PR, though.
```suggestion
.setClientId(ozoneManager.getOMNodeId())
```
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSyncUpgrade.java:
##########
@@ -217,20 +215,8 @@ private void finalizeOMUpgrade() throws Exception {
// Trigger OM upgrade finalization. Ref: FinalizeUpgradeSubCommand#call
final OzoneManagerProtocol omClient = client.getObjectStore()
.getClientProxy().getOzoneManagerClient();
- final String upgradeClientID = "Test-Upgrade-Client-" + UUID.randomUUID();
- UpgradeFinalization.StatusAndMessages finalizationResponse =
- omClient.finalizeUpgrade(upgradeClientID);
-
- // The status should transition as soon as the client call above returns
- assertTrue(isStarting(finalizationResponse.status()));
- // Wait for the finalization to be marked as done.
- // 10s timeout should be plenty.
- await(POLL_MAX_WAIT_MILLIS, POLL_INTERVAL_MILLIS, () -> {
- final UpgradeFinalization.StatusAndMessages progress =
- omClient.queryUpgradeFinalizationProgress(
- upgradeClientID, false, false);
- return isDone(progress.status());
- });
+ cluster.getStorageContainerLocationClient().finalizeUpgrade();
+ OMUpgradeTestUtils.waitForFinalization(omClient);
Review Comment:
I'm pretty sure these will always be called together and only in an
integration test scenario. Should we combine them into a single method in
`MiniOzoneCluster` to finalize and wait, similar to `waitForClusterToBeReady`?
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizeService.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.upgrade;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ScmClient;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.ratis.protocol.ClientId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A background service that periodically checks whether SCM has completed
finalization of an upgrade and, if so,
+ * finalizes the OM upgrade.
+ */
+public class OMUpgradeFinalizeService extends BackgroundService {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(OMUpgradeFinalizeService.class);
+
+ private static final int THREAD_POOL_SIZE = 1;
+ private static final TimeUnit INTERVAL_UNIT = TimeUnit.MILLISECONDS;
+ private static final long TIMEOUT = 60000;
+ private static final AtomicLong RUN_COUNT = new AtomicLong(0);
+
+ private final OzoneManager ozoneManager;
+ private final OMVersionManager versionManager;
+ private final ScmClient scmClient;
+ private final AtomicBoolean stopInitiated = new AtomicBoolean(false);
+
+ /**
+ * Creates an {@code OMUpgradeFinalizeService} with a custom check interval.
+ * Primarily intended for testing.
+ *
+ * @param ozoneManager the OzoneManager instance
+ * @param versionManager the {@link OMVersionManager} to query the
finalization status
+ * @param scmClient the scmClient instance used to query SCM
+ * @param intervalMs the duration to wait between checks
+ */
+ public OMUpgradeFinalizeService(OzoneManager ozoneManager, OMVersionManager
versionManager, ScmClient scmClient,
+ long intervalMs) {
+ super("OMUpgradeFinalizeService", intervalMs, INTERVAL_UNIT,
THREAD_POOL_SIZE, TIMEOUT,
+ ozoneManager.getThreadNamePrefix());
+ this.ozoneManager = ozoneManager;
+ this.versionManager = versionManager;
+ this.scmClient = scmClient;
+ }
+
+ @Override
+ public BackgroundTaskQueue getTasks() {
+ BackgroundTaskQueue queue = new BackgroundTaskQueue();
+ if (!versionManager.needsFinalization()) {
+ // Finalization is done (or was never needed), so this service can now
shutdown. To avoid deadlocking on the
+ // executor.awaitTermination by calling shutdown directly, spawn a
thread to perform the shutdown which will
+ // block until this task / thread completes in the executor.
+ if (stopInitiated.compareAndSet(false, true)) {
+ LOG.info("OMUpgradeFinalizeService: finalization is no longer needed,
shutting down.");
+ Thread stopper = new Thread(this::shutdown,
"OMUpgradeFinalizeService-stopper");
+ stopper.setDaemon(true);
+ stopper.start();
+ }
+ return queue; // empty — PeriodicalTask.run() will return without
scheduling work
+ }
+ if (ozoneManager.isLeaderReady()) {
+ queue.add(new UpgradeStatusCheckTask());
+ }
+ return queue;
+ }
+
+ /**
+ * Periodic task that checks upgrade finalization status and logs the result.
+ */
+ private class UpgradeStatusCheckTask implements BackgroundTask {
+
+ @Override
+ public BackgroundTaskResult call() {
+ if (!ozoneManager.isLeaderReady()) {
+ LOG.debug("OMUpgradeFinalizeService: skipping check — not the
leader.");
+ return BackgroundTaskResult.EmptyTaskResult.newResult();
+ }
+ if (versionManager.needsFinalization()) {
+ try {
+ HddsProtos.UpgradeStatus upgradeStatus =
scmClient.getContainerClient().queryUpgradeStatus();
+ if (upgradeStatus.getShouldFinalize()) {
+ LOG.info("The SCM Upgrade has been finalized. OM will now
finalize");
+
+ OzoneManagerProtocolProtos.OMRequest omRequest =
OzoneManagerProtocolProtos.OMRequest.newBuilder()
+ .setCmdType(OzoneManagerProtocolProtos.Type.FinalizeUpgrade)
+ .setClientId("todo")
+ .build();
+ OMClientRequest omClientRequest =
OzoneManagerRatisUtils.createClientRequest(omRequest, ozoneManager);
+ omRequest = omClientRequest.preExecute(ozoneManager);
Review Comment:
These will be called automatically when the request is submitted on the next
line.
```suggestion
```
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizeService.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.upgrade;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ScmClient;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.ratis.protocol.ClientId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A background service that periodically checks whether SCM has completed
finalization of an upgrade and, if so,
+ * finalizes the OM upgrade.
+ */
+public class OMUpgradeFinalizeService extends BackgroundService {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(OMUpgradeFinalizeService.class);
+
+ private static final int THREAD_POOL_SIZE = 1;
+ private static final TimeUnit INTERVAL_UNIT = TimeUnit.MILLISECONDS;
+ private static final long TIMEOUT = 60000;
+ private static final AtomicLong RUN_COUNT = new AtomicLong(0);
+
+ private final OzoneManager ozoneManager;
+ private final OMVersionManager versionManager;
+ private final ScmClient scmClient;
+ private final AtomicBoolean stopInitiated = new AtomicBoolean(false);
+
+ /**
+ * Creates an {@code OMUpgradeFinalizeService} with a custom check interval.
+ * Primarily intended for testing.
+ *
+ * @param ozoneManager the OzoneManager instance
+ * @param versionManager the {@link OMVersionManager} to query the
finalization status
+ * @param scmClient the scmClient instance used to query SCM
+ * @param intervalMs the duration to wait between checks
+ */
+ public OMUpgradeFinalizeService(OzoneManager ozoneManager, OMVersionManager
versionManager, ScmClient scmClient,
+ long intervalMs) {
+ super("OMUpgradeFinalizeService", intervalMs, INTERVAL_UNIT,
THREAD_POOL_SIZE, TIMEOUT,
+ ozoneManager.getThreadNamePrefix());
+ this.ozoneManager = ozoneManager;
+ this.versionManager = versionManager;
+ this.scmClient = scmClient;
+ }
+
+ @Override
+ public BackgroundTaskQueue getTasks() {
+ BackgroundTaskQueue queue = new BackgroundTaskQueue();
+ if (!versionManager.needsFinalization()) {
+ // Finalization is done (or was never needed), so this service can now
shutdown. To avoid deadlocking on the
+ // executor.awaitTermination by calling shutdown directly, spawn a
thread to perform the shutdown which will
+ // block until this task / thread completes in the executor.
+ if (stopInitiated.compareAndSet(false, true)) {
+ LOG.info("OMUpgradeFinalizeService: finalization is no longer needed,
shutting down.");
+ Thread stopper = new Thread(this::shutdown,
"OMUpgradeFinalizeService-stopper");
+ stopper.setDaemon(true);
+ stopper.start();
+ }
+ return queue; // empty — PeriodicalTask.run() will return without
scheduling work
+ }
+ if (ozoneManager.isLeaderReady()) {
+ queue.add(new UpgradeStatusCheckTask());
+ }
+ return queue;
+ }
+
+ /**
+ * Periodic task that checks upgrade finalization status and logs the result.
+ */
+ private class UpgradeStatusCheckTask implements BackgroundTask {
+
+ @Override
+ public BackgroundTaskResult call() {
+ if (!ozoneManager.isLeaderReady()) {
+ LOG.debug("OMUpgradeFinalizeService: skipping check — not the
leader.");
+ return BackgroundTaskResult.EmptyTaskResult.newResult();
+ }
+ if (versionManager.needsFinalization()) {
+ try {
+ HddsProtos.UpgradeStatus upgradeStatus =
scmClient.getContainerClient().queryUpgradeStatus();
+ if (upgradeStatus.getShouldFinalize()) {
+ LOG.info("The SCM Upgrade has been finalized. OM will now
finalize");
+
+ OzoneManagerProtocolProtos.OMRequest omRequest =
OzoneManagerProtocolProtos.OMRequest.newBuilder()
+ .setCmdType(OzoneManagerProtocolProtos.Type.FinalizeUpgrade)
+ .setClientId("todo")
+ .build();
+ OMClientRequest omClientRequest =
OzoneManagerRatisUtils.createClientRequest(omRequest, ozoneManager);
+ omRequest = omClientRequest.preExecute(ozoneManager);
+ OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest,
ClientId.randomId(),
Review Comment:
We should probably capture the response and log an error if the request fails.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]