This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 549dee6  ATLAS-3325: update hive-bridge to use relationship attributes
549dee6 is described below

commit 549dee6b6a488da4e5aa75b0927855889e1eb59f
Author: Mandar Ambawane <mandar.ambaw...@freestoneinfotech.com>
AuthorDate: Sat Jul 13 14:56:03 2019 +0530

    ATLAS-3325: update hive-bridge to use relationship attributes
    
    Signed-off-by: Madhan Neethiraj <mad...@apache.org>
---
 .../atlas/hive/bridge/HiveMetaStoreBridge.java     | 26 +++++++++++-----------
 .../atlas/hive/hook/events/BaseHiveEvent.java      | 16 +++++++------
 .../atlas/hive/hook/events/CreateHiveProcess.java  |  4 ++--
 3 files changed, 24 insertions(+), 22 deletions(-)

diff --git 
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
 
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index 049112b..a61a3e6 100755
--- 
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ 
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -363,8 +363,8 @@ public class HiveMetaStoreBridge {
                     processInst.setAttribute(ATTRIBUTE_QUALIFIED_NAME, 
processQualifiedName);
                     processInst.setAttribute(ATTRIBUTE_NAME, query);
                     processInst.setAttribute(ATTRIBUTE_CLUSTER_NAME, 
metadataNamespace);
-                    processInst.setAttribute(ATTRIBUTE_INPUTS, 
Collections.singletonList(BaseHiveEvent.getObjectId(pathInst)));
-                    processInst.setAttribute(ATTRIBUTE_OUTPUTS, 
Collections.singletonList(BaseHiveEvent.getObjectId(tableInst)));
+                    processInst.setRelationshipAttribute(ATTRIBUTE_INPUTS, 
Collections.singletonList(getAtlasRelatedObjectId(pathInst, 
RELATIONSHIP_DATASET_PROCESS_INPUTS)));
+                    processInst.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, 
Collections.singletonList(getAtlasRelatedObjectId(tableInst, 
RELATIONSHIP_PROCESS_DATASET_OUTPUTS)));
                     processInst.setAttribute(ATTRIBUTE_USER_NAME, 
table.getOwner());
                     processInst.setAttribute(ATTRIBUTE_START_TIME, now);
                     processInst.setAttribute(ATTRIBUTE_END_TIME, now);
@@ -590,7 +590,7 @@ public class HiveMetaStoreBridge {
         long        createTime         = 
BaseHiveEvent.getTableCreateTime(hiveTable);
         long        lastAccessTime     = hiveTable.getLastAccessTime() > 0 ? 
hiveTable.getLastAccessTime() : createTime;
 
-        tableEntity.setAttribute(ATTRIBUTE_DB, 
BaseHiveEvent.getObjectId(database));
+        tableEntity.setRelationshipAttribute(ATTRIBUTE_DB, 
getAtlasRelatedObjectId(database, RELATIONSHIP_HIVE_TABLE_DB));
         tableEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tableQualifiedName);
         tableEntity.setAttribute(ATTRIBUTE_NAME, 
hiveTable.getTableName().toLowerCase());
         tableEntity.setAttribute(ATTRIBUTE_OWNER, hiveTable.getOwner());
@@ -611,13 +611,13 @@ public class HiveMetaStoreBridge {
             tableEntity.setAttribute(ATTRIBUTE_VIEW_EXPANDED_TEXT, 
hiveTable.getViewExpandedText());
         }
 
-        AtlasEntity       sdEntity = toStroageDescEntity(hiveTable.getSd(), 
tableQualifiedName, getStorageDescQFName(tableQualifiedName), 
BaseHiveEvent.getObjectId(tableEntity));
-        List<AtlasEntity> partKeys = toColumns(hiveTable.getPartitionKeys(), 
tableEntity);
-        List<AtlasEntity> columns  = toColumns(hiveTable.getCols(), 
tableEntity);
+        AtlasEntity       sdEntity = toStorageDescEntity(hiveTable.getSd(), 
tableQualifiedName, getStorageDescQFName(tableQualifiedName), 
BaseHiveEvent.getObjectId(tableEntity));
+        List<AtlasEntity> partKeys = toColumns(hiveTable.getPartitionKeys(), 
tableEntity, RELATIONSHIP_HIVE_TABLE_PART_KEYS);
+        List<AtlasEntity> columns  = toColumns(hiveTable.getCols(), 
tableEntity, RELATIONSHIP_HIVE_TABLE_COLUMNS);
 
-        tableEntity.setAttribute(ATTRIBUTE_STORAGEDESC, 
BaseHiveEvent.getObjectId(sdEntity));
-        tableEntity.setAttribute(ATTRIBUTE_PARTITION_KEYS, 
BaseHiveEvent.getObjectIds(partKeys));
-        tableEntity.setAttribute(ATTRIBUTE_COLUMNS, 
BaseHiveEvent.getObjectIds(columns));
+        tableEntity.setRelationshipAttribute(ATTRIBUTE_STORAGEDESC, 
getAtlasRelatedObjectId(sdEntity, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC));
+        tableEntity.setRelationshipAttribute(ATTRIBUTE_PARTITION_KEYS, 
getObjectIdsWithRelationshipType(partKeys, RELATIONSHIP_HIVE_TABLE_PART_KEYS));
+        tableEntity.setRelationshipAttribute(ATTRIBUTE_COLUMNS, 
getObjectIdsWithRelationshipType(columns, RELATIONSHIP_HIVE_TABLE_COLUMNS));
 
         table.addReferredEntity(database);
         table.addReferredEntity(sdEntity);
@@ -639,10 +639,10 @@ public class HiveMetaStoreBridge {
         return table;
     }
 
-    private AtlasEntity toStroageDescEntity(StorageDescriptor storageDesc, 
String tableQualifiedName, String sdQualifiedName, AtlasObjectId tableId ) 
throws AtlasHookException {
+    private AtlasEntity toStorageDescEntity(StorageDescriptor storageDesc, 
String tableQualifiedName, String sdQualifiedName, AtlasObjectId tableId ) 
throws AtlasHookException {
         AtlasEntity ret = new 
AtlasEntity(HiveDataTypes.HIVE_STORAGEDESC.getName());
 
-        ret.setAttribute(ATTRIBUTE_TABLE, tableId);
+        ret.setRelationshipAttribute(ATTRIBUTE_TABLE, 
getAtlasRelatedObjectId(tableId, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC));
         ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, sdQualifiedName);
         ret.setAttribute(ATTRIBUTE_PARAMETERS, storageDesc.getParameters());
         ret.setAttribute(ATTRIBUTE_LOCATION, 
HdfsNameServiceResolver.getPathWithNameServiceID(storageDesc.getLocation()));
@@ -689,7 +689,7 @@ public class HiveMetaStoreBridge {
         return ret;
     }
 
-    private List<AtlasEntity> toColumns(List<FieldSchema> schemaList, 
AtlasEntity table) throws AtlasHookException {
+    private List<AtlasEntity> toColumns(List<FieldSchema> schemaList, 
AtlasEntity table, String relationshipType) throws AtlasHookException {
         List<AtlasEntity> ret = new ArrayList<>();
 
         int columnPosition = 0;
@@ -698,7 +698,7 @@ public class HiveMetaStoreBridge {
 
             AtlasEntity column = new 
AtlasEntity(HiveDataTypes.HIVE_COLUMN.getName());
 
-            column.setAttribute(ATTRIBUTE_TABLE, 
BaseHiveEvent.getObjectId(table));
+            column.setRelationshipAttribute(ATTRIBUTE_TABLE, 
getAtlasRelatedObjectId(table, relationshipType));
             column.setAttribute(ATTRIBUTE_QUALIFIED_NAME, 
getColumnQualifiedName((String) table.getAttribute(ATTRIBUTE_QUALIFIED_NAME), 
fs.getName()));
             column.setAttribute(ATTRIBUTE_NAME, fs.getName());
             column.setAttribute(ATTRIBUTE_OWNER, 
table.getAttribute(ATTRIBUTE_OWNER));
diff --git 
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
 
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
index 98b4d4f..a74273a 100644
--- 
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
+++ 
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
@@ -206,19 +206,21 @@ public abstract class BaseHiveEvent {
     }
 
 
-    public static AtlasRelatedObjectId 
getObjectIdWithRelationshipType(AtlasEntity entity, String relationShipType) {
-        AtlasRelatedObjectId atlasRelatedObjectId = new 
AtlasRelatedObjectId(getObjectId(entity), relationShipType);
-        return atlasRelatedObjectId;
+    public static AtlasRelatedObjectId getAtlasRelatedObjectId(AtlasEntity 
entity, String relationshipType) {
+        return getAtlasRelatedObjectId(getObjectId(entity), relationshipType);
     }
 
-
+    public static AtlasRelatedObjectId getAtlasRelatedObjectId(AtlasObjectId 
objectId, String relationShipType) {
+        AtlasRelatedObjectId atlasRelatedObjectId = new 
AtlasRelatedObjectId(objectId, relationShipType);
+        return atlasRelatedObjectId;
+    }
 
     public static List<AtlasRelatedObjectId> 
getObjectIdsWithRelationshipType(List<AtlasEntity> entities,String 
relationshipType) {
         final List<AtlasRelatedObjectId> ret;
         if (CollectionUtils.isNotEmpty(entities)) {
             ret = new ArrayList<>(entities.size());
             for (AtlasEntity entity : entities) {
-                ret.add(getObjectIdWithRelationshipType(entity, 
relationshipType));
+                ret.add(getAtlasRelatedObjectId(entity, relationshipType));
             }
         } else {
             ret = Collections.emptyList();
@@ -478,7 +480,7 @@ public abstract class BaseHiveEvent {
                     }
 
 
-                    ret.setRelationshipAttribute(ATTRIBUTE_STORAGEDESC, 
getObjectIdWithRelationshipType(sd, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC));
+                    ret.setRelationshipAttribute(ATTRIBUTE_STORAGEDESC, 
getAtlasRelatedObjectId(sd, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC));
                     ret.setRelationshipAttribute(ATTRIBUTE_PARTITION_KEYS, 
getObjectIdsWithRelationshipType(partitionKeys, 
RELATIONSHIP_HIVE_TABLE_PART_KEYS));
                     ret.setRelationshipAttribute(ATTRIBUTE_COLUMNS, 
getObjectIdsWithRelationshipType(columns, RELATIONSHIP_HIVE_TABLE_COLUMNS));
                 }
@@ -625,7 +627,7 @@ public abstract class BaseHiveEvent {
 
                 ret = new AtlasEntity(AWS_S3_PSEUDO_DIR);
 
-                ret.setRelationshipAttribute(ATTRIBUTE_BUCKET, 
getObjectIdWithRelationshipType(bucketEntity, 
RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS));
+                ret.setRelationshipAttribute(ATTRIBUTE_BUCKET, 
getAtlasRelatedObjectId(bucketEntity, 
RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS));
                 ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, 
Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
                 ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
                 ret.setAttribute(ATTRIBUTE_NAME, 
Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
diff --git 
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
 
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
index 91e063e..6b050d4 100644
--- 
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
+++ 
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
@@ -208,8 +208,8 @@ public class CreateHiveProcess extends BaseHiveEvent {
             columnLineageProcess.setAttribute(ATTRIBUTE_NAME, 
hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + 
outputColumn.getAttribute(ATTRIBUTE_NAME));
             columnLineageProcess.setAttribute(ATTRIBUTE_QUALIFIED_NAME, 
hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + 
outputColumn.getAttribute(ATTRIBUTE_NAME));
             columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_INPUTS, 
getObjectIdsWithRelationshipType(inputColumns, 
BaseHiveEvent.RELATIONSHIP_DATASET_PROCESS_INPUTS));
-            columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, 
Collections.singletonList(getObjectIdWithRelationshipType(outputColumn, 
BaseHiveEvent.RELATIONSHIP_PROCESS_DATASET_OUTPUTS)));
-            columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_QUERY, 
getObjectIdWithRelationshipType(hiveProcess, 
BaseHiveEvent.RELATIONSHIP_HIVE_PROCESS_COLUMN_LINEAGE));
+            columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, 
Collections.singletonList(getAtlasRelatedObjectId(outputColumn, 
BaseHiveEvent.RELATIONSHIP_PROCESS_DATASET_OUTPUTS)));
+            columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_QUERY, 
getAtlasRelatedObjectId(hiveProcess, 
BaseHiveEvent.RELATIONSHIP_HIVE_PROCESS_COLUMN_LINEAGE));
             columnLineageProcess.setAttribute(ATTRIBUTE_DEPENDENCY_TYPE, 
entry.getValue().getType());
             columnLineageProcess.setAttribute(ATTRIBUTE_EXPRESSION, 
entry.getValue().getExpr());
 

Reply via email to