Repository: atlas Updated Branches: refs/heads/branch-1.0 bfd88c3c3 -> 674a46d0e
ATLAS-2760: Hive hook updates to handle references to s3 paths (cherry picked from commit 88ac0fa62186a59ed08a950385c5a1e3c84e517e) Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/674a46d0 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/674a46d0 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/674a46d0 Branch: refs/heads/branch-1.0 Commit: 674a46d0e16b6de052544e48d9b84a3ed811e8cb Parents: bfd88c3 Author: Madhan Neethiraj <mad...@apache.org> Authored: Sun Jun 17 13:20:37 2018 -0700 Committer: Madhan Neethiraj <mad...@apache.org> Committed: Wed Jun 20 08:38:45 2018 -0700 ---------------------------------------------------------------------- .../atlas/hive/hook/events/BaseHiveEvent.java | 79 ++++++++++++++++---- .../atlas/hive/hook/events/CreateTable.java | 2 +- 2 files changed, 64 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/674a46d0/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java index ce03287..09f011c 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java @@ -78,6 +78,13 @@ public abstract class BaseHiveEvent { public static final String HDFS_TYPE_PATH = "hdfs_path"; public static final String HBASE_TYPE_TABLE = "hbase_table"; public static final String HBASE_TYPE_NAMESPACE = "hbase_namespace"; + public static final String AWS_S3_BUCKET = "aws_s3_bucket"; + public static final String AWS_S3_PSEUDO_DIR = "aws_s3_pseudo_dir"; + public static final String AWS_S3_OBJECT = "aws_s3_object"; + + public static final String SCHEME_SEPARATOR = "://"; + public static final String S3_SCHEME = "s3" + SCHEME_SEPARATOR; + public static final String S3A_SCHEME = "s3a" + SCHEME_SEPARATOR; public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; public static final String ATTRIBUTE_NAME = "name"; @@ -130,6 +137,8 @@ public abstract class BaseHiveEvent { public static final String ATTRIBUTE_URI = "uri"; public static final String ATTRIBUTE_STORAGE_HANDLER = "storage_handler"; public static final String ATTRIBUTE_NAMESPACE = "namespace"; + public static final String ATTRIBUTE_OBJECT_PREFIX = "objectPrefix"; + public static final String ATTRIBUTE_BUCKET = "bucket"; public static final String HBASE_STORAGE_HANDLER_CLASS = "org.apache.hadoop.hive.hbase.HBaseStorageHandler"; public static final String HBASE_DEFAULT_NAMESPACE = "default"; @@ -245,7 +254,7 @@ public abstract class BaseHiveEvent { URI location = entity.getLocation(); if (location != null) { - ret = getHDFSPathEntity(new Path(entity.getLocation())); + ret = getPathEntity(new Path(entity.getLocation()), entityExtInfo); } } break; @@ -494,26 +503,60 @@ public abstract class BaseHiveEvent { return ret; } - protected AtlasEntity getHDFSPathEntity(Path path) { - String strPath = path.toString().toLowerCase(); - String nameServiceID = HdfsNameServiceResolver.getNameServiceIDForPath(strPath); - String attrPath = StringUtils.isEmpty(nameServiceID) ? strPath : HdfsNameServiceResolver.getPathWithNameServiceID(strPath); - String pathQualifiedName = getQualifiedName(attrPath); - AtlasEntity ret = context.getEntity(pathQualifiedName); + protected AtlasEntity getPathEntity(Path path, AtlasEntityExtInfo extInfo) { + AtlasEntity ret; + String strPath = path.toString().toLowerCase(); - if (ret == null) { - ret = new AtlasEntity(HDFS_TYPE_PATH); + if (isS3Path(strPath)) { + String bucketName = path.toUri().getAuthority(); + String bucketQualifiedName = (path.toUri().getScheme() + SCHEME_SEPARATOR + path.toUri().getAuthority() + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName(); + String pathQualifiedName = (strPath + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName(); + AtlasEntity bucketEntity = context.getEntity(bucketQualifiedName); + + ret = context.getEntity(pathQualifiedName); + + if (ret == null) { + if (bucketEntity == null) { + bucketEntity = new AtlasEntity(AWS_S3_BUCKET); + + bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQualifiedName); + bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName); + + context.putEntity(bucketQualifiedName, bucketEntity); + } + + extInfo.addReferredEntity(bucketEntity); + + ret = new AtlasEntity(AWS_S3_PSEUDO_DIR); + + ret.setAttribute(ATTRIBUTE_BUCKET, getObjectId(bucketEntity)); + ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase()); + ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName); + ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase()); - if (StringUtils.isNotEmpty(nameServiceID)) { - ret.setAttribute(ATTRIBUTE_NAMESERVICE_ID, nameServiceID); + context.putEntity(pathQualifiedName, ret); } + } else { + String nameServiceID = HdfsNameServiceResolver.getNameServiceIDForPath(strPath); + String attrPath = StringUtils.isEmpty(nameServiceID) ? strPath : HdfsNameServiceResolver.getPathWithNameServiceID(strPath); + String pathQualifiedName = getQualifiedName(attrPath); - ret.setAttribute(ATTRIBUTE_PATH, attrPath); - ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName); - ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase()); - ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, getClusterName()); + ret = context.getEntity(pathQualifiedName); + + if (ret == null) { + ret = new AtlasEntity(HDFS_TYPE_PATH); + + if (StringUtils.isNotEmpty(nameServiceID)) { + ret.setAttribute(ATTRIBUTE_NAMESERVICE_ID, nameServiceID); + } - context.putEntity(pathQualifiedName, ret); + ret.setAttribute(ATTRIBUTE_PATH, attrPath); + ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName); + ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase()); + ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, getClusterName()); + + context.putEntity(pathQualifiedName, ret); + } } return ret; @@ -874,6 +917,10 @@ public abstract class BaseHiveEvent { return false; } + private boolean isS3Path(String strPath) { + return strPath != null && (strPath.startsWith(S3_SCHEME) || strPath.startsWith(S3A_SCHEME)); + } + static final class EntityComparator implements Comparator<Entity> { @Override http://git-wip-us.apache.org/repos/asf/atlas/blob/674a46d0/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java index daf5c86..442a0a0 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java @@ -98,7 +98,7 @@ public class CreateTable extends BaseHiveEvent { } } else { if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) { - AtlasEntity hdfsPathEntity = getHDFSPathEntity(table.getDataLocation()); + AtlasEntity hdfsPathEntity = getPathEntity(table.getDataLocation(), ret); AtlasEntity processEntity = getHiveProcessEntity(Collections.singletonList(hdfsPathEntity), Collections.singletonList(tblEntity)); ret.addEntity(processEntity);