This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new 549dee6 ATLAS-3325: update hive-bridge to use relationship attributes 549dee6 is described below commit 549dee6b6a488da4e5aa75b0927855889e1eb59f Author: Mandar Ambawane <mandar.ambaw...@freestoneinfotech.com> AuthorDate: Sat Jul 13 14:56:03 2019 +0530 ATLAS-3325: update hive-bridge to use relationship attributes Signed-off-by: Madhan Neethiraj <mad...@apache.org> --- .../atlas/hive/bridge/HiveMetaStoreBridge.java | 26 +++++++++++----------- .../atlas/hive/hook/events/BaseHiveEvent.java | 16 +++++++------ .../atlas/hive/hook/events/CreateHiveProcess.java | 4 ++-- 3 files changed, 24 insertions(+), 22 deletions(-) diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java index 049112b..a61a3e6 100755 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java @@ -363,8 +363,8 @@ public class HiveMetaStoreBridge { processInst.setAttribute(ATTRIBUTE_QUALIFIED_NAME, processQualifiedName); processInst.setAttribute(ATTRIBUTE_NAME, query); processInst.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace); - processInst.setAttribute(ATTRIBUTE_INPUTS, Collections.singletonList(BaseHiveEvent.getObjectId(pathInst))); - processInst.setAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(BaseHiveEvent.getObjectId(tableInst))); + processInst.setRelationshipAttribute(ATTRIBUTE_INPUTS, Collections.singletonList(getAtlasRelatedObjectId(pathInst, RELATIONSHIP_DATASET_PROCESS_INPUTS))); + processInst.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(getAtlasRelatedObjectId(tableInst, RELATIONSHIP_PROCESS_DATASET_OUTPUTS))); processInst.setAttribute(ATTRIBUTE_USER_NAME, table.getOwner()); processInst.setAttribute(ATTRIBUTE_START_TIME, 
now); processInst.setAttribute(ATTRIBUTE_END_TIME, now); @@ -590,7 +590,7 @@ public class HiveMetaStoreBridge { long createTime = BaseHiveEvent.getTableCreateTime(hiveTable); long lastAccessTime = hiveTable.getLastAccessTime() > 0 ? hiveTable.getLastAccessTime() : createTime; - tableEntity.setAttribute(ATTRIBUTE_DB, BaseHiveEvent.getObjectId(database)); + tableEntity.setRelationshipAttribute(ATTRIBUTE_DB, getAtlasRelatedObjectId(database, RELATIONSHIP_HIVE_TABLE_DB)); tableEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tableQualifiedName); tableEntity.setAttribute(ATTRIBUTE_NAME, hiveTable.getTableName().toLowerCase()); tableEntity.setAttribute(ATTRIBUTE_OWNER, hiveTable.getOwner()); @@ -611,13 +611,13 @@ public class HiveMetaStoreBridge { tableEntity.setAttribute(ATTRIBUTE_VIEW_EXPANDED_TEXT, hiveTable.getViewExpandedText()); } - AtlasEntity sdEntity = toStroageDescEntity(hiveTable.getSd(), tableQualifiedName, getStorageDescQFName(tableQualifiedName), BaseHiveEvent.getObjectId(tableEntity)); - List<AtlasEntity> partKeys = toColumns(hiveTable.getPartitionKeys(), tableEntity); - List<AtlasEntity> columns = toColumns(hiveTable.getCols(), tableEntity); + AtlasEntity sdEntity = toStorageDescEntity(hiveTable.getSd(), tableQualifiedName, getStorageDescQFName(tableQualifiedName), BaseHiveEvent.getObjectId(tableEntity)); + List<AtlasEntity> partKeys = toColumns(hiveTable.getPartitionKeys(), tableEntity, RELATIONSHIP_HIVE_TABLE_PART_KEYS); + List<AtlasEntity> columns = toColumns(hiveTable.getCols(), tableEntity, RELATIONSHIP_HIVE_TABLE_COLUMNS); - tableEntity.setAttribute(ATTRIBUTE_STORAGEDESC, BaseHiveEvent.getObjectId(sdEntity)); - tableEntity.setAttribute(ATTRIBUTE_PARTITION_KEYS, BaseHiveEvent.getObjectIds(partKeys)); - tableEntity.setAttribute(ATTRIBUTE_COLUMNS, BaseHiveEvent.getObjectIds(columns)); + tableEntity.setRelationshipAttribute(ATTRIBUTE_STORAGEDESC, getAtlasRelatedObjectId(sdEntity, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC)); + 
tableEntity.setRelationshipAttribute(ATTRIBUTE_PARTITION_KEYS, getObjectIdsWithRelationshipType(partKeys, RELATIONSHIP_HIVE_TABLE_PART_KEYS)); + tableEntity.setRelationshipAttribute(ATTRIBUTE_COLUMNS, getObjectIdsWithRelationshipType(columns, RELATIONSHIP_HIVE_TABLE_COLUMNS)); table.addReferredEntity(database); table.addReferredEntity(sdEntity); @@ -639,10 +639,10 @@ public class HiveMetaStoreBridge { return table; } - private AtlasEntity toStroageDescEntity(StorageDescriptor storageDesc, String tableQualifiedName, String sdQualifiedName, AtlasObjectId tableId ) throws AtlasHookException { + private AtlasEntity toStorageDescEntity(StorageDescriptor storageDesc, String tableQualifiedName, String sdQualifiedName, AtlasObjectId tableId ) throws AtlasHookException { AtlasEntity ret = new AtlasEntity(HiveDataTypes.HIVE_STORAGEDESC.getName()); - ret.setAttribute(ATTRIBUTE_TABLE, tableId); + ret.setRelationshipAttribute(ATTRIBUTE_TABLE, getAtlasRelatedObjectId(tableId, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC)); ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, sdQualifiedName); ret.setAttribute(ATTRIBUTE_PARAMETERS, storageDesc.getParameters()); ret.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getPathWithNameServiceID(storageDesc.getLocation())); @@ -689,7 +689,7 @@ public class HiveMetaStoreBridge { return ret; } - private List<AtlasEntity> toColumns(List<FieldSchema> schemaList, AtlasEntity table) throws AtlasHookException { + private List<AtlasEntity> toColumns(List<FieldSchema> schemaList, AtlasEntity table, String relationshipType) throws AtlasHookException { List<AtlasEntity> ret = new ArrayList<>(); int columnPosition = 0; @@ -698,7 +698,7 @@ public class HiveMetaStoreBridge { AtlasEntity column = new AtlasEntity(HiveDataTypes.HIVE_COLUMN.getName()); - column.setAttribute(ATTRIBUTE_TABLE, BaseHiveEvent.getObjectId(table)); + column.setRelationshipAttribute(ATTRIBUTE_TABLE, getAtlasRelatedObjectId(table, relationshipType)); 
column.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getColumnQualifiedName((String) table.getAttribute(ATTRIBUTE_QUALIFIED_NAME), fs.getName())); column.setAttribute(ATTRIBUTE_NAME, fs.getName()); column.setAttribute(ATTRIBUTE_OWNER, table.getAttribute(ATTRIBUTE_OWNER)); diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java index 98b4d4f..a74273a 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java @@ -206,19 +206,21 @@ public abstract class BaseHiveEvent { } - public static AtlasRelatedObjectId getObjectIdWithRelationshipType(AtlasEntity entity, String relationShipType) { - AtlasRelatedObjectId atlasRelatedObjectId = new AtlasRelatedObjectId(getObjectId(entity), relationShipType); - return atlasRelatedObjectId; + public static AtlasRelatedObjectId getAtlasRelatedObjectId(AtlasEntity entity, String relationshipType) { + return getAtlasRelatedObjectId(getObjectId(entity), relationshipType); } - + public static AtlasRelatedObjectId getAtlasRelatedObjectId(AtlasObjectId objectId, String relationShipType) { + AtlasRelatedObjectId atlasRelatedObjectId = new AtlasRelatedObjectId(objectId, relationShipType); + return atlasRelatedObjectId; + } public static List<AtlasRelatedObjectId> getObjectIdsWithRelationshipType(List<AtlasEntity> entities,String relationshipType) { final List<AtlasRelatedObjectId> ret; if (CollectionUtils.isNotEmpty(entities)) { ret = new ArrayList<>(entities.size()); for (AtlasEntity entity : entities) { - ret.add(getObjectIdWithRelationshipType(entity, relationshipType)); + ret.add(getAtlasRelatedObjectId(entity, relationshipType)); } } else { ret = Collections.emptyList(); @@ -478,7 +480,7 @@ public abstract class BaseHiveEvent { } - ret.setRelationshipAttribute(ATTRIBUTE_STORAGEDESC, 
getObjectIdWithRelationshipType(sd, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC)); + ret.setRelationshipAttribute(ATTRIBUTE_STORAGEDESC, getAtlasRelatedObjectId(sd, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC)); ret.setRelationshipAttribute(ATTRIBUTE_PARTITION_KEYS, getObjectIdsWithRelationshipType(partitionKeys, RELATIONSHIP_HIVE_TABLE_PART_KEYS)); ret.setRelationshipAttribute(ATTRIBUTE_COLUMNS, getObjectIdsWithRelationshipType(columns, RELATIONSHIP_HIVE_TABLE_COLUMNS)); } @@ -625,7 +627,7 @@ public abstract class BaseHiveEvent { ret = new AtlasEntity(AWS_S3_PSEUDO_DIR); - ret.setRelationshipAttribute(ATTRIBUTE_BUCKET, getObjectIdWithRelationshipType(bucketEntity, RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS)); + ret.setRelationshipAttribute(ATTRIBUTE_BUCKET, getAtlasRelatedObjectId(bucketEntity, RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS)); ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase()); ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName); ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase()); diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java index 91e063e..6b050d4 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java @@ -208,8 +208,8 @@ public class CreateHiveProcess extends BaseHiveEvent { columnLineageProcess.setAttribute(ATTRIBUTE_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME)); columnLineageProcess.setAttribute(ATTRIBUTE_QUALIFIED_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME)); columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_INPUTS, 
getObjectIdsWithRelationshipType(inputColumns, BaseHiveEvent.RELATIONSHIP_DATASET_PROCESS_INPUTS)); - columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(getObjectIdWithRelationshipType(outputColumn, BaseHiveEvent.RELATIONSHIP_PROCESS_DATASET_OUTPUTS))); - columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_QUERY, getObjectIdWithRelationshipType(hiveProcess, BaseHiveEvent.RELATIONSHIP_HIVE_PROCESS_COLUMN_LINEAGE)); + columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(getAtlasRelatedObjectId(outputColumn, BaseHiveEvent.RELATIONSHIP_PROCESS_DATASET_OUTPUTS))); + columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_QUERY, getAtlasRelatedObjectId(hiveProcess, BaseHiveEvent.RELATIONSHIP_HIVE_PROCESS_COLUMN_LINEAGE)); columnLineageProcess.setAttribute(ATTRIBUTE_DEPENDENCY_TYPE, entry.getValue().getType()); columnLineageProcess.setAttribute(ATTRIBUTE_EXPRESSION, entry.getValue().getExpr());