Repository: incubator-atlas
Updated Branches:
  refs/heads/master f623bddf8 -> f51c88615


ATLAS-917 Add hdfs paths to process qualified name for non-partition based queries (sumasai)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/f51c8861
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/f51c8861
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/f51c8861

Branch: refs/heads/master
Commit: f51c886158c9c0f7dc115f0c6f0aa0e08772e0b9
Parents: f623bdd
Author: Suma Shivaprasad <[email protected]>
Authored: Fri Jul 1 12:09:32 2016 -0700
Committer: Suma Shivaprasad <[email protected]>
Committed: Fri Jul 1 12:09:32 2016 -0700

----------------------------------------------------------------------
 .../atlas/hive/bridge/HiveMetaStoreBridge.java  |   4 +-
 .../org/apache/atlas/hive/hook/HiveHook.java    | 182 +++++--
 .../org/apache/atlas/hive/hook/HiveHookIT.java  | 483 +++++++++++++------
 .../java/org/apache/atlas/hook/AtlasHook.java   |   2 +-
 release-log.txt                                 |   1 +
 5 files changed, 473 insertions(+), 199 deletions(-)
----------------------------------------------------------------------
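
The substance of this change is how HiveHook builds a process entity's qualifiedName from a query's inputs and outputs: HDFS paths are now kept in the name for non-partition-based LOAD/IMPORT/EXPORT operations, while QUERY operations and partition-based loads leave them out, so re-running the same query against a different directory still maps to the same process. The following is a minimal illustrative sketch of that naming scheme only; DataSet and processQualifiedName are hypothetical stand-ins, not the Hive ReadEntity/WriteEntity and Atlas Referenceable types the committed code in HiveHook.java below actually uses.

import java.util.LinkedHashSet;
import java.util.Set;

// Simplified sketch of the qualifiedName construction; not the committed implementation.
public class ProcessQualifiedNameSketch {
    static final String SEP = ":";      // same separators as HiveHook
    static final String IO_SEP = "->";

    // Hypothetical stand-in for a resolved Hive/HDFS data set reference.
    static class DataSet {
        final String qualifiedName;
        final boolean isHdfsPath;
        DataSet(String qualifiedName, boolean isHdfsPath) {
            this.qualifiedName = qualifiedName;
            this.isHdfsPath = isHdfsPath;
        }
    }

    // ignoreHdfsPaths is true for QUERY operations and for partition-based
    // LOAD/IMPORT/EXPORT; in the real hook it is derived from the operation
    // and the presence of PARTITION entities (ignoreHDFSPathsinQFName).
    static String processQualifiedName(String operationName, boolean ignoreHdfsPaths,
                                       Set<DataSet> inputs, Set<DataSet> outputs) {
        StringBuilder buffer = new StringBuilder(operationName);
        append(buffer, inputs, ignoreHdfsPaths);
        buffer.append(IO_SEP);
        append(buffer, outputs, ignoreHdfsPaths);
        return buffer.toString();
    }

    private static void append(StringBuilder buffer, Set<DataSet> dataSets, boolean ignoreHdfsPaths) {
        for (DataSet ds : dataSets) {
            if (ignoreHdfsPaths && ds.isHdfsPath) {
                continue; // HDFS paths stay out of the qualified name
            }
            // '/' breaks query parsing on Atlas, hence the removal (as in addDataset below)
            buffer.append(SEP).append(ds.qualifiedName.toLowerCase().replaceAll("/", ""));
        }
    }

    public static void main(String[] args) {
        Set<DataSet> inputs = new LinkedHashSet<>();
        inputs.add(new DataSet("hdfs://namenode:8020/data/load1", true));
        Set<DataSet> outputs = new LinkedHashSet<>();
        outputs.add(new DataSet("default.t1@cluster", false));

        // Non-partition-based LOAD keeps the path; QUERY drops it.
        System.out.println(processQualifiedName("LOAD", false, inputs, outputs));
        System.out.println(processQualifiedName("QUERY", true, inputs, outputs));
    }
}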


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f51c8861/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index 0045780..cd0e964 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -426,8 +426,8 @@ public class HiveMetaStoreBridge {
                 createDate = new Date(hiveTable.getTTable().getCreateTime() * MILLIS_CONVERT_FACTOR);
                 LOG.debug("Setting create time to {} ", createDate);
                 tableReference.set(HiveDataModelGenerator.CREATE_TIME, createDate);
-            } catch(NumberFormatException ne) {
-                LOG.error("Error while updating createTime for the table {} ", hiveTable.getCompleteName(), ne);
+            } catch(Exception ne) {
+                LOG.error("Error while setting createTime for the table {} ", hiveTable.getCompleteName(), ne);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f51c8861/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index a1a00b3..99009ba 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -21,6 +21,7 @@ package org.apache.atlas.hive.hook;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import kafka.security.auth.Write;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
@@ -66,7 +67,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
+import java.util.SortedSet;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -86,8 +89,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     public static final String QUEUE_SIZE = CONF_PREFIX + "queueSize";
 
     public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
-    private static final String SEP = ":".intern();
-    private static final String IO_SEP = "->".intern();
+    static final String SEP = ":".intern();
+    static final String IO_SEP = "->".intern();
 
     private static final Map<String, HiveOperation> OPERATION_MAP = new HashMap<>();
 
@@ -291,6 +294,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     private void deleteDatabase(HiveMetaStoreBridge dgiBridge, HiveEventContext event) {
         if (event.getOutputs().size() > 1) {
             LOG.info("Starting deletion of tables and databases with cascade {} ", event.getQueryStr());
+        } else {
+            LOG.info("Starting deletion of database {} ", event.getQueryStr());
         }
 
         for (WriteEntity output : event.getOutputs()) {
@@ -549,10 +554,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         return str.toLowerCase().trim();
     }
 
-    public static String normalize(String queryStr) {
-        return lower(queryStr);
-    }
-
     private void registerProcess(HiveMetaStoreBridge dgiBridge, HiveEventContext event) throws Exception {
         Set<ReadEntity> inputs = event.getInputs();
         Set<WriteEntity> outputs = event.getOutputs();
@@ -567,8 +568,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             LOG.info("Query id/plan is missing for {}", event.getQueryStr());
         }
 
-        final SortedMap<Entity, Referenceable> source = new TreeMap<>(entityComparator);
-        final SortedMap<Entity, Referenceable> target = new TreeMap<>(entityComparator);
+        final SortedMap<ReadEntity, Referenceable> source = new TreeMap<>(entityComparator);
+        final SortedMap<WriteEntity, Referenceable> target = new TreeMap<>(entityComparator);
 
         final Set<String> dataSets = new HashSet<>();
         final Set<Referenceable> entities = new LinkedHashSet<>();
@@ -577,16 +578,27 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
 
         // filter out select queries which do not modify data
         if (!isSelectQuery) {
-            for (ReadEntity readEntity : event.getInputs()) {
+
+            SortedSet<ReadEntity> sortedHiveInputs = new TreeSet<>(entityComparator);;
+            if ( event.getInputs() != null) {
+                sortedHiveInputs.addAll(event.getInputs());
+            }
+
+            SortedSet<WriteEntity> sortedHiveOutputs = new TreeSet<>(entityComparator);
+            if ( event.getOutputs() != null) {
+                sortedHiveOutputs.addAll(event.getOutputs());
+            }
+
+            for (ReadEntity readEntity : sortedHiveInputs) {
                 processHiveEntity(dgiBridge, event, readEntity, dataSets, source, entities);
             }
 
-            for (WriteEntity writeEntity : event.getOutputs()) {
+            for (WriteEntity writeEntity : sortedHiveOutputs) {
                 processHiveEntity(dgiBridge, event, writeEntity, dataSets, target, entities);
             }
 
             if (source.size() > 0 || target.size() > 0) {
-                Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event, source, target);
+                Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event, sortedHiveInputs, sortedHiveOutputs, source, target);
                 entities.add(processReferenceable);
                 event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), new ArrayList<>(entities)));
             } else {
@@ -597,8 +609,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         }
     }
 
-    private void processHiveEntity(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Entity entity, Set<String> dataSetsProcessed,
-        SortedMap<Entity, Referenceable> dataSets, Set<Referenceable> entities) throws Exception {
+    private  <T extends Entity> void processHiveEntity(HiveMetaStoreBridge dgiBridge, HiveEventContext event, T entity, Set<String> dataSetsProcessed,
+        SortedMap<T, Referenceable> dataSets, Set<Referenceable> entities) throws Exception {
         if (entity.getType() == Type.TABLE || entity.getType() == Type.PARTITION) {
             final String tblQFName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), entity.getTable());
             if (!dataSetsProcessed.contains(tblQFName)) {
@@ -609,7 +621,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             }
         } else if (entity.getType() == Type.DFS_DIR) {
             final String pathUri = lower(new Path(entity.getLocation()).toString());
-            LOG.info("Registering DFS Path {} ", pathUri);
+            LOG.debug("Registering DFS Path {} ", pathUri);
             if (!dataSetsProcessed.contains(pathUri)) {
                 Referenceable hdfsPath = dgiBridge.fillHDFSDataSet(pathUri);
                 dataSets.put(entity, hdfsPath);
@@ -653,7 +665,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
 
     private void handleExternalTables(final HiveMetaStoreBridge dgiBridge, final HiveEventContext event, final LinkedHashMap<Type, Referenceable> tables) throws HiveException, MalformedURLException {
         List<Referenceable> entities = new ArrayList<>();
-        final Entity hiveEntity = getEntityByType(event.getOutputs(), Type.TABLE);
+        final WriteEntity hiveEntity = (WriteEntity) getEntityByType(event.getOutputs(), Type.TABLE);
         Table hiveTable = hiveEntity.getTable();
         //Refresh to get the correct location
         hiveTable = dgiBridge.hiveClient.getTable(hiveTable.getDbName(), hiveTable.getTableName());
@@ -665,18 +677,25 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             dfsEntity.setTyp(Type.DFS_DIR);
             dfsEntity.setName(location);
 
-            SortedMap<Entity, Referenceable> inputs = new TreeMap<Entity, Referenceable>(entityComparator) {{
+            SortedMap<ReadEntity, Referenceable> hiveInputsMap = new TreeMap<ReadEntity, Referenceable>(entityComparator) {{
                 put(dfsEntity, dgiBridge.fillHDFSDataSet(location));
             }};
 
-            SortedMap<Entity, Referenceable> outputs = new TreeMap<Entity, Referenceable>(entityComparator) {{
+            SortedMap<WriteEntity, Referenceable> hiveOutputsMap = new TreeMap<WriteEntity, Referenceable>(entityComparator) {{
                 put(hiveEntity, tables.get(Type.TABLE));
             }};
 
-            Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event, inputs, outputs);
+            SortedSet<ReadEntity> sortedIps = new TreeSet<>(entityComparator);
+            sortedIps.addAll(hiveInputsMap.keySet());
+            SortedSet<WriteEntity> sortedOps = new TreeSet<>(entityComparator);
+            sortedOps.addAll(hiveOutputsMap.keySet());
+
+            Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event,
+                sortedIps, sortedOps, hiveInputsMap, hiveOutputsMap);
             String tableQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), hiveTable);
 
             if (isCreateOp(event)){
+                LOG.info("Overriding process qualified name to {}", tableQualifiedName);
                 processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
             }
             entities.addAll(tables.values());
@@ -689,6 +708,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         if (HiveOperation.CREATETABLE.equals(hiveEvent.getOperation())
             || HiveOperation.CREATEVIEW.equals(hiveEvent.getOperation())
             || HiveOperation.ALTERVIEW_AS.equals(hiveEvent.getOperation())
+            || HiveOperation.ALTERTABLE_LOCATION.equals(hiveEvent.getOperation())
            || HiveOperation.CREATETABLE_AS_SELECT.equals(hiveEvent.getOperation())) {
             return true;
         }
@@ -696,11 +716,11 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     }
 
     private Referenceable getProcessReferenceable(HiveMetaStoreBridge dgiBridge, HiveEventContext hiveEvent,
-        SortedMap<Entity, Referenceable> source, SortedMap<Entity, Referenceable> target) {
+        final SortedSet<ReadEntity> sortedHiveInputs, final SortedSet<WriteEntity> sortedHiveOutputs, SortedMap<ReadEntity, Referenceable> source, SortedMap<WriteEntity, Referenceable> target) {
         Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
 
         String queryStr = lower(hiveEvent.getQueryStr());
-        processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getProcessQualifiedName(hiveEvent.getOperation(), source, target));
+        processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getProcessQualifiedName(hiveEvent, sortedHiveInputs, sortedHiveOutputs, source, target));
 
         LOG.debug("Registering query: {}", queryStr);
         List<Referenceable> sourceList = new ArrayList<>(source.values());
@@ -733,51 +753,113 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     }
 
     @VisibleForTesting
-    static String getProcessQualifiedName(HiveOperation op, SortedMap<Entity, Referenceable> inputs, SortedMap<Entity, Referenceable> outputs) {
+    static String getProcessQualifiedName(HiveEventContext eventContext, final SortedSet<ReadEntity> sortedHiveInputs, final SortedSet<WriteEntity> sortedHiveOutputs, SortedMap<ReadEntity, Referenceable> hiveInputsMap, SortedMap<WriteEntity, Referenceable> hiveOutputsMap) {
+        HiveOperation op = eventContext.getOperation();
         StringBuilder buffer = new StringBuilder(op.getOperationName());
-        addDatasets(op, buffer, inputs);
+
+        boolean ignoreHDFSPathsinQFName = ignoreHDFSPathsinQFName(op, sortedHiveInputs, sortedHiveOutputs);
+        if ( ignoreHDFSPathsinQFName && LOG.isDebugEnabled()) {
+            LOG.debug("Ignoring HDFS paths in qualifiedName for {} {} ", op, eventContext.getQueryStr());
+        }
+
+        addInputs(op, sortedHiveInputs, buffer, hiveInputsMap, ignoreHDFSPathsinQFName);
         buffer.append(IO_SEP);
-        addDatasets(op, buffer, outputs);
+        addOutputs(op, sortedHiveOutputs, buffer, hiveOutputsMap, ignoreHDFSPathsinQFName);
         LOG.info("Setting process qualified name to {}", buffer);
         return buffer.toString();
     }
 
-    private static void addDatasets(HiveOperation op, StringBuilder buffer, final Map<Entity, Referenceable> refs) {
-        if (refs != null) {
-            for (Entity input : refs.keySet()) {
-                final Entity entity = input;
+    private static boolean ignoreHDFSPathsinQFName(final HiveOperation op, final Set<ReadEntity> inputs, final Set<WriteEntity> outputs) {
+        switch (op) {
+        case LOAD:
+        case IMPORT:
+            return isPartitionBasedQuery(outputs);
+        case EXPORT:
+            return isPartitionBasedQuery(inputs);
+        case QUERY:
+            return true;
+        }
+        return false;
+    }
 
-                //HiveOperation.QUERY type encompasses INSERT, INSERT_OVERWRITE, UPDATE, DELETE, PATH_WRITE operations
-                if (addQueryType(op, entity)) {
-                    buffer.append(SEP);
-                    buffer.append(((WriteEntity) entity).getWriteType().name());
+    private static boolean isPartitionBasedQuery(Set<? extends Entity> entities) {
+        for (Entity entity : entities) {
+            if (Type.PARTITION.equals(entity.getType())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private static void addInputs(HiveOperation op, SortedSet<ReadEntity> sortedInputs, StringBuilder buffer, final Map<ReadEntity, Referenceable> refs, final boolean ignoreHDFSPathsInQFName) {
+        if (refs != null) {
+            if (sortedInputs != null) {
+                Set<String> dataSetsProcessed = new LinkedHashSet<>();
+                for (Entity input : sortedInputs) {
+
+                    if (!dataSetsProcessed.contains(input.getName().toLowerCase())) {
+                        //HiveOperation.QUERY type encompasses INSERT, INSERT_OVERWRITE, UPDATE, DELETE, PATH_WRITE operations
+                        if (ignoreHDFSPathsInQFName &&
+                            (Type.DFS_DIR.equals(input.getType()) || Type.LOCAL_DIR.equals(input.getType()))) {
+                            LOG.debug("Skipping dfs dir input addition to process qualified name {} ", input.getName());
+                        } else if (refs.containsKey(input)) {
+                            addDataset(buffer, refs.get(input));
+                        }
+                        dataSetsProcessed.add(input.getName().toLowerCase());
+                    }
                 }
-                if (Type.DFS_DIR.equals(entity.getType()) ||
-                    Type.LOCAL_DIR.equals(entity.getType())) {
-                    LOG.debug("Skipping dfs dir addition into process qualified name {} ", refs.get(input).get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME));
-                } else {
-                    buffer.append(SEP);
-                    String dataSetQlfdName = (String) refs.get(input).get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
-                    // '/' breaks query parsing on ATLAS
-                    buffer.append(dataSetQlfdName.toLowerCase().replaceAll("/", ""));
+
+            }
+        }
+    }
+
+    private static void addDataset(StringBuilder buffer, Referenceable ref) {
+        buffer.append(SEP);
+        String dataSetQlfdName = (String) ref.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
+        // '/' breaks query parsing on ATLAS
+        buffer.append(dataSetQlfdName.toLowerCase().replaceAll("/", ""));
+    }
+
+    private static void addOutputs(HiveOperation op, SortedSet<WriteEntity> sortedOutputs, StringBuilder buffer, final Map<WriteEntity, Referenceable> refs, final boolean ignoreHDFSPathsInQFName) {
+        if (refs != null) {
+            Set<String> dataSetsProcessed = new LinkedHashSet<>();
+            if (sortedOutputs != null) {
+                for (Entity output : sortedOutputs) {
+                    final Entity entity = output;
+                    if (!dataSetsProcessed.contains(output.getName().toLowerCase())) {
+                        //HiveOperation.QUERY type encompasses INSERT, INSERT_OVERWRITE, UPDATE, DELETE, PATH_WRITE operations
+                        if (addQueryType(op, (WriteEntity) entity)) {
+                            buffer.append(SEP);
+                            buffer.append(((WriteEntity) entity).getWriteType().name());
+                        }
+                        if (ignoreHDFSPathsInQFName &&
+                            (Type.DFS_DIR.equals(output.getType()) || Type.LOCAL_DIR.equals(output.getType()))) {
+                            LOG.debug("Skipping dfs dir output addition to process qualified name {} ", output.getName());
+                        } else if (refs.containsKey(output)) {
+                            addDataset(buffer, refs.get(output));
+                        }
+                        dataSetsProcessed.add(output.getName().toLowerCase());
+                    }
+                        } else if (refs.containsKey(output)) {
+                            addDataset(buffer, refs.get(output));
+                        }
+                        dataSetsProcessed.add(output.getName().toLowerCase());
+                    }
                 }
             }
         }
     }
 
-    private static boolean addQueryType(HiveOperation op, Entity entity) {
-        if (WriteEntity.class.isAssignableFrom(entity.getClass())) {
-            if (((WriteEntity) entity).getWriteType() != null &&
-                op.equals(HiveOperation.QUERY)) {
-                switch (((WriteEntity) entity).getWriteType()) {
-                case INSERT:
-                case INSERT_OVERWRITE:
-                case UPDATE:
-                case DELETE:
-                case PATH_WRITE:
+    private static boolean addQueryType(HiveOperation op, WriteEntity entity) {
+        if (((WriteEntity) entity).getWriteType() != null && HiveOperation.QUERY.equals(op)) {
+            switch (((WriteEntity) entity).getWriteType()) {
+            case INSERT:
+            case INSERT_OVERWRITE:
+            case UPDATE:
+            case DELETE:
+                return true;
+            case PATH_WRITE:
+                //Add query type only for DFS paths and ignore local paths since they are not added as outputs
+                if ( !Type.LOCAL_DIR.equals(entity.getType())) {
                     return true;
-                default:
                 }
+                break;
+            default:
             }
         }
         return false;

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f51c8861/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
index f9e1926..8ca47d9 100755
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
@@ -62,15 +62,22 @@ import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
+import java.util.SortedSet;
 import java.util.TreeMap;
+import java.util.TreeSet;
 
 import static org.apache.atlas.AtlasClient.NAME;
 import static org.apache.atlas.hive.hook.HiveHook.entityComparator;
 import static org.apache.atlas.hive.hook.HiveHook.getProcessQualifiedName;
 import static org.apache.atlas.hive.hook.HiveHook.lower;
+import static org.apache.atlas.hive.hook.HiveHook.IO_SEP;
+import static org.apache.atlas.hive.hook.HiveHook.SEP;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
@@ -82,6 +89,8 @@ public class HiveHookIT {
     private static final String DGI_URL = "http://localhost:21000/";
     private static final String CLUSTER_NAME = "test";
     public static final String DEFAULT_DB = "default";
+    
+    private static final String PART_FILE = "2015-01-01";
     private Driver driver;
     private AtlasClient atlasClient;
     private HiveMetaStoreBridge hiveMetaStoreBridge;
@@ -262,7 +271,7 @@ public class HiveHookIT {
         validateHDFSPaths(processReference, INPUTS, pFile);
     }
 
-    private List<Entity> getInputs(String inputName, Entity.Type entityType) {
+    private Set<ReadEntity> getInputs(String inputName, Entity.Type 
entityType) {
         final ReadEntity entity = new ReadEntity();
 
         if ( Entity.Type.DFS_DIR.equals(entityType)) {
@@ -270,14 +279,13 @@ public class HiveHookIT {
             entity.setTyp(Entity.Type.DFS_DIR);
         } else {
             entity.setName(getQualifiedTblName(inputName));
-            entity.setTyp(Entity.Type.TABLE);
+            entity.setTyp(entityType);
         }
 
-        return new ArrayList<Entity>() {{ add(entity); }};
+        return new LinkedHashSet<ReadEntity>() {{ add(entity); }};
     }
 
-
-    private List<Entity> getOutputs(String inputName, Entity.Type entityType) {
+    private Set<WriteEntity> getOutputs(String inputName, Entity.Type 
entityType) {
         final WriteEntity entity = new WriteEntity();
 
         if ( Entity.Type.DFS_DIR.equals(entityType) || 
Entity.Type.LOCAL_DIR.equals(entityType)) {
@@ -285,27 +293,32 @@ public class HiveHookIT {
             entity.setTyp(entityType);
         } else {
             entity.setName(getQualifiedTblName(inputName));
-            entity.setTyp(Entity.Type.TABLE);
+            entity.setTyp(entityType);
         }
 
-        return new ArrayList<Entity>() {{ add(entity); }};
+        return new LinkedHashSet<WriteEntity>() {{ add(entity); }};
     }
 
-
-    private void validateOutputTables(Referenceable processReference, 
List<Entity> expectedTables) throws Exception {
+    private void validateOutputTables(Referenceable processReference, 
Set<WriteEntity> expectedTables) throws Exception {
        validateTables(processReference, OUTPUTS, expectedTables);
     }
 
-    private void validateInputTables(Referenceable processReference, 
List<Entity> expectedTables) throws Exception {
+    private void validateInputTables(Referenceable processReference, 
Set<ReadEntity> expectedTables) throws Exception {
         validateTables(processReference, INPUTS, expectedTables);
     }
 
-    private void validateTables(Referenceable processReference, String 
attrName, List<Entity> expectedTables) throws Exception {
+    private void validateTables(Referenceable processReference, String 
attrName, Set<? extends Entity> expectedTables) throws Exception {
         List<Id> tableRef = (List<Id>) processReference.get(attrName);
+
+        Iterator<? extends Entity> iterator = expectedTables.iterator();
         for(int i = 0; i < expectedTables.size(); i++) {
-            Referenceable entity = 
atlasClient.getEntity(tableRef.get(i)._getId());
-            LOG.debug("Validating output {} {} ", i, entity);
-            
Assert.assertEquals(entity.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), 
expectedTables.get(i).getName());
+            Entity hiveEntity = iterator.next();
+            if (Entity.Type.TABLE.equals(hiveEntity.getType()) ||
+                Entity.Type.DFS_DIR.equals(hiveEntity.getType())) {
+                Referenceable entity = 
atlasClient.getEntity(tableRef.get(i)._getId());
+                LOG.debug("Validating output {} {} ", i, entity);
+                
Assert.assertEquals(entity.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), 
hiveEntity.getName());
+            }
         }
     }
 
@@ -338,18 +351,22 @@ public class HiveHookIT {
         String query = "create table " + ctasTableName + " as select * from " 
+ tableName;
         runCommand(query);
 
-        final ReadEntity entity = new ReadEntity();
-        entity.setName(getQualifiedTblName(tableName));
-        entity.setTyp(Entity.Type.TABLE);
+        final Set<ReadEntity> readEntities = getInputs(tableName, 
Entity.Type.TABLE);
+        final Set<WriteEntity> writeEntities = getOutputs(ctasTableName, 
Entity.Type.TABLE);
 
-        final WriteEntity writeEntity = new WriteEntity();
-        writeEntity.setTyp(Entity.Type.TABLE);
-        writeEntity.setName(getQualifiedTblName(ctasTableName));
-
-        assertProcessIsRegistered(query, HiveOperation.CREATETABLE_AS_SELECT, 
new ArrayList<Entity>() {{ add(entity); }}, new ArrayList<Entity>() {{ 
add(writeEntity); }});
+        assertProcessIsRegistered(constructEvent(query, 
HiveOperation.CREATETABLE_AS_SELECT, readEntities, writeEntities));
         assertTableIsRegistered(DEFAULT_DB, ctasTableName);
     }
 
+    private HiveHook.HiveEventContext constructEvent(String query, 
HiveOperation op, Set<ReadEntity> inputs, Set<WriteEntity> outputs) {
+        HiveHook.HiveEventContext event = new HiveHook.HiveEventContext();
+        event.setQueryStr(query);
+        event.setOperation(op);
+        event.setInputs(inputs);
+        event.setOutputs(outputs);
+        return event;
+    }
+
     @Test
     public void testDropAndRecreateCTASOutput() throws Exception {
         String tableName = createTable();
@@ -359,10 +376,11 @@ public class HiveHookIT {
 
         assertTableIsRegistered(DEFAULT_DB, ctasTableName);
 
-        List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
-        List<Entity> outputs =  getOutputs(ctasTableName, Entity.Type.TABLE);
+        Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
+        Set<WriteEntity> outputs =  getOutputs(ctasTableName, 
Entity.Type.TABLE);
 
-        String processId = assertProcessIsRegistered(query, 
HiveOperation.CREATETABLE_AS_SELECT, inputs, outputs);
+        final HiveHook.HiveEventContext hiveEventContext = 
constructEvent(query, HiveOperation.CREATETABLE_AS_SELECT, inputs, outputs);
+        String processId = assertProcessIsRegistered(hiveEventContext);
 
         final String drpquery = String.format("drop table %s ", ctasTableName);
         runCommand(drpquery);
@@ -371,14 +389,13 @@ public class HiveHookIT {
         //Fix after ATLAS-876
         runCommand(query);
         assertTableIsRegistered(DEFAULT_DB, ctasTableName);
-        String process2Id = assertProcessIsRegistered(query, 
HiveOperation.CREATETABLE_AS_SELECT, inputs, outputs);
+        String process2Id = assertProcessIsRegistered(hiveEventContext, 
inputs, outputs);
 
         Assert.assertEquals(process2Id, processId);
 
         Referenceable processRef = atlasClient.getEntity(processId);
 
-        validateInputTables(processRef, inputs);
-        outputs.add(outputs.get(0));
+        outputs.add(outputs.iterator().next());
         validateOutputTables(processRef, outputs);
     }
 
@@ -389,7 +406,7 @@ public class HiveHookIT {
         String query = "create view " + viewName + " as select * from " + 
tableName;
         runCommand(query);
 
-        assertProcessIsRegistered(query, HiveOperation.CREATEVIEW, 
getInputs(tableName, Entity.Type.TABLE), getOutputs(viewName, 
Entity.Type.TABLE));
+        assertProcessIsRegistered(constructEvent(query, 
HiveOperation.CREATEVIEW, getInputs(tableName, Entity.Type.TABLE), 
getOutputs(viewName, Entity.Type.TABLE)));
         assertTableIsRegistered(DEFAULT_DB, viewName);
     }
 
@@ -403,7 +420,7 @@ public class HiveHookIT {
         runCommand(query);
 
         String table1Id = assertTableIsRegistered(DEFAULT_DB, table1Name);
-        assertProcessIsRegistered(query, HiveOperation.CREATEVIEW, 
getInputs(table1Name, Entity.Type.TABLE), getOutputs(viewName, 
Entity.Type.TABLE));
+        assertProcessIsRegistered(constructEvent(query, 
HiveOperation.CREATEVIEW, getInputs(table1Name, Entity.Type.TABLE), 
getOutputs(viewName, Entity.Type.TABLE)));
         String viewId = assertTableIsRegistered(DEFAULT_DB, viewName);
 
         //Check lineage which includes table1
@@ -419,7 +436,7 @@ public class HiveHookIT {
         runCommand(query);
 
         //Check if alter view process is registered
-        assertProcessIsRegistered(query, HiveOperation.CREATEVIEW, 
getInputs(table2Name, Entity.Type.TABLE), getOutputs(viewName, 
Entity.Type.TABLE));
+        assertProcessIsRegistered(constructEvent(query, 
HiveOperation.CREATEVIEW, getInputs(table2Name, Entity.Type.TABLE), 
getOutputs(viewName, Entity.Type.TABLE)));
         String table2Id = assertTableIsRegistered(DEFAULT_DB, table2Name);
         Assert.assertEquals(assertTableIsRegistered(DEFAULT_DB, viewName), 
viewId);
 
@@ -456,9 +473,7 @@ public class HiveHookIT {
         String query = "load data local inpath 'file://" + loadFile + "' into 
table " + tableName;
         runCommand(query);
 
-        List<Entity> outputs = getOutputs(tableName, Entity.Type.TABLE);
-
-        assertProcessIsRegistered(query, HiveOperation.LOAD, null, outputs);
+        assertProcessIsRegistered(constructEvent(query, HiveOperation.LOAD, 
null, getOutputs(tableName, Entity.Type.TABLE)));
     }
 
     @Test
@@ -466,41 +481,56 @@ public class HiveHookIT {
         String tableName = createTable(true);
 
         String loadFile = file("load");
-        String query = "load data local inpath 'file://" + loadFile + "' into 
table " + tableName +  " partition(dt = '2015-01-01')";
+        String query = "load data local inpath 'file://" + loadFile + "' into 
table " + tableName +  " partition(dt = '"+ PART_FILE + "')";
         runCommand(query);
 
-        validateProcess(query, HiveOperation.LOAD, null, getOutputs(tableName, 
Entity.Type.TABLE));
+        assertProcessIsRegistered(constructEvent(query, HiveOperation.LOAD, 
null, getOutputs(tableName, Entity.Type.TABLE)));
     }
 
     @Test
-    public void testLoadDFSPath() throws Exception {
+    public void testLoadDFSPathPartitioned() throws Exception {
         String tableName = createTable(true, true, false);
 
-        String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
+        assertTableIsRegistered(DEFAULT_DB, tableName);
 
-        String loadFile = createTestDFSFile("loadDFSFile");
-        String query = "load data inpath '" + loadFile + "' into table " + 
tableName + " partition(dt = '2015-01-01')";
+        final String loadFile = createTestDFSFile("loadDFSFile");
+        String query = "load data inpath '" + loadFile + "' into table " + 
tableName + " partition(dt = '"+ PART_FILE + "')";
         runCommand(query);
 
-        final List<Entity> outputs = getOutputs(tableName, Entity.Type.TABLE);
-        Referenceable processReference = validateProcess(query, 
HiveOperation.LOAD, getInputs(loadFile, Entity.Type.DFS_DIR), outputs);
+        final Set<WriteEntity> outputs = getOutputs(tableName, 
Entity.Type.TABLE);
+        final Set<ReadEntity> inputs = getInputs(loadFile, 
Entity.Type.DFS_DIR);
 
-        validateHDFSPaths(processReference, INPUTS, loadFile);
+        final Set<WriteEntity> partitionOps = new LinkedHashSet<>(outputs);
+        partitionOps.addAll(getOutputs(DEFAULT_DB + "@" + tableName + "@dt=" + 
PART_FILE, Entity.Type.PARTITION));
 
+        Referenceable processReference = validateProcess(constructEvent(query, 
HiveOperation.LOAD, inputs, partitionOps), inputs, outputs);
+        validateHDFSPaths(processReference, INPUTS, loadFile);
         validateOutputTables(processReference, outputs);
+
+        final String loadFile2 = createTestDFSFile("loadDFSFile1");
+        query = "load data inpath '" + loadFile2 + "' into table " + tableName 
+ " partition(dt = '"+ PART_FILE + "')";
+        runCommand(query);
+
+        Set<ReadEntity> process2Inputs = getInputs(loadFile2, 
Entity.Type.DFS_DIR);
+        Set<ReadEntity> expectedInputs = new LinkedHashSet<>();
+        expectedInputs.addAll(process2Inputs);
+        expectedInputs.addAll(inputs);
+
+        validateProcess(constructEvent(query, HiveOperation.LOAD, 
expectedInputs, partitionOps), expectedInputs, outputs);
+
     }
 
     private String getQualifiedTblName(String inputTable) {
         String inputtblQlfdName = inputTable;
 
-        if (inputTable != null && !inputTable.contains(".")) {
+        if (inputTable != null && !inputTable.contains("@")) {
             inputtblQlfdName = 
HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, inputTable);
         }
         return inputtblQlfdName;
     }
 
-    private Referenceable validateProcess(String query, HiveOperation op, 
List<Entity> inputTables, List<Entity> outputTables) throws Exception {
-        String processId = assertProcessIsRegistered(query, op, inputTables, 
outputTables);
+    private Referenceable validateProcess(HiveHook.HiveEventContext event, 
Set<ReadEntity> inputTables, Set<WriteEntity> outputTables) throws Exception {
+        String processId = assertProcessIsRegistered(event, inputTables, 
outputTables);
         Referenceable process = atlasClient.getEntity(processId);
         if (inputTables == null) {
             Assert.assertNull(process.get(INPUTS));
@@ -519,25 +549,47 @@ public class HiveHookIT {
         return process;
     }
 
+    private Referenceable validateProcess(HiveHook.HiveEventContext event) 
throws Exception {
+       return validateProcess(event, event.getInputs(), event.getOutputs());
+    }
+
     @Test
     public void testInsertIntoTable() throws Exception {
-        String tableName = createTable();
+        String inputTable1Name = createTable();
+        String inputTable2Name = createTable();
         String insertTableName = createTable();
-        assertTableIsRegistered(DEFAULT_DB, tableName);
+        assertTableIsRegistered(DEFAULT_DB, inputTable1Name);
         assertTableIsRegistered(DEFAULT_DB, insertTableName);
 
-        String query = "insert into " + insertTableName + " select id, name 
from " + tableName;
+        String query = "insert into " + insertTableName + " select t1.id, 
t1.name from " + inputTable2Name + " as t2, " + inputTable1Name + " as t1 where 
t1.id=t2.id";
 
         runCommand(query);
-        List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
-        List<Entity> outputs = getOutputs(insertTableName, Entity.Type.TABLE);
-        
((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.INSERT);
+        final Set<ReadEntity> inputs = getInputs(inputTable1Name, 
Entity.Type.TABLE);
+        inputs.addAll(getInputs(inputTable2Name, Entity.Type.TABLE));
+
+        Set<WriteEntity> outputs = getOutputs(insertTableName, 
Entity.Type.TABLE);
+        (outputs.iterator().next()).setWriteType(WriteEntity.WriteType.INSERT);
+
+        HiveHook.HiveEventContext event = constructEvent(query, 
HiveOperation.QUERY, inputs, outputs);
+
+        Set<ReadEntity> expectedInputs = new 
TreeSet<ReadEntity>(entityComparator) {{
+            addAll(inputs);
+        }};
+        assertTableIsRegistered(DEFAULT_DB, insertTableName);
+        Referenceable processRef1 = validateProcess(event, expectedInputs, 
outputs);
 
-        Referenceable processRef1 = validateProcess(query, 
HiveOperation.QUERY, inputs, outputs);
+        //Test sorting of tbl names
+        SortedSet<String> sortedTblNames = new TreeSet<>();
+        sortedTblNames.add(getQualifiedTblName(inputTable1Name));
+        sortedTblNames.add(getQualifiedTblName(inputTable2Name));
+
+        //Verify sorted order of inputs in qualified name
+        Assert.assertEquals(Joiner.on(SEP).join("QUERY", sortedTblNames.first(), sortedTblNames.last()) + IO_SEP + SEP + Joiner.on(SEP).join(WriteEntity.WriteType.INSERT.name(), getQualifiedTblName(insertTableName))
+            , processRef1.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME));
 
         //Rerun same query. Should result in same process
         runCommandWithDelay(query, 1000);
-        Referenceable processRef2 = validateProcess(query, 
HiveOperation.QUERY, inputs, outputs);
+        Referenceable processRef2 = validateProcess(event, expectedInputs, 
outputs);
         Assert.assertEquals(processRef1.getId()._getId(), 
processRef2.getId()._getId());
 
     }
@@ -550,7 +602,7 @@ public class HiveHookIT {
             "insert overwrite LOCAL DIRECTORY '" + 
randomLocalPath.getAbsolutePath() + "' select id, name from " + tableName;
 
         runCommand(query);
-        validateProcess(query, HiveOperation.QUERY, getInputs(tableName, 
Entity.Type.TABLE), null);
+        validateProcess(constructEvent(query, HiveOperation.QUERY, 
getInputs(tableName, Entity.Type.TABLE), null));
 
         assertTableIsRegistered(DEFAULT_DB, tableName);
     }
@@ -564,72 +616,78 @@ public class HiveHookIT {
 
         runCommand(query);
 
-        List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
-        final List<Entity> outputs = getOutputs(pFile1, Entity.Type.DFS_DIR);
-        
((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.PATH_WRITE);
+        Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
+        final Set<WriteEntity> outputs = getOutputs(pFile1, 
Entity.Type.DFS_DIR);
+        
((WriteEntity)outputs.iterator().next()).setWriteType(WriteEntity.WriteType.PATH_WRITE);
 
-        Referenceable processReference = validateProcess(query, 
HiveOperation.QUERY, inputs, outputs);
+        final HiveHook.HiveEventContext hiveEventContext = 
constructEvent(query, HiveOperation.QUERY, inputs, outputs);
+        Referenceable processReference = validateProcess(hiveEventContext);
         validateHDFSPaths(processReference, OUTPUTS, pFile1);
 
         String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
         validateInputTables(processReference, inputs);
 
         //Rerun same query with same HDFS path
-
-        runCommand(query);
-        Referenceable process2Reference = validateProcess(query,  
HiveOperation.QUERY, inputs, outputs);
+        runCommandWithDelay(query, 1000);
+        assertTableIsRegistered(DEFAULT_DB, tableName);
+        Referenceable process2Reference = validateProcess(hiveEventContext);
         validateHDFSPaths(process2Reference, OUTPUTS, pFile1);
 
         Assert.assertEquals(process2Reference.getId()._getId(), 
processReference.getId()._getId());
 
-        //Rerun same query with a new HDFS path. Will result in same process since HDFS paths are not part of qualifiedName.
+        //Rerun same query with a new HDFS path. Will result in the same process since HDFS paths are not part of the qualified name for QUERY operations
         final String pFile2 = createTestDFSPath("somedfspath2");
         query = "insert overwrite DIRECTORY '" + pFile2  + "' select id, name 
from " + tableName;
-        runCommand(query);
-        List<Entity> p3Outputs = new ArrayList<Entity>() {{
+        runCommandWithDelay(query, 1000);
+        assertTableIsRegistered(DEFAULT_DB, tableName);
+        Set<WriteEntity> p3Outputs = new LinkedHashSet<WriteEntity>() {{
             addAll(getOutputs(pFile2, Entity.Type.DFS_DIR));
             addAll(outputs);
         }};
 
-        Referenceable process3Reference = validateProcess(query,  
HiveOperation.QUERY, inputs, p3Outputs);
+        Referenceable process3Reference = 
validateProcess(constructEvent(query,  HiveOperation.QUERY, inputs, p3Outputs));
         validateHDFSPaths(process3Reference, OUTPUTS, pFile2);
 
         Assert.assertEquals(process3Reference.getId()._getId(), 
processReference.getId()._getId());
     }
 
     @Test
-    public void testInsertIntoDFSDir() throws Exception {
-        String tableName = createTable();
+    public void testInsertIntoDFSDirPartitioned() throws Exception {
+
+        //Test with partitioned table
+        String tableName = createTable(true);
         String pFile1 = createTestDFSPath("somedfspath1");
         String query =
-            "insert overwrite DIRECTORY '" + pFile1  + "' select id, name from 
" + tableName;
+            "insert overwrite DIRECTORY '" + pFile1  + "' select id, name from 
" + tableName + " where dt = '" + PART_FILE + "'";
 
         runCommand(query);
 
-        List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
-        final List<Entity> outputs = getOutputs(pFile1, Entity.Type.DFS_DIR);
-        
((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.PATH_WRITE);
+        Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
+        final Set<WriteEntity> outputs = getOutputs(pFile1, 
Entity.Type.DFS_DIR);
+        
((WriteEntity)outputs.iterator().next()).setWriteType(WriteEntity.WriteType.PATH_WRITE);
 
-        Referenceable processReference = validateProcess(query,  
HiveOperation.QUERY, inputs, outputs);
-        validateHDFSPaths(processReference, OUTPUTS, pFile1);
+        final Set<ReadEntity> partitionIps = new LinkedHashSet<>(inputs);
+        partitionIps.addAll(getInputs(DEFAULT_DB + "@" + tableName + "@dt='" + 
PART_FILE + "'", Entity.Type.PARTITION));
 
-        String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
+        Referenceable processReference = validateProcess(constructEvent(query, 
 HiveOperation.QUERY, partitionIps, outputs), inputs, outputs);
 
-        validateInputTables(processReference, inputs);
-
-        //Rerun same query with different HDFS path
+        //Rerun same query with different HDFS path. Should not create another process; it should update the existing one.
 
         final String pFile2 = createTestDFSPath("somedfspath2");
         query =
-            "insert overwrite DIRECTORY '" + pFile2  + "' select id, name from 
" + tableName;
+            "insert overwrite DIRECTORY '" + pFile2  + "' select id, name from 
" + tableName + " where dt = '" + PART_FILE + "'";
 
         runCommand(query);
-        List<Entity> p2Outputs = new ArrayList<Entity>() {{
-            addAll(getOutputs(pFile2, Entity.Type.DFS_DIR));
+
+        final Set<WriteEntity> pFile2Outputs = getOutputs(pFile2, 
Entity.Type.DFS_DIR);
+        
((WriteEntity)pFile2Outputs.iterator().next()).setWriteType(WriteEntity.WriteType.PATH_WRITE);
+        //Now the process has 2 paths - one older with deleted reference to partition and another with the latest partition
+        Set<WriteEntity> p2Outputs = new LinkedHashSet<WriteEntity>() {{
+            addAll(pFile2Outputs);
             addAll(outputs);
         }};
 
-        Referenceable process2Reference = validateProcess(query, 
HiveOperation.QUERY, inputs, p2Outputs);
+        Referenceable process2Reference = 
validateProcess(constructEvent(query, HiveOperation.QUERY, partitionIps, 
pFile2Outputs), inputs, p2Outputs);
         validateHDFSPaths(process2Reference, OUTPUTS, pFile2);
 
         Assert.assertEquals(process2Reference.getId()._getId(), 
processReference.getId()._getId());
@@ -647,12 +705,12 @@ public class HiveHookIT {
 
         runCommand(query);
 
-        List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
-        List<Entity> outputs = getOutputs(insertTableName, Entity.Type.TABLE);
-        outputs.get(0).setName(getQualifiedTblName(insertTableName + 
HiveMetaStoreBridge.TEMP_TABLE_PREFIX + SessionState.get().getSessionId()));
-        
((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.INSERT);
+        Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
+        Set<WriteEntity> outputs = getOutputs(insertTableName, 
Entity.Type.TABLE);
+        outputs.iterator().next().setName(getQualifiedTblName(insertTableName 
+ HiveMetaStoreBridge.TEMP_TABLE_PREFIX + SessionState.get().getSessionId()));
+        
((WriteEntity)outputs.iterator().next()).setWriteType(WriteEntity.WriteType.INSERT);
 
-        validateProcess(query,  HiveOperation.QUERY, inputs, outputs);
+        validateProcess(constructEvent(query,  HiveOperation.QUERY, inputs, 
outputs));
 
         assertTableIsRegistered(DEFAULT_DB, tableName);
         assertTableIsRegistered(DEFAULT_DB, insertTableName, null, true);
@@ -660,21 +718,40 @@ public class HiveHookIT {
 
     @Test
     public void testInsertIntoPartition() throws Exception {
-        String tableName = createTable(true);
-        String insertTableName = createTable(true);
+        final boolean isPartitionedTable = true;
+        String tableName = createTable(isPartitionedTable);
+        String insertTableName = createTable(isPartitionedTable);
         String query =
-            "insert into " + insertTableName + " partition(dt = '2015-01-01') 
select id, name from " + tableName
-                + " where dt = '2015-01-01'";
+            "insert into " + insertTableName + " partition(dt = '"+ PART_FILE 
+ "') select id, name from " + tableName
+                + " where dt = '"+ PART_FILE + "'";
         runCommand(query);
 
-        List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
-        List<Entity> outputs = getOutputs(insertTableName, Entity.Type.TABLE);
-        
((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.INSERT);
+        final Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
+        final Set<WriteEntity> outputs = getOutputs(insertTableName, 
Entity.Type.TABLE);
+        
((WriteEntity)outputs.iterator().next()).setWriteType(WriteEntity.WriteType.INSERT);
+
+        final Set<ReadEntity> partitionIps = new LinkedHashSet<ReadEntity>() {
+            {
+                addAll(inputs);
+                add(getPartitionInput());
+
+            }
+        };
+
+        final Set<WriteEntity> partitionOps = new LinkedHashSet<WriteEntity>() 
{
+            {
+                addAll(outputs);
+                add(getPartitionOutput());
+
+            }
+        };
 
-        validateProcess(query,  HiveOperation.QUERY, inputs, outputs);
+        validateProcess(constructEvent(query,  HiveOperation.QUERY, 
partitionIps, partitionOps), inputs, outputs);
 
         assertTableIsRegistered(DEFAULT_DB, tableName);
         assertTableIsRegistered(DEFAULT_DB, insertTableName);
+
+        //TODO - update
     }
 
     private String random() {
@@ -701,65 +778,111 @@ public class HiveHookIT {
 
         assertTableIsRegistered(DEFAULT_DB, tableName);
 
-        String filename = "pfile://" + mkdir("export");
+        String filename = "pfile://" + mkdir("exportUnPartitioned");
         String query = "export table " + tableName + " to \"" + filename + 
"\"";
         runCommand(query);
 
-        List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
-        List<Entity> outputs = getOutputs(filename, Entity.Type.DFS_DIR);
+        Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
+        Set<WriteEntity> outputs = getOutputs(filename, Entity.Type.DFS_DIR);
 
-        Referenceable processReference = validateProcess(query, 
HiveOperation.EXPORT, inputs, outputs);
+        Referenceable processReference = validateProcess(constructEvent(query, 
HiveOperation.EXPORT, inputs, outputs));
 
         validateHDFSPaths(processReference, OUTPUTS, filename);
         validateInputTables(processReference, inputs);
 
         //Import
-        tableName = createTable(false);
-        assertTableIsRegistered(DEFAULT_DB, tableName);
+        String importTableName = createTable(false);
+        assertTableIsRegistered(DEFAULT_DB, importTableName);
 
-        query = "import table " + tableName + " from '" + filename + "'";
+        query = "import table " + importTableName + " from '" + filename + "'";
         runCommand(query);
-        outputs = getOutputs(tableName, Entity.Type.TABLE);
-        processReference = validateProcess(query, HiveOperation.IMPORT, 
getInputs(filename, Entity.Type.DFS_DIR), outputs);
-        validateHDFSPaths(processReference, INPUTS, filename);
+        outputs = getOutputs(importTableName, Entity.Type.TABLE);
+        validateProcess(constructEvent(query, HiveOperation.IMPORT, 
getInputs(filename, Entity.Type.DFS_DIR), outputs));
 
-        validateOutputTables(processReference, outputs);
+        //Should create another process
+        filename = "pfile://" + mkdir("export2UnPartitioned");
+        query = "export table " + tableName + " to \"" + filename + "\"";
+        runCommand(query);
+
+        inputs = getInputs(tableName, Entity.Type.TABLE);
+        outputs = getOutputs(filename, Entity.Type.DFS_DIR);
+
+        validateProcess(constructEvent(query, HiveOperation.EXPORT, inputs, 
outputs));
+
+        //Import again should create another process
+        query = "import table " + importTableName + " from '" + filename + "'";
+        runCommand(query);
+        outputs = getOutputs(importTableName, Entity.Type.TABLE);
+        validateProcess(constructEvent(query, HiveOperation.IMPORT, 
getInputs(filename, Entity.Type.DFS_DIR), outputs));
     }
 
     @Test
     public void testExportImportPartitionedTable() throws Exception {
-        String tableName = createTable(true);
-        String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
+        boolean isPartitionedTable = true;
+        final String tableName = createTable(isPartitionedTable);
+        assertTableIsRegistered(DEFAULT_DB, tableName);
 
         //Add a partition
         String partFile = "pfile://" + mkdir("partition");
-        String query = "alter table " + tableName + " add partition 
(dt='2015-01-01') location '" + partFile + "'";
+        String query = "alter table " + tableName + " add partition (dt='"+ 
PART_FILE + "') location '" + partFile + "'";
         runCommand(query);
 
         String filename = "pfile://" + mkdir("export");
         query = "export table " + tableName + " to \"" + filename + "\"";
         runCommand(query);
 
-        List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
-        List<Entity> outputs = getOutputs(filename, Entity.Type.DFS_DIR);
+        final Set<ReadEntity> expectedExportInputs = getInputs(tableName, 
Entity.Type.TABLE);
+        final Set<WriteEntity> outputs = getOutputs(filename, 
Entity.Type.DFS_DIR);
 
-        Referenceable processReference = validateProcess(query, 
HiveOperation.EXPORT, inputs, outputs);
-        validateHDFSPaths(processReference, OUTPUTS, filename);
+        //Note that export has only partition as input in this case
+        final Set<ReadEntity> partitionIps = getInputs(DEFAULT_DB + "@" + 
tableName + "@dt=" + PART_FILE, Entity.Type.PARTITION);
+        partitionIps.addAll(expectedExportInputs);
 
-        validateInputTables(processReference, inputs);
+        Referenceable processReference = validateProcess(constructEvent(query, 
HiveOperation.EXPORT, partitionIps, outputs), expectedExportInputs, outputs);
+        validateHDFSPaths(processReference, OUTPUTS, filename);
 
         //Import
-        tableName = createTable(true);
-        tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
+        String importTableName = createTable(true);
+        assertTableIsRegistered(DEFAULT_DB, tableName);
 
-        query = "import table " + tableName + " from '" + filename + "'";
+        query = "import table " + importTableName + " from '" + filename + "'";
         runCommand(query);
 
-        outputs = getOutputs(tableName, Entity.Type.TABLE);
-        processReference = validateProcess(query, HiveOperation.IMPORT, 
getInputs(filename, Entity.Type.DFS_DIR), outputs);
-        validateHDFSPaths(processReference, INPUTS, filename);
+        final Set<ReadEntity> expectedImportInputs = getInputs(filename, 
Entity.Type.DFS_DIR);
+        final Set<WriteEntity> importOutputs = getOutputs(importTableName, 
Entity.Type.TABLE);
 
-        validateOutputTables(processReference, outputs);
+        final Set<WriteEntity> partitionOps = getOutputs(DEFAULT_DB + "@" + 
importTableName + "@dt=" + PART_FILE, Entity.Type.PARTITION);
+        partitionOps.addAll(importOutputs);
+
+        validateProcess(constructEvent(query, HiveOperation.IMPORT, 
expectedImportInputs , partitionOps), expectedImportInputs, importOutputs);
+
+        //Export should update same process
+        filename = "pfile://" + mkdir("export2");
+        query = "export table " + tableName + " to \"" + filename + "\"";
+        runCommand(query);
+
+        final Set<WriteEntity> outputs2 = getOutputs(filename, 
Entity.Type.DFS_DIR);
+        Set<WriteEntity> p3Outputs = new LinkedHashSet<WriteEntity>() {{
+            addAll(outputs2);
+            addAll(outputs);
+        }};
+
+        validateProcess(constructEvent(query, HiveOperation.EXPORT, 
partitionIps, outputs2), expectedExportInputs, p3Outputs);
+
+        query = "alter table " + importTableName + " drop partition (dt='"+ 
PART_FILE + "')";
+        runCommand(query);
+
+        //Import should update same process
+        query = "import table " + importTableName + " from '" + filename + "'";
+        runCommandWithDelay(query, 1000);
+
+        final Set<ReadEntity> importInputs = getInputs(filename, 
Entity.Type.DFS_DIR);
+        final Set<ReadEntity> expectedImport2Inputs  = new 
LinkedHashSet<ReadEntity>() {{
+            addAll(importInputs);
+            addAll(expectedImportInputs);
+        }};
+
+        validateProcess(constructEvent(query, HiveOperation.IMPORT, 
importInputs, partitionOps), expectedImport2Inputs, importOutputs);
     }
 
     @Test
@@ -767,13 +890,14 @@ public class HiveHookIT {
         String tableName = createTable();
         String query = "select * from " + tableName;
         runCommand(query);
-        List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
-        assertProcessIsNotRegistered(query, HiveOperation.QUERY, inputs, null);
+        Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
+        HiveHook.HiveEventContext hiveEventContext = constructEvent(query, 
HiveOperation.QUERY, inputs, null);
+        assertProcessIsNotRegistered(hiveEventContext);
 
         //check with uppercase table name
         query = "SELECT * from " + tableName.toUpperCase();
         runCommand(query);
-        assertProcessIsNotRegistered(query, HiveOperation.QUERY, inputs, null);
+        assertProcessIsNotRegistered(hiveEventContext);
     }
 
     @Test
@@ -1042,10 +1166,10 @@ public class HiveHookIT {
         String query = String.format("truncate table %s", tableName);
         runCommand(query);
 
-        List<Entity> outputs = getInputs(tableName, Entity.Type.TABLE);
+        Set<WriteEntity> outputs = getOutputs(tableName, Entity.Type.TABLE);
 
         String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
-        validateProcess(query, HiveOperation.TRUNCATETABLE, null, outputs);
+        validateProcess(constructEvent(query, HiveOperation.TRUNCATETABLE, null, outputs));
 
         //Check lineage
         String datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);
@@ -1144,7 +1268,7 @@ public class HiveHookIT {
         String query = "alter table " + tableName + " set location '" + 
testPath + "'";
         runCommand(query);
 
-        String tableId = assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
+        assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
             @Override
             public void assertOnEntity(Referenceable tableRef) throws Exception {
                 Referenceable sdRef = (Referenceable) tableRef.get(HiveDataModelGenerator.STORAGE_DESC);
@@ -1152,10 +1276,11 @@ public class HiveHookIT {
             }
         });
 
-        List<Entity> inputs = getInputs(testPath, Entity.Type.DFS_DIR);
-        List<Entity> outputs = getOutputs(tableName, Entity.Type.TABLE);
+        String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+            HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName, false), null);
+
+        Referenceable processReference = atlasClient.getEntity(processId);
 
-        Referenceable processReference = validateProcess(query, HiveOperation.ALTERTABLE_LOCATION, inputs, outputs);
         validateHDFSPaths(processReference, INPUTS, testPath);
     }
 
@@ -1302,6 +1427,20 @@ public class HiveHookIT {
         assertTableIsNotRegistered(DEFAULT_DB, tableName);
     }
 
+    private WriteEntity getPartitionOutput() {
+        WriteEntity partEntity = new WriteEntity();
+        partEntity.setName(PART_FILE);
+        partEntity.setTyp(Entity.Type.PARTITION);
+        return partEntity;
+    }
+
+    private ReadEntity getPartitionInput() {
+        ReadEntity partEntity = new ReadEntity();
+        partEntity.setName(PART_FILE);
+        partEntity.setTyp(Entity.Type.PARTITION);
+        return partEntity;
+    }
+
     @Test
     public void testDropDatabaseWithCascade() throws Exception {
         //Test Deletion of database and its corresponding tables
@@ -1550,26 +1689,66 @@ public class HiveHookIT {
         }
     }
 
-    private String assertProcessIsRegistered(final String queryStr, HiveOperation op, final List<Entity> inputTbls, final List<Entity> outputTbls) throws Exception {
-        String processQFName = getProcessQualifiedName(op, getSortedProcessDataSets(inputTbls), getSortedProcessDataSets(outputTbls));
-        LOG.debug("Searching for process with query {}", processQFName);
-        return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName, new AssertPredicate() {
-            @Override
-            public void assertOnEntity(final Referenceable entity) throws Exception {
-                List<String> recentQueries = (List<String>) entity.get("recentQueries");
-                Assert.assertEquals(recentQueries.get(0), lower(queryStr));
+    private String assertProcessIsRegistered(final HiveHook.HiveEventContext event) throws Exception {
+        try {
+            SortedSet<ReadEntity> sortedHiveInputs = event.getInputs() == null ? null : new TreeSet<ReadEntity>(entityComparator);
+            SortedSet<WriteEntity> sortedHiveOutputs = event.getOutputs() == null ? null : new TreeSet<WriteEntity>(entityComparator);
+
+            if (event.getInputs() != null) {
+                sortedHiveInputs.addAll(event.getInputs());
             }
-        });
+            if (event.getOutputs() != null) {
+                sortedHiveOutputs.addAll(event.getOutputs());
+            }
+
+            String processQFName = getProcessQualifiedName(event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(event.getInputs()), getSortedProcessDataSets(event.getOutputs()));
+            LOG.debug("Searching for process with query {}", processQFName);
+            return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName, new AssertPredicate() {
+                @Override
+                public void assertOnEntity(final Referenceable entity) throws Exception {
+                    List<String> recentQueries = (List<String>) entity.get("recentQueries");
+                    Assert.assertEquals(recentQueries.get(0), lower(event.getQueryStr()));
+                }
+            });
+        } catch (Exception e) {
+            LOG.error("Exception : ", e);
+            throw e;
+        }
+    }
+
+    private String assertProcessIsRegistered(final HiveHook.HiveEventContext event, final Set<ReadEntity> inputTbls, final Set<WriteEntity> outputTbls) throws Exception {
+        try {
+            SortedSet<ReadEntity> sortedHiveInputs = event.getInputs() == null ? null : new TreeSet<ReadEntity>(entityComparator);
+            SortedSet<WriteEntity> sortedHiveOutputs = event.getOutputs() == null ? null : new TreeSet<WriteEntity>(entityComparator);
+            if (event.getInputs() != null) {
+                sortedHiveInputs.addAll(event.getInputs());
+            }
+            if (event.getOutputs() != null) {
+                sortedHiveOutputs.addAll(event.getOutputs());
+            }
+            String processQFName = getProcessQualifiedName(event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(inputTbls), getSortedProcessDataSets(outputTbls));
+            LOG.debug("Searching for process with query {}", processQFName);
+            return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName, new AssertPredicate() {
+                @Override
+                public void assertOnEntity(final Referenceable entity) throws Exception {
+                    List<String> recentQueries = (List<String>) entity.get("recentQueries");
+                    Assert.assertEquals(recentQueries.get(0), lower(event.getQueryStr()));
+                }
+            });
+        } catch (Exception e) {
+            LOG.error("Exception : ", e);
+            throw e;
+        }
     }
 
     private String getDSTypeName(Entity entity) {
         return Entity.Type.TABLE.equals(entity.getType()) ? HiveDataTypes.HIVE_TABLE.name() : FSDataTypes.HDFS_PATH().toString();
     }
 
-    private SortedMap<Entity, Referenceable> getSortedProcessDataSets(List<Entity> inputTbls) {
-        SortedMap<Entity, Referenceable> inputs = new TreeMap<Entity, Referenceable>(entityComparator);
+    private <T extends Entity> SortedMap<T, Referenceable> getSortedProcessDataSets(Set<T> inputTbls) {
+        SortedMap<T, Referenceable> inputs = new TreeMap<T, Referenceable>(entityComparator);
         if (inputTbls != null) {
-            for (final Entity tbl : inputTbls) {
+            for (final T tbl : inputTbls) {
                 Referenceable inputTableRef = new Referenceable(getDSTypeName(tbl), new HashMap<String, Object>() {{
                     put(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tbl.getName());
                 }});
@@ -1579,10 +1758,22 @@ public class HiveHookIT {
         return inputs;
     }
 
-    private void assertProcessIsNotRegistered(String queryStr, HiveOperation 
op, final List<Entity> inputTbls, final List<Entity> outputTbls) throws 
Exception {
-        String processQFName = getProcessQualifiedName(op, 
getSortedProcessDataSets(inputTbls), getSortedProcessDataSets(outputTbls));
-        LOG.debug("Searching for process with query {}", processQFName);
-        assertEntityIsNotRegistered(HiveDataTypes.HIVE_PROCESS.getName(), 
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName);
+    private void assertProcessIsNotRegistered(HiveHook.HiveEventContext event) 
throws Exception {
+        try {
+            SortedSet<ReadEntity> sortedHiveInputs = event.getInputs() == null 
? null : new TreeSet<ReadEntity>(entityComparator);
+            SortedSet<WriteEntity> sortedHiveOutputs = event.getOutputs() == 
null ? null : new TreeSet<WriteEntity>(entityComparator);
+            if ( event.getInputs() != null) {
+                sortedHiveInputs.addAll(event.getInputs());
+            }
+            if ( event.getOutputs() != null) {
+                sortedHiveOutputs.addAll(event.getOutputs());
+            }
+            String processQFName = getProcessQualifiedName(event, 
sortedHiveInputs, sortedHiveOutputs, 
getSortedProcessDataSets(event.getInputs()), 
getSortedProcessDataSets(event.getOutputs()));
+            LOG.debug("Searching for process with query {}", processQFName);
+            assertEntityIsNotRegistered(HiveDataTypes.HIVE_PROCESS.getName(), 
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName);
+        } catch( Exception e) {
+            LOG.error("Exception : ", e);
+        }
     }
 
     private void assertTableIsNotRegistered(String dbName, String tableName, boolean isTemporaryTable) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f51c8861/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index 8bbe2d7..09b1c4b 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -128,7 +128,7 @@ public abstract class AtlasHook {
             } catch (Exception e) {
                 numRetries++;
                 if (numRetries < maxRetries) {
-                    LOG.debug("Failed to notify atlas for entity {}. 
Retrying", message, e);
+                    LOG.info("Failed to notify atlas for entity {}. Retrying", 
message, e);
                 } else {
                     if (shouldLogFailedMessages && e instanceof NotificationException) {
                         List<String> failedMessages = ((NotificationException) e).getFailedMessages();

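The AtlasHook change above only raises the retry log message from DEBUG to INFO; the surrounding loop keeps retrying the notification until maxRetries is reached and then falls through to the failure handling. A self-contained sketch of that retry-and-log pattern follows; slf4j is assumed, and notifyWithRetries and its arguments are placeholders rather than the real AtlasHook members.

    // Sketch only: bounded retries with an INFO log per failed attempt.
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class RetrySketch {
        private static final Logger LOG = LoggerFactory.getLogger(RetrySketch.class);

        static void notifyWithRetries(Runnable action, int maxRetries) {
            int numRetries = 0;
            while (true) {
                try {
                    action.run();
                    return;
                } catch (Exception e) {
                    numRetries++;
                    if (numRetries < maxRetries) {
                        // INFO (not DEBUG) so failed notifications are visible with default log levels.
                        LOG.info("Failed to notify atlas. Retrying", e);
                    } else {
                        LOG.error("Failed to notify atlas after {} attempts, giving up", maxRetries, e);
                        return;
                    }
                }
            }
        }
    }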
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f51c8861/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 5a6440c..5de6df1 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES:
 
 ALL CHANGES:
 ATLAS-966 Exit execution of import_hive.sh if HIVE_HOME is not set (svimal2106 via sumasai)
+ATLAS-917 Add hdfs paths to process qualified name for non-partition based queries (sumasai)
 
 
 --Release 0.7-incubating

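The ATLAS-917 entry above describes folding HDFS paths into the process qualified name for non-partition based queries, so that queries reading or writing different paths resolve to different process entities. A rough sketch of that idea follows; normalizePath and buildProcessQualifiedName are hypothetical names and the output format is illustrative only, the actual logic lives in the HiveHook getProcessQualifiedName changes referenced in this commit.

    // Sketch only: normalize HDFS paths and fold them into a process lookup key.
    import java.util.Set;
    import java.util.TreeSet;

    public class HdfsPathQualifiedNameSketch {
        static String normalizePath(String path) {
            // Lower-case and strip a trailing slash so equivalent URIs compare equal.
            String p = path.toLowerCase();
            return p.endsWith("/") ? p.substring(0, p.length() - 1) : p;
        }

        static String buildProcessQualifiedName(String operation, Set<String> hdfsInputs, Set<String> hdfsOutputs) {
            Set<String> ins = new TreeSet<>();
            for (String p : hdfsInputs) {
                ins.add(normalizePath(p));
            }
            Set<String> outs = new TreeSet<>();
            for (String p : hdfsOutputs) {
                outs.add(normalizePath(p));
            }
            return operation.toLowerCase() + ":" + String.join(",", ins) + "->" + String.join(",", outs);
        }

        public static void main(String[] args) {
            Set<String> in = new TreeSet<>();
            in.add("hdfs://namenode:8020/data/Raw/");
            Set<String> out = new TreeSet<>();
            out.add("default.t1@primary");
            System.out.println(buildProcessQualifiedName("LOAD", in, out));
        }
    }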