http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java deleted file mode 100644 index cc2f78a..0000000 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java +++ /dev/null @@ -1,391 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ -package org.apache.hadoop.ozone.ksm; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.ozone.ksm.exceptions.KSMException; -import org.apache.hadoop.ozone.protocol.proto - .KeySpaceManagerProtocolProtos.OzoneAclInfo; -import org.apache.hadoop.ozone.protocol.proto - .KeySpaceManagerProtocolProtos.VolumeList; -import org.apache.hadoop.ozone.protocol.proto - .KeySpaceManagerProtocolProtos.VolumeInfo; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.util.Time; -import org.apache.hadoop.utils.BatchOperation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -import static org.apache.hadoop.ozone.ksm.KSMConfigKeys - .OZONE_KSM_USER_MAX_VOLUME_DEFAULT; -import static org.apache.hadoop.ozone.ksm.KSMConfigKeys - .OZONE_KSM_USER_MAX_VOLUME; -import static org.apache.hadoop.ozone.ksm.exceptions - .KSMException.ResultCodes; - -/** - * KSM volume management code. - */ -public class VolumeManagerImpl implements VolumeManager { - private static final Logger LOG = - LoggerFactory.getLogger(VolumeManagerImpl.class); - - private final KSMMetadataManager metadataManager; - private final int maxUserVolumeCount; - - /** - * Constructor. - * @param conf - Ozone configuration. 
- * @throws IOException - */ - public VolumeManagerImpl(KSMMetadataManager metadataManager, - OzoneConfiguration conf) throws IOException { - this.metadataManager = metadataManager; - this.maxUserVolumeCount = conf.getInt(OZONE_KSM_USER_MAX_VOLUME, - OZONE_KSM_USER_MAX_VOLUME_DEFAULT); - } - - // Helpers to add and delete volume from user list - private void addVolumeToOwnerList(String volume, String owner, - BatchOperation batchOperation) throws IOException { - // Get the volume list - byte[] dbUserKey = metadataManager.getUserKey(owner); - byte[] volumeList = metadataManager.get(dbUserKey); - List<String> prevVolList = new LinkedList<>(); - if (volumeList != null) { - VolumeList vlist = VolumeList.parseFrom(volumeList); - prevVolList.addAll(vlist.getVolumeNamesList()); - } - - // Check the volume count - if (prevVolList.size() >= maxUserVolumeCount) { - LOG.debug("Too many volumes for user:{}", owner); - throw new KSMException(ResultCodes.FAILED_TOO_MANY_USER_VOLUMES); - } - - // Add the new volume to the list - prevVolList.add(volume); - VolumeList newVolList = VolumeList.newBuilder() - .addAllVolumeNames(prevVolList).build(); - batchOperation.put(dbUserKey, newVolList.toByteArray()); - } - - private void delVolumeFromOwnerList(String volume, String owner, - BatchOperation batchOperation) - throws IOException { - // Get the volume list - byte[] dbUserKey = metadataManager.getUserKey(owner); - byte[] volumeList = metadataManager.get(dbUserKey); - List<String> prevVolList = new LinkedList<>(); - if (volumeList != null) { - VolumeList vlist = VolumeList.parseFrom(volumeList); - prevVolList.addAll(vlist.getVolumeNamesList()); - } else { - LOG.debug("volume:{} not found for user:{}"); - throw new KSMException(ResultCodes.FAILED_USER_NOT_FOUND); - } - - // Remove the volume from the list - prevVolList.remove(volume); - if (prevVolList.size() == 0) { - batchOperation.delete(dbUserKey); - } else { - VolumeList newVolList = VolumeList.newBuilder() - 
.addAllVolumeNames(prevVolList).build(); - batchOperation.put(dbUserKey, newVolList.toByteArray()); - } - } - - /** - * Creates a volume. - * @param args - KsmVolumeArgs. - */ - @Override - public void createVolume(KsmVolumeArgs args) throws IOException { - Preconditions.checkNotNull(args); - metadataManager.writeLock().lock(); - try { - byte[] dbVolumeKey = metadataManager.getVolumeKey(args.getVolume()); - byte[] volumeInfo = metadataManager.get(dbVolumeKey); - - // Check of the volume already exists - if (volumeInfo != null) { - LOG.debug("volume:{} already exists", args.getVolume()); - throw new KSMException(ResultCodes.FAILED_VOLUME_ALREADY_EXISTS); - } - - BatchOperation batch = new BatchOperation(); - // Write the vol info - List<HddsProtos.KeyValue> metadataList = new LinkedList<>(); - for (Map.Entry<String, String> entry : args.getKeyValueMap().entrySet()) { - metadataList.add(HddsProtos.KeyValue.newBuilder() - .setKey(entry.getKey()).setValue(entry.getValue()).build()); - } - List<OzoneAclInfo> aclList = args.getAclMap().ozoneAclGetProtobuf(); - - VolumeInfo newVolumeInfo = VolumeInfo.newBuilder() - .setAdminName(args.getAdminName()) - .setOwnerName(args.getOwnerName()) - .setVolume(args.getVolume()) - .setQuotaInBytes(args.getQuotaInBytes()) - .addAllMetadata(metadataList) - .addAllVolumeAcls(aclList) - .setCreationTime(Time.now()) - .build(); - batch.put(dbVolumeKey, newVolumeInfo.toByteArray()); - - // Add volume to user list - addVolumeToOwnerList(args.getVolume(), args.getOwnerName(), batch); - metadataManager.writeBatch(batch); - LOG.debug("created volume:{} user:{}", args.getVolume(), - args.getOwnerName()); - } catch (IOException ex) { - if (!(ex instanceof KSMException)) { - LOG.error("Volume creation failed for user:{} volume:{}", - args.getOwnerName(), args.getVolume(), ex); - } - throw ex; - } finally { - metadataManager.writeLock().unlock(); - } - } - - /** - * Changes the owner of a volume. - * - * @param volume - Name of the volume. 
- * @param owner - Name of the owner. - * @throws IOException - */ - @Override - public void setOwner(String volume, String owner) throws IOException { - Preconditions.checkNotNull(volume); - Preconditions.checkNotNull(owner); - metadataManager.writeLock().lock(); - try { - byte[] dbVolumeKey = metadataManager.getVolumeKey(volume); - byte[] volInfo = metadataManager.get(dbVolumeKey); - if (volInfo == null) { - LOG.debug("Changing volume ownership failed for user:{} volume:{}", - owner, volume); - throw new KSMException(ResultCodes.FAILED_VOLUME_NOT_FOUND); - } - - VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo); - KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo); - Preconditions.checkState(volume.equals(volumeInfo.getVolume())); - - BatchOperation batch = new BatchOperation(); - delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(), batch); - addVolumeToOwnerList(volume, owner, batch); - - KsmVolumeArgs newVolumeArgs = - KsmVolumeArgs.newBuilder().setVolume(volumeArgs.getVolume()) - .setAdminName(volumeArgs.getAdminName()) - .setOwnerName(owner) - .setQuotaInBytes(volumeArgs.getQuotaInBytes()) - .setCreationTime(volumeArgs.getCreationTime()) - .build(); - - VolumeInfo newVolumeInfo = newVolumeArgs.getProtobuf(); - batch.put(dbVolumeKey, newVolumeInfo.toByteArray()); - - metadataManager.writeBatch(batch); - } catch (IOException ex) { - if (!(ex instanceof KSMException)) { - LOG.error("Changing volume ownership failed for user:{} volume:{}", - owner, volume, ex); - } - throw ex; - } finally { - metadataManager.writeLock().unlock(); - } - } - - /** - * Changes the Quota on a volume. - * - * @param volume - Name of the volume. - * @param quota - Quota in bytes. 
- * @throws IOException - */ - public void setQuota(String volume, long quota) throws IOException { - Preconditions.checkNotNull(volume); - metadataManager.writeLock().lock(); - try { - byte[] dbVolumeKey = metadataManager.getVolumeKey(volume); - byte[] volInfo = metadataManager.get(dbVolumeKey); - if (volInfo == null) { - LOG.debug("volume:{} does not exist", volume); - throw new KSMException(ResultCodes.FAILED_VOLUME_NOT_FOUND); - } - - VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo); - KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo); - Preconditions.checkState(volume.equals(volumeInfo.getVolume())); - - KsmVolumeArgs newVolumeArgs = - KsmVolumeArgs.newBuilder() - .setVolume(volumeArgs.getVolume()) - .setAdminName(volumeArgs.getAdminName()) - .setOwnerName(volumeArgs.getOwnerName()) - .setQuotaInBytes(quota) - .setCreationTime(volumeArgs.getCreationTime()).build(); - - VolumeInfo newVolumeInfo = newVolumeArgs.getProtobuf(); - metadataManager.put(dbVolumeKey, newVolumeInfo.toByteArray()); - } catch (IOException ex) { - if (!(ex instanceof KSMException)) { - LOG.error("Changing volume quota failed for volume:{} quota:{}", volume, - quota, ex); - } - throw ex; - } finally { - metadataManager.writeLock().unlock(); - } - } - - /** - * Gets the volume information. - * @param volume - Volume name. - * @return VolumeArgs or exception is thrown. 
- * @throws IOException - */ - public KsmVolumeArgs getVolumeInfo(String volume) throws IOException { - Preconditions.checkNotNull(volume); - metadataManager.readLock().lock(); - try { - byte[] dbVolumeKey = metadataManager.getVolumeKey(volume); - byte[] volInfo = metadataManager.get(dbVolumeKey); - if (volInfo == null) { - LOG.debug("volume:{} does not exist", volume); - throw new KSMException(ResultCodes.FAILED_VOLUME_NOT_FOUND); - } - - VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo); - KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo); - Preconditions.checkState(volume.equals(volumeInfo.getVolume())); - return volumeArgs; - } catch (IOException ex) { - if (!(ex instanceof KSMException)) { - LOG.warn("Info volume failed for volume:{}", volume, ex); - } - throw ex; - } finally { - metadataManager.readLock().unlock(); - } - } - - /** - * Deletes an existing empty volume. - * - * @param volume - Name of the volume. - * @throws IOException - */ - @Override - public void deleteVolume(String volume) throws IOException { - Preconditions.checkNotNull(volume); - metadataManager.writeLock().lock(); - try { - BatchOperation batch = new BatchOperation(); - byte[] dbVolumeKey = metadataManager.getVolumeKey(volume); - byte[] volInfo = metadataManager.get(dbVolumeKey); - if (volInfo == null) { - LOG.debug("volume:{} does not exist", volume); - throw new KSMException(ResultCodes.FAILED_VOLUME_NOT_FOUND); - } - - if (!metadataManager.isVolumeEmpty(volume)) { - LOG.debug("volume:{} is not empty", volume); - throw new KSMException(ResultCodes.FAILED_VOLUME_NOT_EMPTY); - } - - VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo); - Preconditions.checkState(volume.equals(volumeInfo.getVolume())); - // delete the volume from the owner list - // as well as delete the volume entry - delVolumeFromOwnerList(volume, volumeInfo.getOwnerName(), batch); - batch.delete(dbVolumeKey); - metadataManager.writeBatch(batch); - } catch (IOException ex) { - if (!(ex 
instanceof KSMException)) { - LOG.error("Delete volume failed for volume:{}", volume, ex); - } - throw ex; - } finally { - metadataManager.writeLock().unlock(); - } - } - - /** - * Checks if the specified user with a role can access this volume. - * - * @param volume - volume - * @param userAcl - user acl which needs to be checked for access - * @return true if the user has access for the volume, false otherwise - * @throws IOException - */ - public boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl) - throws IOException { - Preconditions.checkNotNull(volume); - Preconditions.checkNotNull(userAcl); - metadataManager.readLock().lock(); - try { - byte[] dbVolumeKey = metadataManager.getVolumeKey(volume); - byte[] volInfo = metadataManager.get(dbVolumeKey); - if (volInfo == null) { - LOG.debug("volume:{} does not exist", volume); - throw new KSMException(ResultCodes.FAILED_VOLUME_NOT_FOUND); - } - - VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo); - KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo); - Preconditions.checkState(volume.equals(volumeInfo.getVolume())); - return volumeArgs.getAclMap().hasAccess(userAcl); - } catch (IOException ex) { - if (!(ex instanceof KSMException)) { - LOG.error("Check volume access failed for volume:{} user:{} rights:{}", - volume, userAcl.getName(), userAcl.getRights(), ex); - } - throw ex; - } finally { - metadataManager.readLock().unlock(); - } - } - - /** - * {@inheritDoc} - */ - @Override - public List<KsmVolumeArgs> listVolumes(String userName, - String prefix, String startKey, int maxKeys) throws IOException { - metadataManager.readLock().lock(); - try { - return metadataManager.listVolumes( - userName, prefix, startKey, maxKeys); - } finally { - metadataManager.readLock().unlock(); - } - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java deleted file mode 100644 index b902eab..0000000 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java +++ /dev/null @@ -1,118 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.ksm.exceptions; - -import java.io.IOException; - -/** - * Exception thrown by KSM. - */ -public class KSMException extends IOException { - private final KSMException.ResultCodes result; - - /** - * Constructs an {@code IOException} with {@code null} - * as its error detail message. - */ - public KSMException(KSMException.ResultCodes result) { - this.result = result; - } - - /** - * Constructs an {@code IOException} with the specified detail message. 
- * - * @param message The detail message (which is saved for later retrieval by - * the - * {@link #getMessage()} method) - */ - public KSMException(String message, KSMException.ResultCodes result) { - super(message); - this.result = result; - } - - /** - * Constructs an {@code IOException} with the specified detail message - * and cause. - * <p> - * <p> Note that the detail message associated with {@code cause} is - * <i>not</i> automatically incorporated into this exception's detail - * message. - * - * @param message The detail message (which is saved for later retrieval by - * the - * {@link #getMessage()} method) - * @param cause The cause (which is saved for later retrieval by the {@link - * #getCause()} method). (A null value is permitted, and indicates that the - * cause is nonexistent or unknown.) - * @since 1.6 - */ - public KSMException(String message, Throwable cause, - KSMException.ResultCodes result) { - super(message, cause); - this.result = result; - } - - /** - * Constructs an {@code IOException} with the specified cause and a - * detail message of {@code (cause==null ? null : cause.toString())} - * (which typically contains the class and detail message of {@code cause}). - * This constructor is useful for IO exceptions that are little more - * than wrappers for other throwables. - * - * @param cause The cause (which is saved for later retrieval by the {@link - * #getCause()} method). (A null value is permitted, and indicates that the - * cause is nonexistent or unknown.) - * @since 1.6 - */ - public KSMException(Throwable cause, KSMException.ResultCodes result) { - super(cause); - this.result = result; - } - - /** - * Returns resultCode. - * @return ResultCode - */ - public KSMException.ResultCodes getResult() { - return result; - } - - /** - * Error codes to make it easy to decode these exceptions. 
- */ - public enum ResultCodes { - FAILED_TOO_MANY_USER_VOLUMES, - FAILED_VOLUME_ALREADY_EXISTS, - FAILED_VOLUME_NOT_FOUND, - FAILED_VOLUME_NOT_EMPTY, - FAILED_USER_NOT_FOUND, - FAILED_BUCKET_ALREADY_EXISTS, - FAILED_BUCKET_NOT_FOUND, - FAILED_BUCKET_NOT_EMPTY, - FAILED_KEY_ALREADY_EXISTS, - FAILED_KEY_NOT_FOUND, - FAILED_KEY_ALLOCATION, - FAILED_KEY_DELETION, - FAILED_KEY_RENAME, - FAILED_INVALID_KEY_NAME, - FAILED_METADATA_ERROR, - FAILED_INTERNAL_ERROR, - KSM_NOT_INITIALIZED, - SCM_VERSION_MISMATCH_ERROR - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/package-info.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/package-info.java deleted file mode 100644 index 09fd87f..0000000 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/package-info.java +++ /dev/null @@ -1,19 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.hadoop.ozone.ksm.exceptions; -// Exception thrown by KSM. http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/package-info.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/package-info.java deleted file mode 100644 index 09d9f32..0000000 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.ozone.ksm; -/* - This package contains the keyspace manager classes. 
- */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManager.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManager.java new file mode 100644 index 0000000..ddb2b0e --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManager.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.om; + +import org.apache.hadoop.ozone.om.helpers.OmBucketArgs; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; + +import java.io.IOException; +import java.util.List; + +/** + * BucketManager handles all the bucket level operations. + */ +public interface BucketManager { + /** + * Creates a bucket. + * @param bucketInfo - OmBucketInfo for creating bucket. + */ + void createBucket(OmBucketInfo bucketInfo) throws IOException; + /** + * Returns Bucket Information. + * @param volumeName - Name of the Volume. 
+ * @param bucketName - Name of the Bucket. + */ + OmBucketInfo getBucketInfo(String volumeName, String bucketName) + throws IOException; + + /** + * Sets bucket property from args. + * @param args - BucketArgs. + * @throws IOException + */ + void setBucketProperty(OmBucketArgs args) throws IOException; + + /** + * Deletes an existing empty bucket from volume. + * @param volumeName - Name of the volume. + * @param bucketName - Name of the bucket. + * @throws IOException + */ + void deleteBucket(String volumeName, String bucketName) throws IOException; + + /** + * Returns a list of buckets represented by {@link OmBucketInfo} + * in the given volume. + * + * @param volumeName + * Required volume name; only buckets in this volume are + * returned. + * @param startBucket + * Optional start bucket name indicating where to start + * the bucket listing from; this bucket is excluded from the result. + * @param bucketPrefix + * Optional bucket name prefix, restricting the response to buckets + * whose names begin with the specified prefix. + * @param maxNumOfBuckets + * The maximum number of buckets to return. It ensures + * the size of the result will not exceed this limit. + * @return a list of buckets. 
+ * @throws IOException + */ + List<OmBucketInfo> listBuckets(String volumeName, + String startBucket, String bucketPrefix, int maxNumOfBuckets) + throws IOException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java new file mode 100644 index 0000000..4bbce81 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java @@ -0,0 +1,315 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.ozone.om; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.ozone.om.helpers.OmBucketArgs; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.protocol.proto + .OzoneManagerProtocolProtos.BucketInfo; +import org.apache.hadoop.ozone.OzoneAcl; +import org.apache.hadoop.util.Time; +import org.iq80.leveldb.DBException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +/** + * OM bucket manager. + */ +public class BucketManagerImpl implements BucketManager { + private static final Logger LOG = + LoggerFactory.getLogger(BucketManagerImpl.class); + + /** + * OMMetadataManager is used for accessing OM MetadataDB and ReadWriteLock. + */ + private final OMMetadataManager metadataManager; + + /** + * Constructs BucketManager. + * @param metadataManager - gives access to OM MetadataDB and ReadWriteLock. + */ + public BucketManagerImpl(OMMetadataManager metadataManager){ + this.metadataManager = metadataManager; + } + + /** + * MetadataDB is maintained in MetadataManager and shared between + * BucketManager, VolumeManager and KeyManager. + * + * BucketManager uses MetadataDB to store bucket level information. + * + * Keys used in BucketManager for storing data into MetadataDB + * for BucketInfo: + * {volume/bucket} -> bucketInfo + * + * Work flow of create bucket: + * + * -> Check if the Volume exists in metadataDB, if not throw + * OMException with FAILED_VOLUME_NOT_FOUND. + * -> Else check if the Bucket exists in metadataDB, if so throw + * OMException with FAILED_BUCKET_ALREADY_EXISTS. + * -> Else update MetadataDB with BucketInfo. + */ + + /** + * Creates a bucket. + * @param bucketInfo - OmBucketInfo. 
+ */ + @Override + public void createBucket(OmBucketInfo bucketInfo) throws IOException { + Preconditions.checkNotNull(bucketInfo); + metadataManager.writeLock().lock(); + String volumeName = bucketInfo.getVolumeName(); + String bucketName = bucketInfo.getBucketName(); + try { + byte[] volumeKey = metadataManager.getVolumeKey(volumeName); + byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName); + + //Check if the volume exists + if (metadataManager.get(volumeKey) == null) { + LOG.debug("volume: {} not found ", volumeName); + throw new OMException("Volume doesn't exist", + OMException.ResultCodes.FAILED_VOLUME_NOT_FOUND); + } + //Check if bucket already exists + if (metadataManager.get(bucketKey) != null) { + LOG.debug("bucket: {} already exists ", bucketName); + throw new OMException("Bucket already exist", + OMException.ResultCodes.FAILED_BUCKET_ALREADY_EXISTS); + } + + OmBucketInfo omBucketInfo = OmBucketInfo.newBuilder() + .setVolumeName(bucketInfo.getVolumeName()) + .setBucketName(bucketInfo.getBucketName()) + .setAcls(bucketInfo.getAcls()) + .setStorageType(bucketInfo.getStorageType()) + .setIsVersionEnabled(bucketInfo.getIsVersionEnabled()) + .setCreationTime(Time.now()) + .build(); + metadataManager.put(bucketKey, omBucketInfo.getProtobuf().toByteArray()); + + LOG.debug("created bucket: {} in volume: {}", bucketName, volumeName); + } catch (IOException | DBException ex) { + if (!(ex instanceof OMException)) { + LOG.error("Bucket creation failed for bucket:{} in volume:{}", + bucketName, volumeName, ex); + } + throw ex; + } finally { + metadataManager.writeLock().unlock(); + } + } + + /** + * Returns Bucket Information. + * + * @param volumeName - Name of the Volume. + * @param bucketName - Name of the Bucket. 
+ */ + @Override + public OmBucketInfo getBucketInfo(String volumeName, String bucketName) + throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(bucketName); + metadataManager.readLock().lock(); + try { + byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName); + byte[] value = metadataManager.get(bucketKey); + if (value == null) { + LOG.debug("bucket: {} not found in volume: {}.", bucketName, + volumeName); + throw new OMException("Bucket not found", + OMException.ResultCodes.FAILED_BUCKET_NOT_FOUND); + } + return OmBucketInfo.getFromProtobuf(BucketInfo.parseFrom(value)); + } catch (IOException | DBException ex) { + if (!(ex instanceof OMException)) { + LOG.error("Exception while getting bucket info for bucket: {}", + bucketName, ex); + } + throw ex; + } finally { + metadataManager.readLock().unlock(); + } + } + + /** + * Sets bucket property from args. + * @param args - BucketArgs. + * @throws IOException + */ + @Override + public void setBucketProperty(OmBucketArgs args) throws IOException { + Preconditions.checkNotNull(args); + metadataManager.writeLock().lock(); + String volumeName = args.getVolumeName(); + String bucketName = args.getBucketName(); + try { + byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName); + //Check if volume exists + if(metadataManager.get(metadataManager.getVolumeKey(volumeName)) == + null) { + LOG.debug("volume: {} not found ", volumeName); + throw new OMException("Volume doesn't exist", + OMException.ResultCodes.FAILED_VOLUME_NOT_FOUND); + } + byte[] value = metadataManager.get(bucketKey); + //Check if bucket exist + if(value == null) { + LOG.debug("bucket: {} not found ", bucketName); + throw new OMException("Bucket doesn't exist", + OMException.ResultCodes.FAILED_BUCKET_NOT_FOUND); + } + OmBucketInfo oldBucketInfo = OmBucketInfo.getFromProtobuf( + BucketInfo.parseFrom(value)); + OmBucketInfo.Builder bucketInfoBuilder = OmBucketInfo.newBuilder(); + 
bucketInfoBuilder.setVolumeName(oldBucketInfo.getVolumeName()) + .setBucketName(oldBucketInfo.getBucketName()); + + //Check ACLs to update + if(args.getAddAcls() != null || args.getRemoveAcls() != null) { + bucketInfoBuilder.setAcls(getUpdatedAclList(oldBucketInfo.getAcls(), + args.getRemoveAcls(), args.getAddAcls())); + LOG.debug("Updating ACLs for bucket: {} in volume: {}", + bucketName, volumeName); + } else { + bucketInfoBuilder.setAcls(oldBucketInfo.getAcls()); + } + + //Check StorageType to update + StorageType storageType = args.getStorageType(); + if (storageType != null) { + bucketInfoBuilder.setStorageType(storageType); + LOG.debug("Updating bucket storage type for bucket: {} in volume: {}", + bucketName, volumeName); + } else { + bucketInfoBuilder.setStorageType(oldBucketInfo.getStorageType()); + } + + //Check Versioning to update + Boolean versioning = args.getIsVersionEnabled(); + if (versioning != null) { + bucketInfoBuilder.setIsVersionEnabled(versioning); + LOG.debug("Updating bucket versioning for bucket: {} in volume: {}", + bucketName, volumeName); + } else { + bucketInfoBuilder + .setIsVersionEnabled(oldBucketInfo.getIsVersionEnabled()); + } + bucketInfoBuilder.setCreationTime(oldBucketInfo.getCreationTime()); + + metadataManager.put(bucketKey, + bucketInfoBuilder.build().getProtobuf().toByteArray()); + } catch (IOException | DBException ex) { + if (!(ex instanceof OMException)) { + LOG.error("Setting bucket property failed for bucket:{} in volume:{}", + bucketName, volumeName, ex); + } + throw ex; + } finally { + metadataManager.writeLock().unlock(); + } + } + + /** + * Updates the existing ACL list with remove and add ACLs that are passed. + * Remove is done before Add. + * + * @param existingAcls - old ACL list. + * @param removeAcls - ACLs to be removed. + * @param addAcls - ACLs to be added. + * @return updated ACL list. 
+ */ + private List<OzoneAcl> getUpdatedAclList(List<OzoneAcl> existingAcls, + List<OzoneAcl> removeAcls, List<OzoneAcl> addAcls) { + if(removeAcls != null && !removeAcls.isEmpty()) { + existingAcls.removeAll(removeAcls); + } + if(addAcls != null && !addAcls.isEmpty()) { + addAcls.stream().filter(acl -> !existingAcls.contains(acl)).forEach( + existingAcls::add); + } + return existingAcls; + } + + /** + * Deletes an existing empty bucket from volume. + * @param volumeName - Name of the volume. + * @param bucketName - Name of the bucket. + * @throws IOException + */ + public void deleteBucket(String volumeName, String bucketName) + throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(bucketName); + metadataManager.writeLock().lock(); + try { + byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName); + //Check if volume exists + if (metadataManager.get(metadataManager.getVolumeKey(volumeName)) + == null) { + LOG.debug("volume: {} not found ", volumeName); + throw new OMException("Volume doesn't exist", + OMException.ResultCodes.FAILED_VOLUME_NOT_FOUND); + } + //Check if bucket exist + if (metadataManager.get(bucketKey) == null) { + LOG.debug("bucket: {} not found ", bucketName); + throw new OMException("Bucket doesn't exist", + OMException.ResultCodes.FAILED_BUCKET_NOT_FOUND); + } + //Check if bucket is empty + if (!metadataManager.isBucketEmpty(volumeName, bucketName)) { + LOG.debug("bucket: {} is not empty ", bucketName); + throw new OMException("Bucket is not empty", + OMException.ResultCodes.FAILED_BUCKET_NOT_EMPTY); + } + metadataManager.delete(bucketKey); + } catch (IOException ex) { + if (!(ex instanceof OMException)) { + LOG.error("Delete bucket failed for bucket:{} in volume:{}", bucketName, + volumeName, ex); + } + throw ex; + } finally { + metadataManager.writeLock().unlock(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public List<OmBucketInfo> listBuckets(String volumeName, + String 
startBucket, String bucketPrefix, int maxNumOfBuckets) + throws IOException { + Preconditions.checkNotNull(volumeName); + metadataManager.readLock().lock(); + try { + return metadataManager.listBuckets( + volumeName, startBucket, bucketPrefix, maxNumOfBuckets); + } finally { + metadataManager.readLock().unlock(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java new file mode 100644 index 0000000..ee23fe0 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.ozone.om; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; +import org.apache.hadoop.ozone.common.BlockGroup; +import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.utils.BackgroundService; +import org.apache.hadoop.utils.BackgroundTask; +import org.apache.hadoop.utils.BackgroundTaskQueue; +import org.apache.hadoop.utils.BackgroundTaskResult; +import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT; + +/** + * This is the background service to delete keys. + * Scan the metadata of om periodically to get + * the keys with prefix "#deleting" and ask scm to + * delete metadata accordingly, if scm returns + * success for keys, then clean up those keys. + */ +public class KeyDeletingService extends BackgroundService { + + private static final Logger LOG = + LoggerFactory.getLogger(KeyDeletingService.class); + + // The thread pool size for key deleting service. 
+ private final static int KEY_DELETING_CORE_POOL_SIZE = 2; + + private final ScmBlockLocationProtocol scmClient; + private final KeyManager manager; + private final int keyLimitPerTask; + + public KeyDeletingService(ScmBlockLocationProtocol scmClient, + KeyManager manager, long serviceInterval, + long serviceTimeout, Configuration conf) { + super("KeyDeletingService", serviceInterval, TimeUnit.MILLISECONDS, + KEY_DELETING_CORE_POOL_SIZE, serviceTimeout); + this.scmClient = scmClient; + this.manager = manager; + this.keyLimitPerTask = conf.getInt(OZONE_KEY_DELETING_LIMIT_PER_TASK, + OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT); + } + + @Override + public BackgroundTaskQueue getTasks() { + BackgroundTaskQueue queue = new BackgroundTaskQueue(); + queue.add(new KeyDeletingTask()); + return queue; + } + + /** + * A key deleting task scans OM DB and looking for a certain number + * of pending-deletion keys, sends these keys along with their associated + * blocks to SCM for deletion. Once SCM confirms keys are deleted (once + * SCM persisted the blocks info in its deletedBlockLog), it removes + * these keys from the DB. + */ + private class KeyDeletingTask implements + BackgroundTask<BackgroundTaskResult> { + + @Override + public int getPriority() { + return 0; + } + + @Override + public BackgroundTaskResult call() throws Exception { + try { + long startTime = Time.monotonicNow(); + List<BlockGroup> keyBlocksList = manager + .getPendingDeletionKeys(keyLimitPerTask); + if (keyBlocksList.size() > 0) { + LOG.info("Found {} to-delete keys in OM", keyBlocksList.size()); + List<DeleteBlockGroupResult> results = + scmClient.deleteKeyBlocks(keyBlocksList); + for (DeleteBlockGroupResult result : results) { + if (result.isSuccess()) { + try { + // Purge key from OM DB. 
+ manager.deletePendingDeletionKey(result.getObjectKey()); + LOG.debug("Key {} deleted from OM DB", result.getObjectKey()); + } catch (IOException e) { + // if a pending deletion key is failed to delete, + // print a warning here and retain it in this state, + // so that it can be attempt to delete next time. + LOG.warn("Failed to delete pending-deletion key {}", + result.getObjectKey(), e); + } + } else { + // Key deletion failed, retry in next interval. + LOG.warn("Key {} deletion failed because some of the blocks" + + " were failed to delete, failed blocks: {}", + result.getObjectKey(), + StringUtils.join(",", result.getFailedBlocks())); + } + } + + if (!results.isEmpty()) { + LOG.info("Number of key deleted from OM DB: {}," + + " task elapsed time: {}ms", + results.size(), Time.monotonicNow() - startTime); + } + + return results::size; + } else { + LOG.debug("No pending deletion key found in OM"); + } + } catch (IOException e) { + LOG.error("Unable to get pending deletion keys, retry in" + + " next interval", e); + } + return EmptyTaskResult.newResult(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java new file mode 100644 index 0000000..226c07d --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.om; + +import org.apache.hadoop.ozone.common.BlockGroup; +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.om.helpers.OpenKeySession; + +import java.io.IOException; +import java.util.List; + +/** + * Handles key level commands. + */ +public interface KeyManager { + + /** + * Start key manager. + */ + void start(); + + /** + * Stop key manager. + */ + void stop() throws IOException; + + /** + * After calling commit, the key will be made visible. There can be multiple + * open key writes in parallel (identified by client id). The most recently + * committed one will be the one visible. + * + * @param args the key to commit. + * @param clientID the client that is committing. + * @throws IOException + */ + void commitKey(OmKeyArgs args, int clientID) throws IOException; + + /** + * A client calls this on an open key, to request to allocate a new block, + * and appended to the tail of current block list of the open client. + * + * @param args the key to append + * @param clientID the client requesting block. + * @return the reference to the new block. 
+ * @throws IOException + */ + OmKeyLocationInfo allocateBlock(OmKeyArgs args, int clientID) + throws IOException; + /** + * Given the args of a key to put, write an open key entry to meta data. + * + * In case that the container creation or key write failed on + * DistributedStorageHandler, this key's metadata will still stay in OM. + * TODO garbage collect the open keys that never get closed + * + * @param args the args of the key provided by client. + * @return a OpenKeySession instance client uses to talk to container. + * @throws Exception + */ + OpenKeySession openKey(OmKeyArgs args) throws IOException; + + /** + * Look up an existing key. Return the info of the key to client side, which + * DistributedStorageHandler will use to access the data on datanode. + * + * @param args the args of the key provided by client. + * @return a OmKeyInfo instance client uses to talk to container. + * @throws IOException + */ + OmKeyInfo lookupKey(OmKeyArgs args) throws IOException; + + /** + * Renames an existing key within a bucket. + * + * @param args the args of the key provided by client. + * @param toKeyName New name to be used for the key + * @throws IOException if specified key doesn't exist or + * some other I/O errors while renaming the key. + */ + void renameKey(OmKeyArgs args, String toKeyName) throws IOException; + + /** + * Deletes an object by an object key. The key will be immediately removed + * from OM namespace and become invisible to clients. The object data + * will be removed in async manner that might retain for some time. + * + * @param args the args of the key provided by client. + * @throws IOException if specified key doesn't exist or + * some other I/O errors while deleting an object. + */ + void deleteKey(OmKeyArgs args) throws IOException; + + /** + * Returns a list of keys represented by {@link OmKeyInfo} + * in the given bucket. + * + * @param volumeName + * the name of the volume. + * @param bucketName + * the name of the bucket. 
+ * @param startKey + * the start key name, only the keys whose name is + * after this value will be included in the result. + * This key is excluded from the result. + * @param keyPrefix + * key name prefix, only the keys whose name has + * this prefix will be included in the result. + * @param maxKeys + * the maximum number of keys to return. It ensures + * the size of the result will not exceed this limit. + * @return a list of keys. + * @throws IOException + */ + List<OmKeyInfo> listKeys(String volumeName, + String bucketName, String startKey, String keyPrefix, int maxKeys) + throws IOException; + + /** + * Returns a list of pending deletion key info that ups to the given count. + * Each entry is a {@link BlockGroup}, which contains the info about the + * key name and all its associated block IDs. A pending deletion key is + * stored with #deleting# prefix in OM DB. + * + * @param count max number of keys to return. + * @return a list of {@link BlockGroup} representing keys and blocks. + * @throws IOException + */ + List<BlockGroup> getPendingDeletionKeys(int count) throws IOException; + + /** + * Deletes a pending deletion key by its name. This is often called when + * key can be safely deleted from this layer. Once called, all footprints + * of the key will be purged from OM DB. + * + * @param objectKeyName object key name with #deleting# prefix. + * @throws IOException if specified key doesn't exist or other I/O errors. + */ + void deletePendingDeletionKey(String objectKeyName) throws IOException; + + /** + * Returns a list of all still open key info. Which contains the info about + * the key name and all its associated block IDs. A pending open key has + * prefix #open# in OM DB. + * + * @return a list of {@link BlockGroup} representing keys and blocks. + * @throws IOException + */ + List<BlockGroup> getExpiredOpenKeys() throws IOException; + + /** + * Deletes a expired open key by its name. Called when a hanging key has been + * lingering for too long. 
Once called, the open key entries gets removed + * from OM mdata data. + * + * @param objectKeyName object key name with #open# prefix. + * @throws IOException if specified key doesn't exist or other I/O errors. + */ + void deleteExpiredOpenKey(String objectKeyName) throws IOException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java new file mode 100644 index 0000000..ba92a29 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -0,0 +1,566 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.ozone.om; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; +import org.apache.hadoop.ozone.common.BlockGroup; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; +import org.apache.hadoop.ozone.om.helpers.OpenKeySession; +import org.apache.hadoop.ozone.protocol.proto + .OzoneManagerProtocolProtos.KeyInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; +import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.utils.BackgroundService; +import org.apache.hadoop.utils.BatchOperation; +import org.iq80.leveldb.DBException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.ozone + .OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT; +import static org.apache.hadoop.ozone + .OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY; +import static org.apache.hadoop.ozone + .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL; +import static org.apache.hadoop.ozone + .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT; +import static org.apache.hadoop.ozone + .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT; +import static org.apache.hadoop.ozone + .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT; 
+import static org.apache.hadoop.ozone + .OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE; +import static org.apache.hadoop.ozone + .OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS_DEFAULT; +import static org.apache.hadoop.ozone + .OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT; +import static org.apache.hadoop.ozone + .OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB; +import org.apache.hadoop.hdds.protocol + .proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.protocol + .proto.HddsProtos.ReplicationFactor; + + +/** + * Implementation of keyManager. + */ +public class KeyManagerImpl implements KeyManager { + private static final Logger LOG = + LoggerFactory.getLogger(KeyManagerImpl.class); + + /** + * A SCM block client, used to talk to SCM to allocate block during putKey. 
+ */ + private final ScmBlockLocationProtocol scmBlockClient; + private final OMMetadataManager metadataManager; + private final long scmBlockSize; + private final boolean useRatis; + private final BackgroundService keyDeletingService; + private final BackgroundService openKeyCleanupService; + + private final long preallocateMax; + private final Random random; + private final String omId; + + public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient, + OMMetadataManager metadataManager, OzoneConfiguration conf, + String omId) { + this.scmBlockClient = scmBlockClient; + this.metadataManager = metadataManager; + this.scmBlockSize = conf.getLong(OZONE_SCM_BLOCK_SIZE_IN_MB, + OZONE_SCM_BLOCK_SIZE_DEFAULT) * OzoneConsts.MB; + this.useRatis = conf.getBoolean(DFS_CONTAINER_RATIS_ENABLED_KEY, + DFS_CONTAINER_RATIS_ENABLED_DEFAULT); + long blockDeleteInterval = conf.getTimeDuration( + OZONE_BLOCK_DELETING_SERVICE_INTERVAL, + OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + long serviceTimeout = conf.getTimeDuration( + OZONE_BLOCK_DELETING_SERVICE_TIMEOUT, + OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + this.preallocateMax = conf.getLong( + OZONE_KEY_PREALLOCATION_MAXSIZE, + OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT); + keyDeletingService = new KeyDeletingService( + scmBlockClient, this, blockDeleteInterval, serviceTimeout, conf); + int openkeyCheckInterval = conf.getInt( + OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS, + OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS_DEFAULT); + openKeyCleanupService = new OpenKeyCleanupService( + scmBlockClient, this, openkeyCheckInterval, serviceTimeout); + random = new Random(); + this.omId = omId; + } + + @VisibleForTesting + public BackgroundService getOpenKeyCleanupService() { + return openKeyCleanupService; + } + + @Override + public void start() { + keyDeletingService.start(); + openKeyCleanupService.start(); + } + + @Override + public void stop() throws IOException { + 
keyDeletingService.shutdown(); + openKeyCleanupService.shutdown(); + } + + private void validateBucket(String volumeName, String bucketName) + throws IOException { + byte[] volumeKey = metadataManager.getVolumeKey(volumeName); + byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName); + + //Check if the volume exists + if(metadataManager.get(volumeKey) == null) { + LOG.error("volume not found: {}", volumeName); + throw new OMException("Volume not found", + OMException.ResultCodes.FAILED_VOLUME_NOT_FOUND); + } + //Check if bucket already exists + if(metadataManager.get(bucketKey) == null) { + LOG.error("bucket not found: {}/{} ", volumeName, bucketName); + throw new OMException("Bucket not found", + OMException.ResultCodes.FAILED_BUCKET_NOT_FOUND); + } + } + + @Override + public OmKeyLocationInfo allocateBlock(OmKeyArgs args, int clientID) + throws IOException { + Preconditions.checkNotNull(args); + metadataManager.writeLock().lock(); + String volumeName = args.getVolumeName(); + String bucketName = args.getBucketName(); + String keyName = args.getKeyName(); + + try { + validateBucket(volumeName, bucketName); + String objectKey = metadataManager.getKeyWithDBPrefix( + volumeName, bucketName, keyName); + byte[] openKey = metadataManager.getOpenKeyNameBytes(objectKey, clientID); + byte[] keyData = metadataManager.get(openKey); + if (keyData == null) { + LOG.error("Allocate block for a key not in open status in meta store " + + objectKey + " with ID " + clientID); + throw new OMException("Open Key not found", + OMException.ResultCodes.FAILED_KEY_NOT_FOUND); + } + OmKeyInfo keyInfo = + OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(keyData)); + AllocatedBlock allocatedBlock = + scmBlockClient.allocateBlock(scmBlockSize, keyInfo.getType(), + keyInfo.getFactor(), omId); + OmKeyLocationInfo info = new OmKeyLocationInfo.Builder() + .setBlockID(allocatedBlock.getBlockID()) + .setShouldCreateContainer(allocatedBlock.getCreateContainer()) + .setLength(scmBlockSize) 
+ .setOffset(0) + .build(); + // current version not committed, so new blocks coming now are added to + // the same version + keyInfo.appendNewBlocks(Collections.singletonList(info)); + keyInfo.updateModifcationTime(); + metadataManager.put(openKey, keyInfo.getProtobuf().toByteArray()); + return info; + } finally { + metadataManager.writeLock().unlock(); + } + } + + @Override + public OpenKeySession openKey(OmKeyArgs args) throws IOException { + Preconditions.checkNotNull(args); + metadataManager.writeLock().lock(); + String volumeName = args.getVolumeName(); + String bucketName = args.getBucketName(); + String keyName = args.getKeyName(); + ReplicationFactor factor = args.getFactor(); + ReplicationType type = args.getType(); + + // If user does not specify a replication strategy or + // replication factor, OM will use defaults. + if(factor == null) { + factor = useRatis ? ReplicationFactor.THREE: ReplicationFactor.ONE; + } + + if(type == null) { + type = useRatis ? ReplicationType.RATIS : ReplicationType.STAND_ALONE; + } + + try { + validateBucket(volumeName, bucketName); + long requestedSize = Math.min(preallocateMax, args.getDataSize()); + List<OmKeyLocationInfo> locations = new ArrayList<>(); + String objectKey = metadataManager.getKeyWithDBPrefix( + volumeName, bucketName, keyName); + // requested size is not required but more like a optimization: + // SCM looks at the requested, if it 0, no block will be allocated at + // the point, if client needs more blocks, client can always call + // allocateBlock. But if requested size is not 0, OM will preallocate + // some blocks and piggyback to client, to save RPC calls. 
+ while (requestedSize > 0) { + long allocateSize = Math.min(scmBlockSize, requestedSize); + AllocatedBlock allocatedBlock = + scmBlockClient.allocateBlock(allocateSize, type, factor, omId); + OmKeyLocationInfo subKeyInfo = new OmKeyLocationInfo.Builder() + .setBlockID(allocatedBlock.getBlockID()) + .setShouldCreateContainer(allocatedBlock.getCreateContainer()) + .setLength(allocateSize) + .setOffset(0) + .build(); + locations.add(subKeyInfo); + requestedSize -= allocateSize; + } + // NOTE size of a key is not a hard limit on anything, it is a value that + // client should expect, in terms of current size of key. If client sets a + // value, then this value is used, otherwise, we allocate a single block + // which is the current size, if read by the client. + long size = args.getDataSize() >= 0 ? args.getDataSize() : scmBlockSize; + byte[] keyKey = metadataManager.getDBKeyBytes( + volumeName, bucketName, keyName); + byte[] value = metadataManager.get(keyKey); + OmKeyInfo keyInfo; + long openVersion; + if (value != null) { + // the key already exist, the new blocks will be added as new version + keyInfo = OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(value)); + // when locations.size = 0, the new version will have identical blocks + // as its previous version + openVersion = keyInfo.addNewVersion(locations); + keyInfo.setDataSize(size + keyInfo.getDataSize()); + } else { + // the key does not exist, create a new object, the new blocks are the + // version 0 + long currentTime = Time.now(); + keyInfo = new OmKeyInfo.Builder() + .setVolumeName(args.getVolumeName()) + .setBucketName(args.getBucketName()) + .setKeyName(args.getKeyName()) + .setOmKeyLocationInfos(Collections.singletonList( + new OmKeyLocationInfoGroup(0, locations))) + .setCreationTime(currentTime) + .setModificationTime(currentTime) + .setDataSize(size) + .setReplicationType(type) + .setReplicationFactor(factor) + .build(); + openVersion = 0; + } + // Generate a random ID which is not already in meta db. 
+ int id = -1; + // in general this should finish in a couple times at most. putting some + // arbitrary large number here to avoid dead loop. + for (int j = 0; j < 10000; j++) { + id = random.nextInt(); + byte[] openKey = metadataManager.getOpenKeyNameBytes(objectKey, id); + if (metadataManager.get(openKey) == null) { + metadataManager.put(openKey, keyInfo.getProtobuf().toByteArray()); + break; + } + } + if (id == -1) { + throw new IOException("Failed to find a usable id for " + objectKey); + } + LOG.debug("Key {} allocated in volume {} bucket {}", + keyName, volumeName, bucketName); + return new OpenKeySession(id, keyInfo, openVersion); + } catch (OMException e) { + throw e; + } catch (IOException ex) { + if (!(ex instanceof OMException)) { + LOG.error("Key open failed for volume:{} bucket:{} key:{}", + volumeName, bucketName, keyName, ex); + } + throw new OMException(ex.getMessage(), + OMException.ResultCodes.FAILED_KEY_ALLOCATION); + } finally { + metadataManager.writeLock().unlock(); + } + } + + @Override + public void commitKey(OmKeyArgs args, int clientID) throws IOException { + Preconditions.checkNotNull(args); + metadataManager.writeLock().lock(); + String volumeName = args.getVolumeName(); + String bucketName = args.getBucketName(); + String keyName = args.getKeyName(); + try { + validateBucket(volumeName, bucketName); + String objectKey = metadataManager.getKeyWithDBPrefix( + volumeName, bucketName, keyName); + byte[] objectKeyBytes = metadataManager.getDBKeyBytes(volumeName, + bucketName, keyName); + byte[] openKey = metadataManager.getOpenKeyNameBytes(objectKey, clientID); + byte[] openKeyData = metadataManager.get(openKey); + if (openKeyData == null) { + throw new OMException("Commit a key without corresponding entry " + + DFSUtil.bytes2String(openKey), ResultCodes.FAILED_KEY_NOT_FOUND); + } + OmKeyInfo keyInfo = + OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(openKeyData)); + keyInfo.setDataSize(args.getDataSize()); + 
keyInfo.setModificationTime(Time.now()); + BatchOperation batch = new BatchOperation(); + batch.delete(openKey); + batch.put(objectKeyBytes, keyInfo.getProtobuf().toByteArray()); + metadataManager.writeBatch(batch); + } catch (OMException e) { + throw e; + } catch (IOException ex) { + LOG.error("Key commit failed for volume:{} bucket:{} key:{}", + volumeName, bucketName, keyName, ex); + throw new OMException(ex.getMessage(), + OMException.ResultCodes.FAILED_KEY_ALLOCATION); + } finally { + metadataManager.writeLock().unlock(); + } + } + + @Override + public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException { + Preconditions.checkNotNull(args); + metadataManager.writeLock().lock(); + String volumeName = args.getVolumeName(); + String bucketName = args.getBucketName(); + String keyName = args.getKeyName(); + try { + byte[] keyKey = metadataManager.getDBKeyBytes( + volumeName, bucketName, keyName); + byte[] value = metadataManager.get(keyKey); + if (value == null) { + LOG.debug("volume:{} bucket:{} Key:{} not found", + volumeName, bucketName, keyName); + throw new OMException("Key not found", + OMException.ResultCodes.FAILED_KEY_NOT_FOUND); + } + return OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(value)); + } catch (DBException ex) { + LOG.error("Get key failed for volume:{} bucket:{} key:{}", + volumeName, bucketName, keyName, ex); + throw new OMException(ex.getMessage(), + OMException.ResultCodes.FAILED_KEY_NOT_FOUND); + } finally { + metadataManager.writeLock().unlock(); + } + } + + @Override + public void renameKey(OmKeyArgs args, String toKeyName) throws IOException { + Preconditions.checkNotNull(args); + Preconditions.checkNotNull(toKeyName); + String volumeName = args.getVolumeName(); + String bucketName = args.getBucketName(); + String fromKeyName = args.getKeyName(); + if (toKeyName.length() == 0 || fromKeyName.length() == 0) { + LOG.error("Rename key failed for volume:{} bucket:{} fromKey:{} toKey:{}.", + volumeName, bucketName, fromKeyName, toKeyName); 
+ throw new OMException("Key name is empty", + ResultCodes.FAILED_INVALID_KEY_NAME); + } + + metadataManager.writeLock().lock(); + try { + // fromKeyName should exist + byte[] fromKey = metadataManager.getDBKeyBytes( + volumeName, bucketName, fromKeyName); + byte[] fromKeyValue = metadataManager.get(fromKey); + if (fromKeyValue == null) { + // TODO: Add support for renaming open key + LOG.error( + "Rename key failed for volume:{} bucket:{} fromKey:{} toKey:{}. " + + "Key: {} not found.", volumeName, bucketName, fromKeyName, + toKeyName, fromKeyName); + throw new OMException("Key not found", + OMException.ResultCodes.FAILED_KEY_NOT_FOUND); + } + + // toKeyName should not exist + byte[] toKey = + metadataManager.getDBKeyBytes(volumeName, bucketName, toKeyName); + byte[] toKeyValue = metadataManager.get(toKey); + if (toKeyValue != null) { + LOG.error( + "Rename key failed for volume:{} bucket:{} fromKey:{} toKey:{}. " + + "Key: {} already exists.", volumeName, bucketName, + fromKeyName, toKeyName, toKeyName); + throw new OMException("Key not found", + OMException.ResultCodes.FAILED_KEY_ALREADY_EXISTS); + } + + if (fromKeyName.equals(toKeyName)) { + return; + } + + OmKeyInfo newKeyInfo = + OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(fromKeyValue)); + newKeyInfo.setKeyName(toKeyName); + newKeyInfo.updateModifcationTime(); + BatchOperation batch = new BatchOperation(); + batch.delete(fromKey); + batch.put(toKey, newKeyInfo.getProtobuf().toByteArray()); + metadataManager.writeBatch(batch); + } catch (DBException ex) { + LOG.error("Rename key failed for volume:{} bucket:{} fromKey:{} toKey:{}.", + volumeName, bucketName, fromKeyName, toKeyName, ex); + throw new OMException(ex.getMessage(), + ResultCodes.FAILED_KEY_RENAME); + } finally { + metadataManager.writeLock().unlock(); + } + } + + @Override + public void deleteKey(OmKeyArgs args) throws IOException { + Preconditions.checkNotNull(args); + metadataManager.writeLock().lock(); + String volumeName = args.getVolumeName(); 
+ String bucketName = args.getBucketName(); + String keyName = args.getKeyName(); + try { + byte[] objectKey = metadataManager.getDBKeyBytes( + volumeName, bucketName, keyName); + byte[] objectValue = metadataManager.get(objectKey); + if (objectValue == null) { + throw new OMException("Key not found", + OMException.ResultCodes.FAILED_KEY_NOT_FOUND); + } + byte[] deletingKey = metadataManager.getDeletedKeyName(objectKey); + BatchOperation batch = new BatchOperation(); + batch.put(deletingKey, objectValue); + batch.delete(objectKey); + metadataManager.writeBatch(batch); + } catch (DBException ex) { + LOG.error(String.format("Delete key failed for volume:%s " + + "bucket:%s key:%s", volumeName, bucketName, keyName), ex); + throw new OMException(ex.getMessage(), ex, + ResultCodes.FAILED_KEY_DELETION); + } finally { + metadataManager.writeLock().unlock(); + } + } + + @Override + public List<OmKeyInfo> listKeys(String volumeName, String bucketName, + String startKey, String keyPrefix, int maxKeys) throws IOException { + Preconditions.checkNotNull(volumeName); + Preconditions.checkNotNull(bucketName); + + metadataManager.readLock().lock(); + try { + return metadataManager.listKeys(volumeName, bucketName, + startKey, keyPrefix, maxKeys); + } finally { + metadataManager.readLock().unlock(); + } + } + + @Override + public List<BlockGroup> getPendingDeletionKeys(final int count) + throws IOException { + metadataManager.readLock().lock(); + try { + return metadataManager.getPendingDeletionKeys(count); + } finally { + metadataManager.readLock().unlock(); + } + } + + @Override + public void deletePendingDeletionKey(String objectKeyName) + throws IOException{ + Preconditions.checkNotNull(objectKeyName); + if (!objectKeyName.startsWith(OzoneConsts.DELETING_KEY_PREFIX)) { + throw new IllegalArgumentException("Invalid key name," + + " the name should be the key name with deleting prefix"); + } + + // Simply removes the entry from OM DB. 
+ metadataManager.writeLock().lock(); + try { + byte[] pendingDelKey = DFSUtil.string2Bytes(objectKeyName); + byte[] delKeyValue = metadataManager.get(pendingDelKey); + if (delKeyValue == null) { + throw new IOException("Failed to delete key " + objectKeyName + + " because it is not found in DB"); + } + metadataManager.delete(pendingDelKey); + } finally { + metadataManager.writeLock().unlock(); + } + } + + @Override + public List<BlockGroup> getExpiredOpenKeys() throws IOException { + metadataManager.readLock().lock(); + try { + return metadataManager.getExpiredOpenKeys(); + } finally { + metadataManager.readLock().unlock(); + } + } + + @Override + public void deleteExpiredOpenKey(String objectKeyName) throws IOException { + Preconditions.checkNotNull(objectKeyName); + if (!objectKeyName.startsWith(OzoneConsts.OPEN_KEY_PREFIX)) { + throw new IllegalArgumentException("Invalid key name," + + " the name should be the key name with open key prefix"); + } + + // Simply removes the entry from OM DB. 
+ metadataManager.writeLock().lock(); + try { + byte[] openKey = DFSUtil.string2Bytes(objectKeyName); + byte[] delKeyValue = metadataManager.get(openKey); + if (delKeyValue == null) { + throw new IOException("Failed to delete key " + objectKeyName + + " because it is not found in DB"); + } + metadataManager.delete(openKey); + } finally { + metadataManager.writeLock().unlock(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMXBean.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMXBean.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMXBean.java new file mode 100644 index 0000000..3ab9f47 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMXBean.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdds.server.ServiceRuntimeInfo; + +/** + * This is the JMX management interface for OM information. 
+ */ +@InterfaceAudience.Private +public interface OMMXBean extends ServiceRuntimeInfo { + + /** + * Returns the RPC port exposed through this management interface, as a + * string. NOTE(review): presumably the port the OM RPC server listens on; + * confirm against the implementing class. + */ + String getRpcPort(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java new file mode 100644 index 0000000..f2e78e6 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.ozone.om; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.common.BlockGroup; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; +import org.apache.hadoop.utils.BatchOperation; +import org.apache.hadoop.utils.MetadataStore; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.locks.Lock; + +/** + * OM metadata manager interface. + */ +public interface OMMetadataManager { + /** + * Start metadata manager. + */ + void start(); + + /** + * Stop metadata manager. + */ + void stop() throws IOException; + + /** + * Get metadata store. + * @return metadata store. + */ + @VisibleForTesting + MetadataStore getStore(); + + /** + * Returns the read lock used on Metadata DB. + * @return readLock + */ + Lock readLock(); + + /** + * Returns the write lock used on Metadata DB. + * @return writeLock + */ + Lock writeLock(); + + /** + * Returns the value associated with this key. + * @param key - key + * @return value + */ + byte[] get(byte[] key) throws IOException; + + /** + * Puts a Key into Metadata DB. + * @param key - key + * @param value - value + */ + void put(byte[] key, byte[] value) throws IOException; + + /** + * Deletes a Key from Metadata DB. + * @param key - key + */ + void delete(byte[] key) throws IOException; + + /** + * Atomically writes a batch of operations. + * @param batch - the batch of operations to write + * @throws IOException + */ + void writeBatch(BatchOperation batch) throws IOException; + + /** + * Given a volume return the corresponding DB key. + * @param volume - Volume name + */ + byte[] getVolumeKey(String volume); + + /** + * Given a user return the corresponding DB key. + * @param user - User name + */ + byte[] getUserKey(String user); + + /** + * Given a volume and bucket, return the corresponding DB key. 
+ * @param volume - Volume name + * @param bucket - Bucket name + */ + byte[] getBucketKey(String volume, String bucket); + + /** + * Given a volume, bucket and a key, return the corresponding DB key. + * @param volume - volume name + * @param bucket - bucket name + * @param key - key name + * @return bytes of DB key. + */ + byte[] getDBKeyBytes(String volume, String bucket, String key); + + /** + * Returns the DB key name of a deleted key in OM metadata store. + * The name for a deleted key has prefix #deleting# followed by + * the actual key name. + * @param keyName - key name + * @return bytes of DB key. + */ + byte[] getDeletedKeyName(byte[] keyName); + + /** + * Returns the DB key name of an open key in OM metadata store. + * Should be #open# prefix followed by actual key name. + * @param keyName - key name + * @param id - the id for this open + * @return bytes of DB key. + */ + byte[] getOpenKeyNameBytes(String keyName, int id); + + /** + * Returns the full name of a key given volume name, bucket name and key name. + * Generally done by padding certain delimiters. + * + * @param volumeName - volume name + * @param bucketName - bucket name + * @param keyName - key name + * @return the full key name. + */ + String getKeyWithDBPrefix(String volumeName, String bucketName, + String keyName); + + /** + * Given a volume, check if it is empty, + * i.e. there are no buckets inside it. + * @param volume - Volume name + * @return true if the volume is empty + */ + boolean isVolumeEmpty(String volume) throws IOException; + + /** + * Given a volume/bucket, check if it is empty, + * i.e. there are no keys inside it. + * @param volume - Volume name + * @param bucket - Bucket name + * @return true if the bucket is empty + */ + boolean isBucketEmpty(String volume, String bucket) throws IOException; + + /** + * Returns a list of buckets represented by {@link OmBucketInfo} + * in the given volume. + * + * @param volumeName + * the name of the volume. 
This argument is required, + * this method returns buckets in this given volume. + * @param startBucket + * the start bucket name. Only the buckets whose name is + * after this value will be included in the result. + * This bucket is excluded from the result. + * @param bucketPrefix + * bucket name prefix. Only the buckets whose name has + * this prefix will be included in the result. + * @param maxNumOfBuckets + * the maximum number of buckets to return. It ensures + * the size of the result will not exceed this limit. + * @return a list of buckets. + * @throws IOException + */ + List<OmBucketInfo> listBuckets(String volumeName, String startBucket, + String bucketPrefix, int maxNumOfBuckets) throws IOException; + + /** + * Returns a list of keys represented by {@link OmKeyInfo} + * in the given bucket. + * + * @param volumeName + * the name of the volume. + * @param bucketName + * the name of the bucket. + * @param startKey + * the start key name, only the keys whose name is + * after this value will be included in the result. + * This key is excluded from the result. + * @param keyPrefix + * key name prefix, only the keys whose name has + * this prefix will be included in the result. + * @param maxKeys + * the maximum number of keys to return. It ensures + * the size of the result will not exceed this limit. + * @return a list of keys. + * @throws IOException + */ + List<OmKeyInfo> listKeys(String volumeName, + String bucketName, String startKey, String keyPrefix, int maxKeys) + throws IOException; + + /** + * Returns a list of volumes owned by a given user; if user is null, + * returns all volumes. + * + * @param userName + * volume owner + * @param prefix + * the volume prefix used to filter the listing result. + * @param startKey + * the start volume name determines where to start listing from, + * this key is excluded from the result. + * @param maxKeys + * the maximum number of volumes to return. 
+ * @return a list of {@link OmVolumeArgs} + * @throws IOException + */ + List<OmVolumeArgs> listVolumes(String userName, String prefix, + String startKey, int maxKeys) throws IOException; + + /** + * Returns a list of pending deletion key info, up to the given count. + * Each entry is a {@link BlockGroup}, which contains the info about the + * key name and all its associated block IDs. A pending deletion key is + * stored with #deleting# prefix in OM DB. + * + * @param count max number of keys to return. + * @return a list of {@link BlockGroup} representing keys and blocks. + * @throws IOException + */ + List<BlockGroup> getPendingDeletionKeys(int count) throws IOException; + + /** + * Returns a list of all still open key info, which contains the info about + * the key name and all its associated block IDs. A pending open key has + * prefix #open# in OM DB. + * + * @return a list of {@link BlockGroup} representing keys and blocks. + * @throws IOException + */ + List<BlockGroup> getExpiredOpenKeys() throws IOException; +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org