This is an automated email from the ASF dual-hosted git repository. bharat pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new e22a324 HDDS-2007. Make ozone fs shell command work with OM HA service ids (#1360) e22a324 is described below commit e22a324f87a3b24868c4dd3975c94dcfb840fabc Author: Siyao Meng <50227127+smen...@users.noreply.github.com> AuthorDate: Fri Sep 13 11:22:00 2019 -0700 HDDS-2007. Make ozone fs shell command work with OM HA service ids (#1360) --- .../hadoop/ozone/client/OzoneClientFactory.java | 69 +++- .../apache/hadoop/ozone/client/rpc/RpcClient.java | 7 +- .../main/java/org/apache/hadoop/ozone/OmUtils.java | 24 ++ .../ozone/om/ha/OMFailoverProxyProvider.java | 26 +- ...OzoneManagerProtocolClientSideTranslatorPB.java | 6 +- .../src/main/compose/ozone-om-ha/docker-config | 11 +- .../org/apache/hadoop/ozone/MiniOzoneCluster.java | 7 + .../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 5 + .../hadoop/ozone/MiniOzoneHAClusterImpl.java | 19 +- .../org/apache/hadoop/ozone/RatisTestHelper.java | 2 +- .../client/rpc/TestOzoneRpcClientAbstract.java | 2 +- .../hadoop/ozone/om/TestOMRatisSnapshots.java | 5 +- .../apache/hadoop/ozone/om/TestOzoneManagerHA.java | 9 +- .../snapshot/TestOzoneManagerSnapshotProvider.java | 7 +- .../apache/hadoop/ozone/ozShell/TestS3Shell.java | 2 +- .../hadoop/ozone/recon/ReconControllerModule.java | 2 +- .../fs/ozone/BasicOzoneClientAdapterImpl.java | 31 +- .../hadoop/fs/ozone/BasicOzoneFileSystem.java | 10 +- .../apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java | 348 +++++++++++++++++++++ 19 files changed, 553 insertions(+), 39 deletions(-) diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientFactory.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientFactory.java index 713a6b2..caf989e 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientFactory.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientFactory.java @@ -119,6 +119,35 @@ public final class OzoneClientFactory { * @param omRpcPort * RPC port of OzoneManager. * + * @param omServiceId + * Service ID of OzoneManager HA cluster. + * + * @param config + * Configuration to be used for OzoneClient creation + * + * @return OzoneClient + * + * @throws IOException + */ + public static OzoneClient getRpcClient(String omHost, Integer omRpcPort, + String omServiceId, Configuration config) throws IOException { + Preconditions.checkNotNull(omHost); + Preconditions.checkNotNull(omRpcPort); + Preconditions.checkNotNull(omServiceId); + Preconditions.checkNotNull(config); + config.set(OZONE_OM_ADDRESS_KEY, omHost + ":" + omRpcPort); + return getRpcClient(omServiceId, config); + } + + /** + * Returns an OzoneClient which will use RPC protocol. + * + * @param omHost + * hostname of OzoneManager to connect. + * + * @param omRpcPort + * RPC port of OzoneManager. + * * @param config * Configuration to be used for OzoneClient creation * @@ -139,6 +168,28 @@ public final class OzoneClientFactory { /** * Returns an OzoneClient which will use RPC protocol. * + * @param omServiceId + * Service ID of OzoneManager HA cluster. + * + * @param config + * Configuration to be used for OzoneClient creation + * + * @return OzoneClient + * + * @throws IOException + */ + public static OzoneClient getRpcClient(String omServiceId, + Configuration config) throws IOException { + Preconditions.checkNotNull(omServiceId); + Preconditions.checkNotNull(config); + // Won't set OZONE_OM_ADDRESS_KEY here since service id is passed directly, + // leaving OZONE_OM_ADDRESS_KEY value as is. + return getClient(getClientProtocol(config, omServiceId), config); + } + + /** + * Returns an OzoneClient which will use RPC protocol. + * * @param config * used for OzoneClient creation * @@ -185,8 +236,24 @@ public final class OzoneClientFactory { */ private static ClientProtocol getClientProtocol(Configuration config) throws IOException { + return getClientProtocol(config, null); + } + + /** + * Returns an instance of Protocol class. + * + * + * @param config + * Configuration used to initialize ClientProtocol. + * + * @return ClientProtocol + * + * @throws IOException + */ + private static ClientProtocol getClientProtocol(Configuration config, + String omServiceId) throws IOException { try { - return new RpcClient(config); + return new RpcClient(config, omServiceId); } catch (Exception e) { final String message = "Couldn't create RpcClient protocol"; LOG.error(message + " exception: ", e); diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 17409ab..c75af00 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -144,10 +144,11 @@ public class RpcClient implements ClientProtocol { /** * Creates RpcClient instance with the given configuration. - * @param conf + * @param conf Configuration + * @param omServiceId OM HA Service ID, set this to null if not HA * @throws IOException */ - public RpcClient(Configuration conf) throws IOException { + public RpcClient(Configuration conf, String omServiceId) throws IOException { Preconditions.checkNotNull(conf); this.conf = new OzoneConfiguration(conf); this.ugi = UserGroupInformation.getCurrentUser(); @@ -158,7 +159,7 @@ public class RpcClient implements ClientProtocol { this.ozoneManagerClient = TracingUtil.createProxy( new OzoneManagerProtocolClientSideTranslatorPB( - this.conf, clientId.toString(), ugi), + this.conf, clientId.toString(), omServiceId, ugi), OzoneManagerProtocol.class, conf ); long scmVersion = diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java index bcc7ac5..5a6e193 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java @@ -60,6 +60,7 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_BIND_HOST_KE import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_BIND_PORT_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODES_KEY; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_PORT_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -136,6 +137,29 @@ public final class OmUtils { host.get() + ":" + getOmRpcPort(conf)); } + /** + * Returns true if OZONE_OM_SERVICE_IDS_KEY is defined and not empty. + * @param conf Configuration + * @return true if OZONE_OM_SERVICE_IDS_KEY is defined and not empty; + * else false. + */ + public static boolean isServiceIdsDefined(Configuration conf) { + String val = conf.get(OZONE_OM_SERVICE_IDS_KEY); + return val != null && val.length() > 0; + } + + /** + * Returns true if HA for OzoneManager is configured for the given service id. + * @param conf Configuration + * @param serviceId OM HA cluster service ID + * @return true if HA is configured in the configuration; else false. + */ + public static boolean isOmHAServiceId(Configuration conf, String serviceId) { + Collection<String> omServiceIds = conf.getTrimmedStringCollection( + OZONE_OM_SERVICE_IDS_KEY); + return omServiceIds.contains(serviceId); + } + public static int getOmRpcPort(Configuration conf) { // If no port number is specified then we'll just try the defaultBindPort. final Optional<Integer> port = getPortNumberFromConfigKeys(conf, diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java index 35975bc..a198c84 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java @@ -39,12 +39,12 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY; -import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY; /** * A failover proxy provider implementation which allows clients to configure @@ -70,31 +70,33 @@ public class OMFailoverProxyProvider implements private final UserGroupInformation ugi; private final Text delegationTokenService; + private final String omServiceId; + public OMFailoverProxyProvider(OzoneConfiguration configuration, - UserGroupInformation ugi) throws IOException { + UserGroupInformation ugi, String omServiceId) throws IOException { this.conf = configuration; this.omVersion = RPC.getProtocolVersion(OzoneManagerProtocolPB.class); this.ugi = ugi; - loadOMClientConfigs(conf); + this.omServiceId = omServiceId; + loadOMClientConfigs(conf, this.omServiceId); this.delegationTokenService = computeDelegationTokenService(); currentProxyIndex = 0; currentProxyOMNodeId = omNodeIDList.get(currentProxyIndex); } - private void loadOMClientConfigs(Configuration config) throws IOException { + public OMFailoverProxyProvider(OzoneConfiguration configuration, + UserGroupInformation ugi) throws IOException { + this(configuration, ugi, null); + } + + private void loadOMClientConfigs(Configuration config, String omSvcId) + throws IOException { this.omProxies = new HashMap<>(); this.omProxyInfos = new HashMap<>(); this.omNodeIDList = new ArrayList<>(); - Collection<String> omServiceIds = config.getTrimmedStringCollection( - OZONE_OM_SERVICE_IDS_KEY); - - if (omServiceIds.size() > 1) { - throw new IllegalArgumentException("Multi-OM Services is not supported." + - " Please configure only one OM Service ID in " + - OZONE_OM_SERVICE_IDS_KEY); - } + Collection<String> omServiceIds = Collections.singletonList(omSvcId); for (String serviceId : OmUtils.emptyAsSingletonNull(omServiceIds)) { Collection<String> omNodeIds = OmUtils.getOMNodeIds(config, serviceId); diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java index abc02eb..559f454 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java @@ -191,8 +191,10 @@ public final class OzoneManagerProtocolClientSideTranslatorPB * cluster. */ public OzoneManagerProtocolClientSideTranslatorPB(OzoneConfiguration conf, - String clientId, UserGroupInformation ugi) throws IOException { - this.omFailoverProxyProvider = new OMFailoverProxyProvider(conf, ugi); + String clientId, String omServiceId, UserGroupInformation ugi) + throws IOException { + this.omFailoverProxyProvider = new OMFailoverProxyProvider(conf, ugi, + omServiceId); int maxRetries = conf.getInt( OzoneConfigKeys.OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY, diff --git a/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config b/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config index 6bebfdf..5c3b2a2 100644 --- a/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config +++ b/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config @@ -14,10 +14,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -OZONE-SITE.XML_ozone.om.nodes=om1,om2,om3 -OZONE-SITE.XML_ozone.om.address.om1=om1 -OZONE-SITE.XML_ozone.om.address.om2=om2 -OZONE-SITE.XML_ozone.om.address.om3=om3 +CORE-SITE.XML_fs.o3fs.impl=org.apache.hadoop.fs.ozone.OzoneFileSystem +CORE-SITE.XML_fs.defaultFS=o3fs://bucket.volume.id1 +OZONE-SITE.XML_ozone.om.service.ids=id1 +OZONE-SITE.XML_ozone.om.nodes.id1=om1,om2,om3 +OZONE-SITE.XML_ozone.om.address.id1.om1=om1 +OZONE-SITE.XML_ozone.om.address.id1.om2=om2 +OZONE-SITE.XML_ozone.om.address.id1.om3=om3 OZONE-SITE.XML_ozone.om.ratis.enable=true OZONE-SITE.XML_ozone.scm.names=scm OZONE-SITE.XML_ozone.enabled=True diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index 9dd3f00..8620b0a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -97,6 +97,13 @@ public interface MiniOzoneCluster { void waitTobeOutOfSafeMode() throws TimeoutException, InterruptedException; /** + * Returns OzoneManager Service ID. + * + * @return Service ID String + */ + String getServiceId(); + + /** * Returns {@link StorageContainerManager} associated with this * {@link MiniOzoneCluster} instance. * diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index dc0ed9f..b0cbc6b 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -128,6 +128,11 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster { return conf; } + public String getServiceId() { + // Non-HA cluster doesn't have OM Service Id. + return null; + } + /** * Waits for the Ozone cluster to be ready for processing requests. */ diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java index 32a485b..006d854 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java @@ -22,6 +22,8 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientFactory; import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OMStorage; import org.apache.hadoop.ozone.om.OzoneManager; @@ -52,6 +54,7 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl { private Map<String, OzoneManager> ozoneManagerMap; private List<OzoneManager> ozoneManagers; + private String omServiceId; // Active OMs denote OMs which are up and running private List<OzoneManager> activeOMs; @@ -74,12 +77,19 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl { List<OzoneManager> activeOMList, List<OzoneManager> inactiveOMList, StorageContainerManager scm, - List<HddsDatanodeService> hddsDatanodes) { + List<HddsDatanodeService> hddsDatanodes, + String omServiceId) { super(conf, scm, hddsDatanodes); this.ozoneManagerMap = omMap; this.ozoneManagers = new ArrayList<>(omMap.values()); this.activeOMs = activeOMList; this.inactiveOMs = inactiveOMList; + this.omServiceId = omServiceId; + } + + @Override + public String getServiceId() { + return omServiceId; } /** @@ -91,6 +101,11 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl { return this.ozoneManagers.get(0); } + @Override + public OzoneClient getRpcClient() throws IOException { + return OzoneClientFactory.getRpcClient(getServiceId(), getConf()); + } + public boolean isOMActive(String omNodeId) { return activeOMs.contains(ozoneManagerMap.get(omNodeId)); } @@ -188,7 +203,7 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl { final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(scm); MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl( - conf, omMap, activeOMs, inactiveOMs, scm, hddsDatanodes); + conf, omMap, activeOMs, inactiveOMs, scm, hddsDatanodes, omServiceId); if (startDataNodes) { cluster.startHddsDatanodes(); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java index 9b0f2f7..4e127a3 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java @@ -79,7 +79,7 @@ public interface RatisTestHelper { public ClientProtocol newOzoneClient() throws IOException { - return new RpcClient(conf); + return new RpcClient(conf, null); } @Override diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java index c4f1560..d91f739 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java @@ -949,7 +949,7 @@ public abstract class TestOzoneRpcClientAbstract { Configuration configuration = cluster.getConf(); configuration.setBoolean(OzoneConfigKeys.OZONE_CLIENT_VERIFY_CHECKSUM, verifyChecksum); - RpcClient client = new RpcClient(configuration); + RpcClient client = new RpcClient(configuration, null); OzoneInputStream is = client.getKey(volumeName, bucketName, keyName); is.read(new byte[100]); is.close(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java index ad8020b..b9b0fd7 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java @@ -52,6 +52,7 @@ public class TestOMRatisSnapshots { private OzoneConfiguration conf; private String clusterId; private String scmId; + private String omServiceId; private int numOfOMs = 3; private static final long SNAPSHOT_THRESHOLD = 50; private static final int LOG_PURGE_GAP = 50; @@ -74,6 +75,7 @@ public class TestOMRatisSnapshots { conf = new OzoneConfiguration(); clusterId = UUID.randomUUID().toString(); scmId = UUID.randomUUID().toString(); + omServiceId = "om-service-test1"; conf.setLong( OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY, SNAPSHOT_THRESHOLD); @@ -86,7 +88,8 @@ public class TestOMRatisSnapshots { .setNumOfActiveOMs(2) .build(); cluster.waitForClusterToBeReady(); - objectStore = OzoneClientFactory.getRpcClient(conf).getObjectStore(); + objectStore = OzoneClientFactory.getRpcClient(omServiceId, conf) + .getObjectStore(); } /** diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java index 0f59af1..62658dc 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java @@ -101,6 +101,7 @@ public class TestOzoneManagerHA { private OzoneConfiguration conf; private String clusterId; private String scmId; + private String omServiceId; private int numOfOMs = 3; private static final long SNAPSHOT_THRESHOLD = 50; private static final int LOG_PURGE_GAP = 50; @@ -123,6 +124,7 @@ public class TestOzoneManagerHA { conf = new OzoneConfiguration(); clusterId = UUID.randomUUID().toString(); scmId = UUID.randomUUID().toString(); + omServiceId = "om-service-test1"; conf.setBoolean(OZONE_ACL_ENABLED, true); conf.set(OzoneConfigKeys.OZONE_ADMINISTRATORS, OZONE_ADMINISTRATORS_WILDCARD); @@ -136,11 +138,12 @@ public class TestOzoneManagerHA { cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf) .setClusterId(clusterId) .setScmId(scmId) - .setOMServiceId("om-service-test1") + .setOMServiceId(omServiceId) .setNumOfOzoneManagers(numOfOMs) .build(); cluster.waitForClusterToBeReady(); - objectStore = OzoneClientFactory.getRpcClient(conf).getObjectStore(); + objectStore = OzoneClientFactory.getRpcClient(omServiceId, conf) + .getObjectStore(); } /** @@ -758,7 +761,7 @@ public class TestOzoneManagerHA { // Get the ObjectStore and FailoverProxyProvider for OM at index i final ObjectStore store = OzoneClientFactory.getRpcClient( - omHostName, rpcPort, conf).getObjectStore(); + omHostName, rpcPort, omServiceId, conf).getObjectStore(); final OMFailoverProxyProvider proxyProvider = store.getClientProxy().getOMProxyProvider(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java index f70579b..90fba83 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java @@ -49,6 +49,7 @@ public class TestOzoneManagerSnapshotProvider { private OzoneConfiguration conf; private String clusterId; private String scmId; + private String omServiceId; private int numOfOMs = 3; @Rule @@ -62,16 +63,18 @@ public class TestOzoneManagerSnapshotProvider { conf = new OzoneConfiguration(); clusterId = UUID.randomUUID().toString(); scmId = UUID.randomUUID().toString(); + omServiceId = "om-service-test1"; conf.setBoolean(OMConfigKeys.OZONE_OM_HTTP_ENABLED_KEY, true); conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true); cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf) .setClusterId(clusterId) .setScmId(scmId) - .setOMServiceId("om-service-test1") + .setOMServiceId(omServiceId) .setNumOfOzoneManagers(numOfOMs) .build(); cluster.waitForClusterToBeReady(); - objectStore = OzoneClientFactory.getRpcClient(conf).getObjectStore(); + objectStore = OzoneClientFactory.getRpcClient(omServiceId, conf) + .getObjectStore(); } /** diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ozShell/TestS3Shell.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ozShell/TestS3Shell.java index 4e22856..c55de0b 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ozShell/TestS3Shell.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ozShell/TestS3Shell.java @@ -111,7 +111,7 @@ public class TestS3Shell { .build(); conf.setInt(OZONE_REPLICATION, ReplicationFactor.THREE.getValue()); conf.setQuietMode(false); - client = new RpcClient(conf); + client = new RpcClient(conf, null); cluster.waitForClusterToBeReady(); } diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java index 21bc5be..9cd66ee 100644 --- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java @@ -100,7 +100,7 @@ public class ReconControllerModule extends AbstractModule { UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); ozoneManagerClient = new OzoneManagerProtocolClientSideTranslatorPB( - ozoneConfiguration, clientId.toString(), ugi); + ozoneConfiguration, clientId.toString(), null, ugi); } catch (IOException ioEx) { LOG.error("Error in provisioning OzoneManagerProtocol ", ioEx); } diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java index c76a210..e002087 100644 --- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java +++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.security.x509.SecurityConfig; import org.apache.hadoop.io.Text; +import org.apache.hadoop.ozone.OmUtils; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.OzoneBucket; @@ -115,6 +116,29 @@ public class BasicOzoneClientAdapterImpl implements OzoneClientAdapter { conf = new OzoneConfiguration(hadoopConf); } + if (omHost == null && OmUtils.isServiceIdsDefined(conf)) { + // When the host name or service id isn't given + // but ozone.om.service.ids is defined, declare failure. + + // This is a safety precaution that prevents the client from + // accidentally failing over to an unintended OM. + throw new IllegalArgumentException("Service ID or host name must not" + + " be omitted when ozone.om.service.ids is defined."); + } + + if (omPort != -1) { + // When the port number is specified, perform the following check + if (OmUtils.isOmHAServiceId(conf, omHost)) { + // If omHost is a service id, it shouldn't use a port + throw new IllegalArgumentException("Port " + omPort + + " specified in URI but host '" + omHost + "' is " + + "a logical (HA) OzoneManager and does not use port information."); + } + } else { + // When port number is not specified, read it from config + omPort = OmUtils.getOmRpcPort(conf); + } + SecurityConfig secConfig = new SecurityConfig(conf); if (secConfig.isSecurityEnabled()) { @@ -129,7 +153,12 @@ public class BasicOzoneClientAdapterImpl implements OzoneClientAdapter { int replicationCountConf = conf.getInt(OzoneConfigKeys.OZONE_REPLICATION, OzoneConfigKeys.OZONE_REPLICATION_DEFAULT); - if (StringUtils.isNotEmpty(omHost) && omPort != -1) { + if (OmUtils.isOmHAServiceId(conf, omHost)) { + // omHost is listed as one of the service ids in the config, + // thus we should treat omHost as omServiceId + this.ozoneClient = + OzoneClientFactory.getRpcClient(omHost, conf); + } else if (StringUtils.isNotEmpty(omHost) && omPort != -1) { this.ozoneClient = OzoneClientFactory.getRpcClient(omHost, omPort, conf); } else { diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java index 06eedba..a1648b4 100644 --- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java +++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java @@ -112,6 +112,11 @@ public class BasicOzoneFileSystem extends FileSystem { "Invalid scheme provided in " + name); String authority = name.getAuthority(); + if (authority == null) { + // authority is null when fs.defaultFS is not a qualified o3fs URI and + // o3fs:/// is passed to the client. matcher will NPE if authority is null + throw new IllegalArgumentException(URI_EXCEPTION_TEXT); + } Matcher matcher = URL_SCHEMA_PATTERN.matcher(authority); @@ -126,7 +131,7 @@ public class BasicOzoneFileSystem extends FileSystem { int omPort = -1; if (!isEmpty(remaining)) { String[] parts = remaining.split(":"); - // Array length should be either 1(host) or 2(host:port) + // Array length should be either 1(hostname or service id) or 2(host:port) if (parts.length > 2) { throw new IllegalArgumentException(getUriExceptionText(conf)); } @@ -137,9 +142,6 @@ public class BasicOzoneFileSystem extends FileSystem { } catch (NumberFormatException e) { throw new IllegalArgumentException(getUriExceptionText(conf)); } - } else { - // If port number is not specified, read it from config - omPort = OmUtils.getOmRpcPort(conf); } } diff --git a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java new file mode 100644 index 0000000..ab35191 --- /dev/null +++ b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java @@ -0,0 +1,348 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.fs.ozone; + +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FsShell; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl; +import org.apache.hadoop.ozone.OmUtils; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.OzoneVolume; +import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.apache.hadoop.ozone.om.OMStorage; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.ToolRunner; +import org.apache.ratis.util.LifeCycle; +import org.hamcrest.core.StringContains; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdds.HddsUtils.getHostName; +import static org.apache.hadoop.hdds.HddsUtils.getHostPort; + +/** + * Test client-side URI handling with Ozone Manager HA. + */ +public class TestOzoneFsHAURLs { + public static final Logger LOG = LoggerFactory.getLogger( + TestOzoneFsHAURLs.class); + + private OzoneConfiguration conf; + private MiniOzoneCluster cluster; + private String omId; + private String omServiceId; + private String clusterId; + private String scmId; + private OzoneManager om; + private int numOfOMs; + + private String volumeName; + private String bucketName; + private String rootPath; + + private final String o3fsImplKey = + "fs." + OzoneConsts.OZONE_URI_SCHEME + ".impl"; + private final String o3fsImplValue = + "org.apache.hadoop.fs.ozone.OzoneFileSystem"; + + private static final long LEADER_ELECTION_TIMEOUT = 500L; + + @Before + public void init() throws Exception { + conf = new OzoneConfiguration(); + omId = UUID.randomUUID().toString(); + omServiceId = "om-service-test1"; + numOfOMs = 3; + clusterId = UUID.randomUUID().toString(); + scmId = UUID.randomUUID().toString(); + final String path = GenericTestUtils.getTempPath(omId); + java.nio.file.Path metaDirPath = java.nio.file.Paths.get(path, "om-meta"); + conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true); + conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, metaDirPath.toString()); + conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0"); + conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true); + conf.setTimeDuration( + OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY, + LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS); + + OMStorage omStore = new OMStorage(conf); + omStore.setClusterId(clusterId); + omStore.setScmId(scmId); + // writes the version file properties + omStore.initialize(); + + // Start the cluster + cluster = MiniOzoneCluster.newHABuilder(conf) + .setClusterId(clusterId) + .setScmId(scmId) + .setOMServiceId(omServiceId) + .setNumOfOzoneManagers(numOfOMs) + .build(); + cluster.waitForClusterToBeReady(); + + om = cluster.getOzoneManager(); + Assert.assertEquals(LifeCycle.State.RUNNING, om.getOmRatisServerState()); + + volumeName = "volume" + RandomStringUtils.randomNumeric(5); + ObjectStore objectStore = + OzoneClientFactory.getRpcClient(omServiceId, conf).getObjectStore(); + objectStore.createVolume(volumeName); + + OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName); + bucketName = "bucket" + RandomStringUtils.randomNumeric(5); + retVolumeinfo.createBucket(bucketName); + + rootPath = String.format("%s://%s.%s.%s/", OzoneConsts.OZONE_URI_SCHEME, + bucketName, volumeName, omServiceId); + // Set fs.defaultFS + conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath); + FileSystem fs = FileSystem.get(conf); + // Create some dirs + Path root = new Path("/"); + Path dir1 = new Path(root, "dir1"); + Path dir12 = new Path(dir1, "dir12"); + Path dir2 = new Path(root, "dir2"); + fs.mkdirs(dir12); + fs.mkdirs(dir2); + } + + @After + public void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + /** + * @return the leader OM's RPC address in the MiniOzoneHACluster + */ + private String getLeaderOMNodeAddr() { + String leaderOMNodeAddr = null; + Collection<String> omNodeIds = OmUtils.getOMNodeIds(conf, omServiceId); + assert(omNodeIds.size() == numOfOMs); + MiniOzoneHAClusterImpl haCluster = (MiniOzoneHAClusterImpl) cluster; + // Note: this loop may be implemented inside MiniOzoneHAClusterImpl + for (String omNodeId : omNodeIds) { + // Find the leader OM + if (!haCluster.getOzoneManager(omNodeId).isLeader()) { + continue; + } + // ozone.om.address.omServiceId.omNode + String leaderOMNodeAddrKey = OmUtils.addKeySuffixes( + OMConfigKeys.OZONE_OM_ADDRESS_KEY, omServiceId, omNodeId); + leaderOMNodeAddr = conf.get(leaderOMNodeAddrKey); + LOG.info("Found leader OM: nodeId=" + omNodeId + ", " + + leaderOMNodeAddrKey + "=" + leaderOMNodeAddr); + // Leader found, no need to continue loop + break; + } + // There has to be a leader + assert(leaderOMNodeAddr != null); + return leaderOMNodeAddr; + } + + /** + * Get host name from an address. This uses getHostName() internally. + * @param addr Address with port number + * @return Host name + */ + private String getHostFromAddress(String addr) { + Optional<String> hostOptional = getHostName(addr); + assert(hostOptional.isPresent()); + return hostOptional.get(); + } + + /** + * Get port number from an address. This uses getHostPort() internally. + * @param addr Address with port + * @return Port number + */ + private int getPortFromAddress(String addr) { + Optional<Integer> portOptional = getHostPort(addr); + assert(portOptional.isPresent()); + return portOptional.get(); + } + + /** + * Test OM HA URLs with qualified fs.defaultFS. + * @throws Exception + */ + @Test + public void testWithQualifiedDefaultFS() throws Exception { + OzoneConfiguration clientConf = new OzoneConfiguration(conf); + clientConf.setQuietMode(false); + clientConf.set(o3fsImplKey, o3fsImplValue); + // fs.defaultFS = o3fs://bucketName.volumeName.omServiceId/ + clientConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath); + + // Pick leader OM's RPC address and assign it to ozone.om.address for + // the test case: ozone fs -ls o3fs://bucket.volume.om1/ + String leaderOMNodeAddr = getLeaderOMNodeAddr(); + // ozone.om.address was set to service id in MiniOzoneHAClusterImpl + clientConf.set(OMConfigKeys.OZONE_OM_ADDRESS_KEY, leaderOMNodeAddr); + + FsShell shell = new FsShell(clientConf); + int res; + try { + // Test case 1: ozone fs -ls / + // Expectation: Success. + res = ToolRunner.run(shell, new String[] {"-ls", "/"}); + // Check return value, should be 0 (success) + Assert.assertEquals(res, 0); + + // Test case 2: ozone fs -ls o3fs:/// + // Expectation: Success. fs.defaultFS is a fully qualified path. + res = ToolRunner.run(shell, new String[] {"-ls", "o3fs:///"}); + Assert.assertEquals(res, 0); + + // Test case 3: ozone fs -ls o3fs://bucket.volume/ + // Expectation: Fail. Must have service id or host name when HA is enabled + String unqualifiedPath1 = String.format("%s://%s.%s/", + OzoneConsts.OZONE_URI_SCHEME, bucketName, volumeName); + try (GenericTestUtils.SystemErrCapturer capture = + new GenericTestUtils.SystemErrCapturer()) { + res = ToolRunner.run(shell, new String[] {"-ls", unqualifiedPath1}); + // Check stderr, inspired by testDFSWithInvalidCommmand + Assert.assertThat("Command did not print the error message " + + "correctly for test case: ozone fs -ls o3fs://bucket.volume/", + capture.getOutput(), StringContains.containsString( + "-ls: Service ID or host name must not" + + " be omitted when ozone.om.service.ids is defined.")); + } + // Check return value, should be -1 (failure) + Assert.assertEquals(res, -1); + + // Test case 4: ozone fs -ls o3fs://bucket.volume.om1/ + // Expectation: Success. The client should use the port number + // set in ozone.om.address. + String qualifiedPath1 = String.format("%s://%s.%s.%s/", + OzoneConsts.OZONE_URI_SCHEME, bucketName, volumeName, + getHostFromAddress(leaderOMNodeAddr)); + res = ToolRunner.run(shell, new String[] {"-ls", qualifiedPath1}); + // Note: this test case will fail if the port is not from the leader node + Assert.assertEquals(res, 0); + + // Test case 5: ozone fs -ls o3fs://bucket.volume.om1:port/ + // Expectation: Success. + String qualifiedPath2 = String.format("%s://%s.%s.%s/", + OzoneConsts.OZONE_URI_SCHEME, bucketName, volumeName, + leaderOMNodeAddr); + res = ToolRunner.run(shell, new String[] {"-ls", qualifiedPath2}); + Assert.assertEquals(res, 0); + + // Test case 6: ozone fs -ls o3fs://bucket.volume.id1/ + // Expectation: Success. + String qualifiedPath3 = String.format("%s://%s.%s.%s/", + OzoneConsts.OZONE_URI_SCHEME, bucketName, volumeName, omServiceId); + res = ToolRunner.run(shell, new String[] {"-ls", qualifiedPath3}); + Assert.assertEquals(res, 0); + + // Test case 7: ozone fs -ls o3fs://bucket.volume.id1:port/ + // Expectation: Fail. Service ID does not use port information. + // Use the port number from leader OM (doesn't really matter) + String unqualifiedPath2 = String.format("%s://%s.%s.%s:%d/", + OzoneConsts.OZONE_URI_SCHEME, bucketName, volumeName, + omServiceId, getPortFromAddress(leaderOMNodeAddr)); + try (GenericTestUtils.SystemErrCapturer capture = + new GenericTestUtils.SystemErrCapturer()) { + res = ToolRunner.run(shell, new String[] {"-ls", unqualifiedPath2}); + // Check stderr + Assert.assertThat("Command did not print the error message " + + "correctly for test case: " + + "ozone fs -ls o3fs://bucket.volume.id1:port/", + capture.getOutput(), StringContains.containsString( + "does not use port information")); + } + // Check return value, should be -1 (failure) + Assert.assertEquals(res, -1); + } finally { + shell.close(); + } + } + + /** + * Helper function for testOtherDefaultFS(), + * run fs -ls o3fs:/// against different fs.defaultFS input. + * + * @param defaultFS Desired fs.defaultFS to be used in the test + * @throws Exception + */ + private void testWithDefaultFS(String defaultFS) throws Exception { + OzoneConfiguration clientConf = new OzoneConfiguration(conf); + clientConf.setQuietMode(false); + clientConf.set(o3fsImplKey, o3fsImplValue); + // fs.defaultFS = file:/// + clientConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, + defaultFS); + + FsShell shell = new FsShell(clientConf); + try { + // Test case: ozone fs -ls o3fs:/// + // Expectation: Fail. fs.defaultFS is not a qualified o3fs URI. + int res = ToolRunner.run(shell, new String[] {"-ls", "o3fs:///"}); + Assert.assertEquals(res, -1); + } finally { + shell.close(); + } + } + + /** + * Test OM HA URLs with some unqualified fs.defaultFS. + * @throws Exception + */ + @Test + public void testOtherDefaultFS() throws Exception { + // Test scenarios where fs.defaultFS isn't a fully qualified o3fs + + // fs.defaultFS = file:/// + testWithDefaultFS(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT); + + // fs.defaultFS = hdfs://ns1/ + testWithDefaultFS("hdfs://ns1/"); + + // fs.defaultFS = o3fs:/// + String unqualifiedFs1 = String.format( + "%s:///", OzoneConsts.OZONE_URI_SCHEME); + testWithDefaultFS(unqualifiedFs1); + + // fs.defaultFS = o3fs://bucketName.volumeName/ + String unqualifiedFs2 = String.format("%s://%s.%s/", + OzoneConsts.OZONE_URI_SCHEME, bucketName, volumeName); + testWithDefaultFS(unqualifiedFs2); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org