Repository: incubator-atlas
Updated Branches:
  refs/heads/master e0536224a -> 8fefd1655


ATLAS-884 Process registration should call Entity update instead of create (sumasai)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/8fefd165
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/8fefd165
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/8fefd165

Branch: refs/heads/master
Commit: 8fefd165586f5fd8acd5c3a5786d6012dda68a5b
Parents: e053622
Author: Suma Shivaprasad <[email protected]>
Authored: Tue Jun 14 16:33:05 2016 -0700
Committer: Suma Shivaprasad <[email protected]>
Committed: Tue Jun 14 16:33:05 2016 -0700

----------------------------------------------------------------------
 .../org/apache/atlas/hive/hook/HiveHook.java    |  71 ++++++----
 .../org/apache/atlas/hive/hook/HiveHookIT.java  | 135 ++++++++++++++-----
 release-log.txt                                 |   1 +
 3 files changed, 148 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8fefd165/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index d7fa068..0ccb18b 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -62,6 +62,7 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -203,9 +204,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             break;
 
         case CREATETABLE:
-            List<Pair<? extends Entity, Referenceable>> tablesCreated = handleEventOutputs(dgiBridge, event, Type.TABLE);
+            LinkedHashMap<Type, Referenceable> tablesCreated = handleEventOutputs(dgiBridge, event, Type.TABLE);
             if (tablesCreated.size() > 0) {
-                handleExternalTables(dgiBridge, event, tablesCreated.get(0).getLeft(), tablesCreated.get(0).getRight());
+                handleExternalTables(dgiBridge, event, tablesCreated);
             }
             break;
 
@@ -242,10 +243,10 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             renameColumn(dgiBridge, event);
             break;
         case ALTERTABLE_LOCATION:
-            List<Pair<? extends Entity, Referenceable>> tablesUpdated = handleEventOutputs(dgiBridge, event, Type.TABLE);
+            LinkedHashMap<Type, Referenceable> tablesUpdated = handleEventOutputs(dgiBridge, event, Type.TABLE);
             if (tablesUpdated != null && tablesUpdated.size() > 0) {
                 //Track altered lineage in case of external tables
-                handleExternalTables(dgiBridge, event, tablesUpdated.get(0).getLeft(), tablesUpdated.get(0).getRight());
+                handleExternalTables(dgiBridge, event, tablesUpdated);
             }
             break;
         case ALTERDATABASE:
@@ -384,7 +385,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
 
                     //Create/update old table entity - create entity with oldQFName and old tableName if it doesn't exist. If exists, will update
                     //We always use the new entity while creating the table since some flags, attributes of the table are not set in inputEntity and Hive.getTable(oldTableName) also fails since the table doesn't exist in Hive anymore
-                    final Referenceable tableEntity = createOrUpdateEntities(dgiBridge, event.getUser(), writeEntity, true);
+                    final LinkedHashMap<Type, Referenceable> tables = createOrUpdateEntities(dgiBridge, event.getUser(), writeEntity, true);
+                    Referenceable tableEntity = tables.get(Type.TABLE);
 
                     //Reset regular column QF Name to old Name and create a new partial notification request to replace old column QFName to newName to retain any existing traits
                     replaceColumnQFName(event, (List<Referenceable>) tableEntity.get(HiveDataModelGenerator.COLUMNS), oldQualifiedName, newQualifiedName);
@@ -458,10 +460,11 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         return newSDEntity;
     }
 
-    private Referenceable createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, String user, Entity entity, boolean skipTempTables, Table existTable) throws Exception {
+    private LinkedHashMap<Type, Referenceable> createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, String user, Entity entity, boolean skipTempTables, Table existTable) throws Exception {
         Database db = null;
         Table table = null;
         Partition partition = null;
+        LinkedHashMap<Type, Referenceable> result = new LinkedHashMap<>();
         List<Referenceable> entities = new ArrayList<>();
 
         switch (entity.getType()) {
@@ -483,7 +486,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
 
         db = dgiBridge.hiveClient.getDatabase(db.getName());
         Referenceable dbEntity = dgiBridge.createDBInstance(db);
+
         entities.add(dbEntity);
+        result.put(Type.DATABASE, dbEntity);
 
         Referenceable tableEntity = null;
 
@@ -503,30 +508,38 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             } else {
                 tableEntity = dgiBridge.createTableInstance(dbEntity, table);
                 entities.add(tableEntity);
+                result.put(Type.TABLE, tableEntity);
             }
         }
 
+
         messages.add(new HookNotification.EntityUpdateRequest(user, entities));
-        return tableEntity;
+        return result;
     }
 
-    private Referenceable createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, String user, Entity entity, boolean skipTempTables) throws Exception{
+    private LinkedHashMap<Type, Referenceable> createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, String user, Entity entity, boolean skipTempTables) throws Exception{
         return createOrUpdateEntities(dgiBridge, user, entity, skipTempTables, null);
     }
 
-    private List<Pair<? extends Entity, Referenceable>> handleEventOutputs(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Type entityType) throws Exception {
-        List<Pair<? extends Entity, Referenceable>> entitiesCreatedOrUpdated = new ArrayList<>();
+    private LinkedHashMap<Type, Referenceable> handleEventOutputs(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Type entityType) throws Exception {
         for (Entity entity : event.getOutputs()) {
             if (entity.getType() == entityType) {
-                Referenceable entityCreatedOrUpdated = createOrUpdateEntities(dgiBridge, event.getUser(), entity, true);
-                if (entitiesCreatedOrUpdated != null) {
-                    entitiesCreatedOrUpdated.add(Pair.of(entity, entityCreatedOrUpdated));
-                }
+                return createOrUpdateEntities(dgiBridge, event.getUser(), entity, true);
+            }
+        }
+        return null;
+    }
+
+    private Entity getEntityByType(Set<? extends Entity> entities, Type entityType) {
+        for (Entity entity : entities) {
+            if (entity.getType() == entityType) {
+                return entity;
             }
         }
-        return entitiesCreatedOrUpdated;
+        return null;
     }
 
+
     public static String lower(String str) {
         if (StringUtils.isEmpty(str)) {
             return null;
@@ -565,17 +578,18 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
 
         final Map<String, Referenceable> source = new LinkedHashMap<>();
         final Map<String, Referenceable> target = new LinkedHashMap<>();
+        final Set<Referenceable> entities = new LinkedHashSet<>();
 
         boolean isSelectQuery = isSelectQuery(event);
 
         // filter out select queries which do not modify data
         if (!isSelectQuery) {
             for (ReadEntity readEntity : event.getInputs()) {
-                processHiveEntity(dgiBridge, event, readEntity, source);
+                processHiveEntity(dgiBridge, event, readEntity, source, entities);
             }
 
             for (WriteEntity writeEntity : event.getOutputs()) {
-                processHiveEntity(dgiBridge, event, writeEntity, target);
+                processHiveEntity(dgiBridge, event, writeEntity, target, entities);
             }
 
             if (source.size() > 0 || target.size() > 0) {
@@ -586,7 +600,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
                     new ArrayList<Referenceable>() {{
                         addAll(target.values());
                     }});
-                messages.add(new HookNotification.EntityCreateRequest(event.getUser(), processReferenceable));
+
+                entities.add(processReferenceable);
+                messages.add(new HookNotification.EntityUpdateRequest(event.getUser(), new ArrayList<Referenceable>(entities)));
             } else {
                 LOG.info("Skipped query {} since it has no getInputs() or resulting getOutputs()", event.getQueryStr());
             }
@@ -595,18 +611,20 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         }
     }
 
-    private void processHiveEntity(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Entity entity, Map<String, Referenceable> dataSets) throws Exception {
+    private void processHiveEntity(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Entity entity, Map<String, Referenceable> dataSets, Set<Referenceable> entities) throws Exception {
         if (entity.getType() == Type.TABLE || entity.getType() == Type.PARTITION) {
             final String tblQFName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), entity.getTable());
             if (!dataSets.containsKey(tblQFName)) {
-                Referenceable inTable = createOrUpdateEntities(dgiBridge, event.getUser(), entity, false);
-                dataSets.put(tblQFName, inTable);
+                LinkedHashMap<Type, Referenceable> result = createOrUpdateEntities(dgiBridge, event.getUser(), entity, false);
+                dataSets.put(tblQFName, result.get(Type.TABLE));
+                entities.addAll(result.values());
             }
         } else if (entity.getType() == Type.DFS_DIR) {
             final String pathUri = lower(new Path(entity.getLocation()).toString());
             LOG.info("Registering DFS Path {} ", pathUri);
             Referenceable hdfsPath = dgiBridge.fillHDFSDataSet(pathUri);
             dataSets.put(pathUri, hdfsPath);
+            entities.add(hdfsPath);
         }
     }
 
@@ -642,8 +660,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         return false;
     }
 
-    private void handleExternalTables(final HiveMetaStoreBridge dgiBridge, final HiveEventContext event, final Entity entity, final Referenceable tblRef) throws HiveException, MalformedURLException {
-        Table hiveTable = entity.getTable();
+    private void handleExternalTables(final HiveMetaStoreBridge dgiBridge, final HiveEventContext event, final LinkedHashMap<Type, Referenceable> tables) throws HiveException, MalformedURLException {
+        List<Referenceable> entities = new ArrayList<>();
+        Table hiveTable = getEntityByType(event.getOutputs(), Type.TABLE).getTable();
         //Refresh to get the correct location
         hiveTable = dgiBridge.hiveClient.getTable(hiveTable.getDbName(), hiveTable.getTableName());
 
@@ -655,11 +674,13 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             }};
 
             List<Referenceable> outputs = new ArrayList<Referenceable>() {{
-                add(tblRef);
+                add(tables.get(Type.TABLE));
             }};
 
             Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event, inputs, outputs);
-            messages.add(new HookNotification.EntityCreateRequest(event.getUser(), processReferenceable));
+            entities.addAll(tables.values());
+            entities.add(processReferenceable);
+            messages.add(new HookNotification.EntityUpdateRequest(event.getUser(), entities));
         }
     }
 

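For context, the crux of the hook change above: instead of emitting an EntityCreateRequest for the process entity alone, the hook now gathers every referenceable the query touches (database, tables, HDFS paths) into one ordered set and emits a single EntityUpdateRequest, which creates entities on first sight and updates them on re-registration. A minimal sketch of that pattern, assuming the 0.7-era Atlas notification API used in the diff (ProcessRegistrationSketch and registerProcess are illustrative names, not part of this commit):

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;

public class ProcessRegistrationSketch {

    // Outbound notification queue, analogous to the messages list in AtlasHook.
    private final List<HookNotification.HookNotificationMessage> messages = new ArrayList<>();

    // Gather the dataset entities referenced by a query plus the process entity
    // itself, then post them in a single update request. Entities are matched by
    // their unique qualified name: created if absent, updated if present, so
    // re-registering the same process no longer attempts a duplicate create.
    void registerProcess(String user, Set<Referenceable> datasets, Referenceable process) {
        Set<Referenceable> entities = new LinkedHashSet<>(datasets);
        entities.add(process);
        messages.add(new HookNotification.EntityUpdateRequest(user, new ArrayList<Referenceable>(entities)));
    }
}

A LinkedHashSet keeps insertion order while de-duplicating, which is why the hook's processHiveEntity accumulates into a Set<Referenceable> rather than a List.
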
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8fefd165/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
index c3f0bb6..00c17e8 100755
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
@@ -248,7 +248,7 @@ public class HiveHookIT {
         verifyTimestamps(processReference, "startTime");
         verifyTimestamps(processReference, "endTime");
 
-        validateHDFSPaths(processReference, pFile, INPUTS);
+        validateHDFSPaths(processReference, INPUTS, pFile);
     }
 
     private void validateOutputTables(Referenceable processReference, String... expectedTableNames) throws Exception {
@@ -301,6 +301,35 @@ public class HiveHookIT {
     }
 
     @Test
+    public void testDropAndRecreateCTASOutput() throws Exception {
+        String tableName = createTable();
+        String ctasTableName = "table" + random();
+        String query = "create table " + ctasTableName + " as select * from " + tableName;
+        runCommand(query);
+
+        assertTableIsRegistered(DEFAULT_DB, ctasTableName);
+        String processId = assertProcessIsRegistered(query);
+
+        final String drpquery = String.format("drop table %s ", ctasTableName);
+        runCommand(drpquery);
+        assertTableIsNotRegistered(DEFAULT_DB, ctasTableName);
+
+        //Fix after ATLAS-876
+        runCommand(query);
+        assertTableIsRegistered(DEFAULT_DB, ctasTableName);
+        String process2Id = assertProcessIsRegistered(query);
+
+        Assert.assertEquals(process2Id, processId);
+
+        Referenceable processRef = atlasClient.getEntity(processId);
+        String tblQlfdname = getQualifiedTblName(tableName);
+        String ctasQlfdname = getQualifiedTblName(ctasTableName);
+
+        validateInputTables(processRef, tblQlfdname);
+        validateOutputTables(processRef, ctasQlfdname, ctasQlfdname);
+    }
+
+    @Test
     public void testCreateView() throws Exception {
         String tableName = createTable();
         String viewName = tableName();
@@ -402,7 +431,7 @@ public class HiveHookIT {
         final String tblQlfdName = getQualifiedTblName(tableName);
         Referenceable processReference = validateProcess(query, testPathNormed, tblQlfdName);
 
-        validateHDFSPaths(processReference, loadFile, INPUTS);
+        validateHDFSPaths(processReference, INPUTS, loadFile);
 
         validateOutputTables(processReference, tblQlfdName);
     }
@@ -416,8 +445,8 @@ public class HiveHookIT {
         return inputtblQlfdName;
     }
 
-    private Referenceable validateProcess(String query, String inputTable, String outputTable) throws Exception {
-        String processId = assertProcessIsRegistered(query, inputTable, outputTable);
+    private Referenceable validateProcess(String query, String inputTable, String... outputTables) throws Exception {
+        String processId = assertProcessIsRegistered(query, inputTable, outputTables);
         Referenceable process = atlasClient.getEntity(processId);
         if (inputTable == null) {
             Assert.assertNull(process.get(INPUTS));
@@ -426,11 +455,11 @@ public class HiveHookIT {
             validateInputTables(process, inputTable);
         }
 
-        if (outputTable == null) {
+        if (outputTables == null) {
             Assert.assertNull(process.get(OUTPUTS));
         } else {
-            Assert.assertEquals(((List<Id>) process.get(OUTPUTS)).size(), 1 );
-            validateOutputTables(process, outputTable);
+            Assert.assertEquals(((List<Id>) process.get(OUTPUTS)).size(), 1);
+            validateOutputTables(process, outputTables);
         }
 
         return process;
@@ -472,6 +501,43 @@ public class HiveHookIT {
     }
 
     @Test
+    public void testUpdateProcess() throws Exception {
+        String tableName = createTable();
+        String pFile1 = createTestDFSPath("somedfspath1");
+        String testPathNormed = lower(new Path(pFile1).toString());
+        String query =
+            "insert overwrite DIRECTORY '" + pFile1  + "' select id, name from 
" + tableName;
+
+        runCommand(query);
+        String tblQlfdname = getQualifiedTblName(tableName);
+        Referenceable processReference = validateProcess(query, tblQlfdname, testPathNormed);
+        validateHDFSPaths(processReference, OUTPUTS, pFile1);
+
+        String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
+
+        validateInputTables(processReference, tblQlfdname);
+
+        //Rerun same query with same HDFS path
+
+        runCommand(query);
+        Referenceable process2Reference = validateProcess(query, tblQlfdname, testPathNormed);
+        validateHDFSPaths(process2Reference, OUTPUTS, pFile1);
+
+        Assert.assertEquals(process2Reference.getId()._getId(), processReference.getId()._getId());
+
+        //Rerun same query with a new HDFS path. Should create a new process
+        String pFile2 = createTestDFSPath("somedfspath2");
+        query = "insert overwrite DIRECTORY '" + pFile2  + "' select id, name 
from " + tableName;
+        final String testPathNormed2 = lower(new Path(pFile2).toString());
+        runCommand(query);
+
+        Referenceable process3Reference = validateProcess(query, tblQlfdname, testPathNormed2);
+        validateHDFSPaths(process3Reference, OUTPUTS, pFile2);
+
+        Assert.assertNotEquals(process3Reference.getId()._getId(), processReference.getId()._getId());
+    }
+
+    @Test
     public void testInsertIntoDFSDir() throws Exception {
         String tableName = createTable();
         String pFile1 = createTestDFSPath("somedfspath1");
@@ -482,7 +548,7 @@ public class HiveHookIT {
         runCommand(query);
         String tblQlfdname = getQualifiedTblName(tableName);
         Referenceable processReference = validateProcess(query, tblQlfdname, testPathNormed);
-        validateHDFSPaths(processReference, pFile1, OUTPUTS);
+        validateHDFSPaths(processReference, OUTPUTS, pFile1);
 
         String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
 
@@ -498,7 +564,7 @@ public class HiveHookIT {
         runCommand(query);
         tblQlfdname = getQualifiedTblName(tableName);
         Referenceable process2Reference = validateProcess(query, tblQlfdname, testPathNormed);
-        validateHDFSPaths(process2Reference, pFile2, OUTPUTS);
+        validateHDFSPaths(process2Reference, OUTPUTS, pFile2);
 
         Assert.assertNotEquals(process2Reference.getId()._getId(), processReference.getId()._getId());
     }
@@ -564,7 +630,7 @@ public class HiveHookIT {
         runCommand(query);
         String tblQlfName = getQualifiedTblName(tableName);
         Referenceable processReference = validateProcess(query, tblQlfName, testPathNormed);
-        validateHDFSPaths(processReference, filename, OUTPUTS);
+        validateHDFSPaths(processReference, OUTPUTS, filename);
         validateInputTables(processReference, tblQlfName);
 
         //Import
@@ -575,7 +641,7 @@ public class HiveHookIT {
         runCommand(query);
         tblQlfName = getQualifiedTblName(tableName);
         processReference = validateProcess(query, testPathNormed, tblQlfName);
-        validateHDFSPaths(processReference, filename, INPUTS);
+        validateHDFSPaths(processReference, INPUTS, filename);
 
         validateOutputTables(processReference, tblQlfName);
     }
@@ -596,7 +662,7 @@ public class HiveHookIT {
         runCommand(query);
         String tblQlfdName = getQualifiedTblName(tableName);
         Referenceable processReference = validateProcess(query, tblQlfdName, testPathNormed);
-        validateHDFSPaths(processReference, filename, OUTPUTS);
+        validateHDFSPaths(processReference, OUTPUTS, filename);
 
         validateInputTables(processReference, tblQlfdName);
 
@@ -608,7 +674,7 @@ public class HiveHookIT {
         runCommand(query);
         tblQlfdName = getQualifiedTblName(tableName);
         processReference = validateProcess(query, testPathNormed, tblQlfdName);
-        validateHDFSPaths(processReference, filename, INPUTS);
+        validateHDFSPaths(processReference, INPUTS, filename);
 
         validateOutputTables(processReference, tblQlfdName);
     }
@@ -997,22 +1063,22 @@ public class HiveHookIT {
 
         final String testPathNormed = lower(new Path(testPath).toString());
         Referenceable processReference = validateProcess(query, testPathNormed, tblQlfdName);
-        validateHDFSPaths(processReference, testPath, INPUTS);
+        validateHDFSPaths(processReference, INPUTS, testPath);
     }
 
-    private String validateHDFSPaths(Referenceable processReference, String testPath, String attributeName) throws Exception {
+    private void validateHDFSPaths(Referenceable processReference, String attributeName, String... testPaths) throws Exception {
         List<Id> hdfsPathRefs = (List<Id>) processReference.get(attributeName);
 
-        final String testPathNormed = lower(new Path(testPath).toString());
-        String hdfsPathId = assertHDFSPathIsRegistered(testPathNormed);
-        Assert.assertEquals(hdfsPathRefs.get(0)._getId(), hdfsPathId);
-
-        Referenceable hdfsPathRef = atlasClient.getEntity(hdfsPathId);
-        Assert.assertEquals(hdfsPathRef.get("path"), testPathNormed);
-        Assert.assertEquals(hdfsPathRef.get(NAME), new Path(testPathNormed).getName());
-        Assert.assertEquals(hdfsPathRef.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), testPathNormed);
+        for (int i = 0; i < testPaths.length; i++) {
+            final String testPathNormed = lower(new Path(testPaths[i]).toString());
+            String hdfsPathId = assertHDFSPathIsRegistered(testPathNormed);
+            Assert.assertEquals(hdfsPathRefs.get(i)._getId(), hdfsPathId);
 
-        return hdfsPathRef.getId()._getId();
+            Referenceable hdfsPathRef = atlasClient.getEntity(hdfsPathId);
+            Assert.assertEquals(hdfsPathRef.get("path"), testPathNormed);
+            Assert.assertEquals(hdfsPathRef.get(NAME), new Path(testPathNormed).getName());
+            Assert.assertEquals(hdfsPathRef.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), testPathNormed);
+        }
     }
 
     private String assertHDFSPathIsRegistered(String path) throws Exception {
@@ -1393,13 +1459,12 @@ public class HiveHookIT {
         }
     }
 
-    private String assertProcessIsRegistered(final String queryStr, final String inputTblName, final String outputTblName) throws Exception {
+    private String assertProcessIsRegistered(final String queryStr, final String inputTblName, final String... outputTblNames) throws Exception {
 
         HiveASTRewriter astRewriter = new HiveASTRewriter(conf);
         String normalizedQuery = normalize(astRewriter.rewrite(queryStr));
 
         List<Referenceable> inputs = null;
-
         if (inputTblName != null) {
             Referenceable inputTableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.name(), new HashMap<String, Object>() {{
                 put(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, inputTblName);
@@ -1407,14 +1472,16 @@ public class HiveHookIT {
             inputs = new ArrayList<Referenceable>();
             inputs.add(inputTableRef);
         }
-        List<Referenceable> outputs = null;
-        if (outputTblName != null) {
-            Referenceable outputTableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.name(), new HashMap<String, Object>() {{
-                put(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, outputTblName);
-            }});
-
-            outputs = new ArrayList<Referenceable>();
-            outputs.add(outputTableRef);
+        List<Referenceable> outputs = new ArrayList<Referenceable>();
+        if (outputTblNames != null) {
+            for(int i = 0; i < outputTblNames.length; i++) {
+                final String outputTblName = outputTblNames[i];
+                Referenceable outputTableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.name(), new HashMap<String, Object>() {{
+                    put(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, outputTblName);
+                }});
+
+                outputs.add(outputTableRef);
+            }
         }
         String processQFName = HiveHook.getProcessQualifiedName(normalizedQuery, inputs, outputs);
         LOG.debug("Searching for process with query {}", processQFName);

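The test-helper changes above follow one pattern: single-valued parameters become trailing varargs (and validateHDFSPaths moves the attribute name ahead of the paths), so existing one-argument call sites keep compiling while new tests such as testUpdateProcess and testDropAndRecreateCTASOutput can assert several outputs or paths at once. A self-contained sketch of that migration, with illustrative names not taken from the commit:

public class VarargsMigrationSketch {

    // Old shape: validate(query, inputTable, outputTable).
    // New shape: the trailing String... accepts one or many outputs, so every
    // existing single-output call site compiles unchanged.
    static void validate(String query, String inputTable, String... outputTables) {
        for (String outputTable : outputTables) {
            System.out.printf("query [%s]: expect lineage %s -> %s%n", query, inputTable, outputTable);
        }
    }

    public static void main(String[] args) {
        validate("insert overwrite ...", "default.t1", "default.t2");
        // Duplicate outputs are legal, as exercised by testDropAndRecreateCTASOutput,
        // where the dropped and recreated tables share one qualified name:
        validate("create table ... as select ...", "default.t1", "default.t2", "default.t2");
    }
}
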
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8fefd165/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 76c5335..f168af6 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -23,6 +23,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
 
 ALL CHANGES:
+ATLAS-884 Process registration should call Entity update instead of create (sumasai)
 ATLAS-515 Ability to initialize Kafka topics with more than 1 replica (yhemanth)
 ATLAS-891 UI changes to implement Update term (Kalyanikashikar via yhemanth)
 ATLAS-794 Business Catalog Update (jspeidel via yhemanth)
