Repository: incubator-atlas
Updated Branches:
  refs/heads/master 4c56c61f8 -> a52112d86
ATLAS-247 Hive Column level lineage (rhbutani,svimal2106 via shwethags)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/a52112d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/a52112d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/a52112d8

Branch: refs/heads/master
Commit: a52112d8633c2eb5dea0c3a6ffd3f308594a3996
Parents: 4c56c61
Author: Shwetha GS <[email protected]>
Authored: Fri Sep 30 12:51:29 2016 +0530
Committer: Shwetha GS <[email protected]>
Committed: Fri Sep 30 12:51:29 2016 +0530

----------------------------------------------------------------------
 .../atlas/hive/bridge/ColumnLineageUtils.java   | 121 +++++++++++++++++++
 .../org/apache/atlas/hive/hook/HiveHook.java    |  76 +++++++++++-
 .../hive/model/HiveDataModelGenerator.java      |  21 +++-
 .../apache/atlas/hive/model/HiveDataTypes.java  |   3 +-
 .../org/apache/atlas/hive/hook/HiveHookIT.java  | 117 ++++++++++++++----
 release-log.txt                                 |   1 +
 6 files changed, 313 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a52112d8/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java
new file mode 100644
index 0000000..e4a20e1
--- /dev/null
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.hive.bridge;
+
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.hive.hook.HiveHook;
+import org.apache.atlas.hive.model.HiveDataModelGenerator;
+import org.apache.atlas.hive.model.HiveDataTypes;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class ColumnLineageUtils {
+    public static final Logger LOG = LoggerFactory.getLogger(ColumnLineageUtils.class);
+
+    public static class HiveColumnLineageInfo {
+        public final String depenendencyType;
+        public final String expr;
+        public final String inputColumn;
+
+        HiveColumnLineageInfo(LineageInfo.Dependency d, String inputCol) {
+            depenendencyType = d.getType().name();
+            expr = d.getExpr();
+            inputColumn = inputCol;
+        }
+
+        @Override
+        public String toString() {
+            return inputColumn;
+        }
+    }
+
+    public static String getQualifiedName(LineageInfo.DependencyKey key) {
+        String db = key.getDataContainer().getTable().getDbName();
+        String table = key.getDataContainer().getTable().getTableName();
+        String col = key.getFieldSchema().getName();
+        return db + "." + table + "." + col;
+    }
+
+    public static Map<String, List<HiveColumnLineageInfo>> buildLineageMap(LineageInfo lInfo) {
+        Map<String, List<HiveColumnLineageInfo>> m = new HashMap<>();
+
+        for (Map.Entry<LineageInfo.DependencyKey, LineageInfo.Dependency> e : lInfo.entrySet()) {
+            List<HiveColumnLineageInfo> l = new ArrayList<>();
+            String k = getQualifiedName(e.getKey());
+
+            for (LineageInfo.BaseColumnInfo iCol : e.getValue().getBaseCols()) {
+                String db = iCol.getTabAlias().getTable().getDbName();
+                String table = iCol.getTabAlias().getTable().getTableName();
+                String colQualifiedName = iCol.getColumn() == null
+                        ? db + "." + table
+                        : db + "." + table + "." + iCol.getColumn().getName();
+                l.add(new HiveColumnLineageInfo(e.getValue(), colQualifiedName));
+            }
+            LOG.debug("Setting lineage --> Input: {} ==> Output : {}", l, k);
+            m.put(k, l);
+        }
+        return m;
+    }
+
+    static String[] extractComponents(String qualifiedName) {
+        String[] comps = qualifiedName.split("\\.");
+        int lastIdx = comps.length - 1;
+        int atLoc = comps[lastIdx].indexOf('@');
+        if (atLoc > 0) {
+            comps[lastIdx] = comps[lastIdx].substring(0, atLoc);
+        }
+        return comps;
+    }
+
+    static void populateColumnReferenceableMap(Map<String, Referenceable> m,
+                                               Referenceable r) {
+        if (r.getTypeName().equals(HiveDataTypes.HIVE_TABLE.getName())) {
+            String qName = (String) r.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
+            String[] qNameComps = extractComponents(qName);
+            for (Referenceable col : (List<Referenceable>) r.get(HiveDataModelGenerator.COLUMNS)) {
+                String cName = (String) col.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
+                String[] colQNameComps = extractComponents(cName);
+                String colQName = colQNameComps[0] + "." + colQNameComps[1] + "." + colQNameComps[2];
+                m.put(colQName, col);
+            }
+            String tableQName = qNameComps[0] + "." + qNameComps[1];
+            m.put(tableQName, r);
+        }
+    }
+
+    public static Map<String, Referenceable> buildColumnReferenceableMap(List<Referenceable> inputs,
+                                                                         List<Referenceable> outputs) {
+        Map<String, Referenceable> m = new HashMap<>();
+
+        for (Referenceable r : inputs) {
+            populateColumnReferenceableMap(m, r);
+        }
+
+        for (Referenceable r : outputs) {
+            populateColumnReferenceableMap(m, r);
+        }
+
+        return m;
+    }
+}
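
A note for reviewers on the qualified-name handling above: extractComponents() splits an Atlas qualified name on dots and strips the cluster suffix (everything from '@') off the last component, so that names taken from table/column Referenceables line up with the plain db.table.column keys that buildLineageMap() builds from Hive's LineageInfo. A minimal standalone sketch of that behaviour (the demo class and main() are illustrative only, not part of this patch):

    import java.util.Arrays;

    public class ExtractComponentsDemo {
        // Same logic as ColumnLineageUtils.extractComponents() in this patch.
        static String[] extractComponents(String qualifiedName) {
            String[] comps = qualifiedName.split("\\.");
            int lastIdx = comps.length - 1;
            int atLoc = comps[lastIdx].indexOf('@');
            if (atLoc > 0) {
                comps[lastIdx] = comps[lastIdx].substring(0, atLoc);
            }
            return comps;
        }

        public static void main(String[] args) {
            // A column qualified name as the Hive bridge emits it: db.table.column@cluster
            System.out.println(Arrays.toString(extractComponents("default.t1.a@primary")));
            // prints [default, t1, a] -- the cluster suffix is dropped, matching the
            // db.table.column keys produced by buildLineageMap()
        }
    }
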
LOG.info("Skipped query {} since it has no getInputs() or resulting getOutputs()", event.getQueryStr()); @@ -773,6 +791,51 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { return processReferenceable; } + + private List<Referenceable> createColumnLineageProcessInstances( + Referenceable processRefObj, + Map<String, List<ColumnLineageUtils.HiveColumnLineageInfo>> lineageInfo, + Map<String, Referenceable> columnQNameToRef + ) { + List<Referenceable> l = new ArrayList<>(); + for(Map.Entry<String, List<ColumnLineageUtils.HiveColumnLineageInfo>> e : + lineageInfo.entrySet()) { + Referenceable destCol = columnQNameToRef.get(e.getKey()); + if (destCol == null ) { + LOG.debug("Couldn't find output Column {}", e.getKey()); + continue; + } + List<Referenceable> outRef = new ArrayList<>(); + outRef.add(destCol); + List<Referenceable> inputRefs = new ArrayList<>(); + for(ColumnLineageUtils.HiveColumnLineageInfo cLI : e.getValue()) { + Referenceable srcCol = columnQNameToRef.get(cLI.inputColumn); + if (srcCol == null ) { + LOG.debug("Couldn't find input Column {}", cLI.inputColumn); + continue; + } + inputRefs.add(srcCol); + } + + if (inputRefs.size() > 0 ) { + Referenceable r = new Referenceable(HiveDataTypes.HIVE_COLUMN_LINEAGE.getName()); + r.set("name", processRefObj.get(AtlasClient.NAME) + ":" + outRef.get(0).get(AtlasClient.NAME)); + r.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processRefObj.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME) + ":" + outRef.get(0).get(AtlasClient.NAME)); + r.set("inputs", inputRefs); + r.set("outputs", outRef); + r.set("query", processRefObj); + r.set("depenendencyType", e.getValue().get(0).depenendencyType); + r.set("expression", e.getValue().get(0).expr); + l.add(r); + } + else{ + LOG.debug("No input references found for lineage of column {}", destCol.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME)); + } + } + + return l; + } + @VisibleForTesting static String getProcessQualifiedName(HiveMetaStoreBridge dgiBridge, HiveEventContext eventContext, final SortedSet<ReadEntity> sortedHiveInputs, @@ -930,6 +993,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { private String queryStr; private Long queryStartTime; + public Map<String, List<ColumnLineageUtils.HiveColumnLineageInfo>> lineageInfo; + private List<HookNotification.HookNotificationMessage> messages = new ArrayList<>(); private String queryType; @@ -978,6 +1043,15 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { this.queryType = queryType; } + public void setLineageInfo(LineageInfo lineageInfo){ + try { + this.lineageInfo = ColumnLineageUtils.buildLineageMap(lineageInfo); + LOG.debug("Column Lineage Map => {} ", this.lineageInfo.entrySet()); + }catch (Exception e){ + LOG.warn("Column Lineage Map build failed with exception {}", e); + } + } + public Set<ReadEntity> getInputs() { return inputs; } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a52112d8/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java index 45f0bc9..28078f4 100755 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java 
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a52112d8/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java
index 45f0bc9..28078f4 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java
@@ -20,7 +20,6 @@ package org.apache.atlas.hive.model;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
-
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.AtlasException;
@@ -107,6 +106,7 @@ public class HiveDataModelGenerator {
 
         // DDL/DML Process
         createProcessClass();
+        createColumnLineageClass();
     }
 
     public TypesDef getTypesDef() {
@@ -328,4 +328,23 @@ public class HiveDataModelGenerator {
         }
     }
 
+    private void createColumnLineageClass() throws AtlasException {
+
+        AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
+                new AttributeDefinition("query", HiveDataTypes.HIVE_PROCESS.getName(),
+                        Multiplicity.REQUIRED, false, null),
+                new AttributeDefinition("depenendencyType", DataTypes.STRING_TYPE.getName(),
+                        Multiplicity.REQUIRED, false, null),
+                new AttributeDefinition("expression", DataTypes.STRING_TYPE.getName(),
+                        Multiplicity.OPTIONAL, false, null)
+        };
+
+        HierarchicalTypeDefinition<ClassType> definition =
+                new HierarchicalTypeDefinition<>(ClassType.class, HiveDataTypes.HIVE_COLUMN_LINEAGE.getName(), null,
+                        ImmutableSet.of(AtlasClient.PROCESS_SUPER_TYPE), attributeDefinitions);
+        classTypeDefinitions.put(HiveDataTypes.HIVE_COLUMN_LINEAGE.getName(), definition);
+        LOG.debug("Created definition for " + HiveDataTypes.HIVE_COLUMN_LINEAGE.getName());
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a52112d8/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataTypes.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataTypes.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataTypes.java
index e094cb6..94010d0 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataTypes.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataTypes.java
@@ -42,7 +42,8 @@ public enum HiveDataTypes {
     HIVE_INDEX,
     HIVE_ROLE,
     HIVE_TYPE,
-    HIVE_PROCESS
+    HIVE_PROCESS,
+    HIVE_COLUMN_LINEAGE
     // HIVE_VIEW,
     ;
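
Because hive_column_lineage extends the Process super-type, registered instances participate in Atlas lineage traversals like any other process entity. A hedged sketch of checking the registration from a client (assumes an Atlas server at localhost:21000; fetching a type definition by name via getType() is my assumption about this release's AtlasClient API, not something this patch adds):

    import org.apache.atlas.AtlasClient;

    public class TypeCheckSketch {
        public static void main(String[] args) throws Exception {
            AtlasClient client = new AtlasClient("http://localhost:21000");  // placeholder URL
            // HiveDataTypes.HIVE_COLUMN_LINEAGE.getName() should resolve to "hive_column_lineage"
            System.out.println(client.getType("hive_column_lineage"));
        }
    }
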
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a52112d8/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
index a5838b4..b6f55a1 100755
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
@@ -54,18 +55,7 @@ import org.testng.annotations.Test;
 
 import java.io.File;
 import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
+import java.util.*;
 
 import static org.apache.atlas.AtlasClient.NAME;
 import static org.apache.atlas.hive.hook.HiveHook.IO_SEP;
@@ -320,6 +310,7 @@ public class HiveHookIT extends HiveITBase {
 
         assertProcessIsRegistered(constructEvent(query, HiveOperation.CREATETABLE_AS_SELECT, readEntities, writeEntities));
         assertTableIsRegistered(DEFAULT_DB, ctasTableName);
+
     }
 
     private HiveHook.HiveEventContext constructEvent(String query, HiveOperation op, Set<ReadEntity> inputs, Set<WriteEntity> outputs) {
@@ -1116,6 +1107,83 @@ public class HiveHookIT extends HiveITBase {
         );
     }
 
+    /*
+       This test is disabled by default.
+       Reason: Atlas builds against Hive 1.2.x, and HIVE-13112, the Hive patch that enables
+       column-level lineage, is not committed to the Hive 1.2.x branch. The test fails when
+       lineage information is not available from Hive. Once HIVE-13112 lands in Hive 1.2.x,
+       the test can be enabled. Track HIVE-14706 for the status of column lineage availability
+       in newer Hive versions (2.1.x).
+    */
+    @Test(enabled = false)
+    public void testColumnLevelLineage() throws Exception {
+        String sourceTable = "table" + random();
+        runCommand("create table " + sourceTable + "(a int, b int)");
+        String sourceTableGUID = assertTableIsRegistered(DEFAULT_DB, sourceTable);
+        String a_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, sourceTable), "a"));
+        String b_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, sourceTable), "b"));
+
+        String ctasTableName = "table" + random();
+        String query = "create table " + ctasTableName + " as " +
+                "select sum(a+b) as a, count(*) as b from " + sourceTable;
+        runCommand(query);
+
+        String dest_a_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, ctasTableName), "a"));
+        String dest_b_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, ctasTableName), "b"));
+
+        final Set<ReadEntity> inputs = getInputs(sourceTable, Entity.Type.TABLE);
+        final Set<WriteEntity> outputs = getOutputs(ctasTableName, Entity.Type.TABLE);
+        HiveHook.HiveEventContext event = constructEvent(query, HiveOperation.CREATETABLE_AS_SELECT, inputs, outputs);
+        assertProcessIsRegistered(event);
+        assertTableIsRegistered(DEFAULT_DB, ctasTableName);
+
+        String processQName = sortEventsAndGetProcessQualifiedName(event);
+
+        List<String> aLineageInputs = Arrays.asList(a_guid, b_guid);
+        String aLineageProcessName = processQName + ":" + "a";
+        LOG.debug("Searching for column lineage process {} ", aLineageProcessName);
+        String guid = assertEntityIsRegistered(HiveDataTypes.HIVE_COLUMN_LINEAGE.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, aLineageProcessName, null);
+        List<Id> processInputs = (List<Id>) atlasClient.getEntity(guid).get("inputs");
+        List<String> processInputsAsString = new ArrayList<>();
+        for (Id input : processInputs) {
+            processInputsAsString.add(input._getId());
+        }
+        Collections.sort(processInputsAsString);
+        Collections.sort(aLineageInputs);
+        Assert.assertEquals(processInputsAsString, aLineageInputs);
+
+        List<String> bLineageInputs = Arrays.asList(sourceTableGUID);
+        String bLineageProcessName = processQName + ":" + "b";
+        LOG.debug("Searching for column lineage process {} ", bLineageProcessName);
+        String guid1 = assertEntityIsRegistered(HiveDataTypes.HIVE_COLUMN_LINEAGE.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, bLineageProcessName, null);
+        List<Id> bProcessInputs = (List<Id>) atlasClient.getEntity(guid1).get("inputs");
+        List<String> bProcessInputsAsString = new ArrayList<>();
+        for (Id input : bProcessInputs) {
+            bProcessInputsAsString.add(input._getId());
+        }
+        Collections.sort(bProcessInputsAsString);
+        Collections.sort(bLineageInputs);
+        Assert.assertEquals(bProcessInputsAsString, bLineageInputs);
+
+        //Test lineage API response
+        JSONObject response = atlasClient.getInputGraphForEntity(dest_a_guid);
+        JSONObject vertices = response.getJSONObject("values").getJSONObject("vertices");
+        JSONObject dest_a_val = (JSONObject) vertices.get(dest_a_guid);
+        JSONObject src_a_val = (JSONObject) vertices.get(a_guid);
+        JSONObject src_b_val = (JSONObject) vertices.get(b_guid);
+        Assert.assertNotNull(dest_a_val);
+        Assert.assertNotNull(src_a_val);
+        Assert.assertNotNull(src_b_val);
+
+        JSONObject b_response = atlasClient.getInputGraphForEntity(dest_b_guid);
+        JSONObject b_vertices = b_response.getJSONObject("values").getJSONObject("vertices");
+        JSONObject b_val = (JSONObject) b_vertices.get(dest_b_guid);
+        JSONObject src_tbl_val = (JSONObject) b_vertices.get(sourceTableGUID);
+        Assert.assertNotNull(b_val);
+        Assert.assertNotNull(src_tbl_val);
+    }
+
     @Test
     public void testTruncateTable() throws Exception {
         String tableName = createTable(false);
@@ -1620,19 +1688,22 @@ public class HiveHookIT extends HiveITBase {
         }
     }
 
-    private String assertProcessIsRegistered(final HiveHook.HiveEventContext event) throws Exception {
-        try {
-            SortedSet<ReadEntity> sortedHiveInputs = event.getInputs() == null ? null : new TreeSet<ReadEntity>(entityComparator);
-            SortedSet<WriteEntity> sortedHiveOutputs = event.getOutputs() == null ? null : new TreeSet<WriteEntity>(entityComparator);
+    private String sortEventsAndGetProcessQualifiedName(final HiveHook.HiveEventContext event) throws HiveException {
+        SortedSet<ReadEntity> sortedHiveInputs = event.getInputs() == null ? null : new TreeSet<ReadEntity>(entityComparator);
+        SortedSet<WriteEntity> sortedHiveOutputs = event.getOutputs() == null ? null : new TreeSet<WriteEntity>(entityComparator);
 
-            if ( event.getInputs() != null) {
-                sortedHiveInputs.addAll(event.getInputs());
-            }
-            if ( event.getOutputs() != null) {
-                sortedHiveOutputs.addAll(event.getOutputs());
-            }
+        if (event.getInputs() != null) {
+            sortedHiveInputs.addAll(event.getInputs());
+        }
+        if (event.getOutputs() != null) {
+            sortedHiveOutputs.addAll(event.getOutputs());
+        }
+        return getProcessQualifiedName(hiveMetaStoreBridge, event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(event.getInputs()), getSortedProcessDataSets(event.getOutputs()));
+    }
 
-            String processQFName = getProcessQualifiedName(hiveMetaStoreBridge, event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(event.getInputs()), getSortedProcessDataSets(event.getOutputs()));
+    private String assertProcessIsRegistered(final HiveHook.HiveEventContext event) throws Exception {
+        try {
+            String processQFName = sortEventsAndGetProcessQualifiedName(event);
             LOG.debug("Searching for process with query {}", processQFName);
             return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName, new AssertPredicate() {
                 @Override

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a52112d8/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 1a691ca..9e185c7 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -9,6 +9,7 @@ ATLAS-1060 Add composite indexes for exact match performance improvements for al
 ATLAS-1127 Modify creation and modification timestamps to Date instead of Long(sumasai)
 
 ALL CHANGES:
+ATLAS-247 Hive Column level lineage (rhbutani,svimal2106 via shwethags)
 ATLAS-1184 ReservedTypesRegistrar checks for existence of 1st class type (svimal2106 via shwethags)
 ATLAS-1199 Atlas UI not loading after fresh build due to jquery-asBreadcrumbs plugin upgrade (kevalbhatt via shwethags)
 ATLAS-1174 Framework to apply updates to types in the type-system ([email protected] via shwethags)
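
For anyone exercising this end to end once a HIVE-13112-enabled Hive build is available: the disabled testColumnLevelLineage() above shows the full read path. A condensed, hedged sketch of the same client-side verification (server URL and guids are placeholders you would obtain from a search or, as in the test, from assertColumnIsRegistered(); getInputGraphForEntity() is used exactly as the test uses it):

    import org.apache.atlas.AtlasClient;
    import org.codehaus.jettison.json.JSONObject;

    public class LineageGraphSketch {
        public static void main(String[] args) throws Exception {
            AtlasClient atlasClient = new AtlasClient("http://localhost:21000");  // placeholder URL
            String destColGuid = "guid-of-default.t2.a";   // placeholder guid
            String srcColGuid = "guid-of-default.t1.a";    // placeholder guid

            // Walk the input lineage graph of the destination column, as the test does,
            // and check that both column-level endpoints appear among the vertices.
            JSONObject response = atlasClient.getInputGraphForEntity(destColGuid);
            JSONObject vertices = response.getJSONObject("values").getJSONObject("vertices");
            System.out.println("destination present: " + vertices.has(destColGuid));
            System.out.println("source present: " + vertices.has(srcColGuid));
        }
    }
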
