Repository: incubator-atlas Updated Branches: refs/heads/master b6a0eee7f -> 9e1f36637
ATLAS-619 Canonicalize hive queries (sumasai) Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/9e1f3663 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/9e1f3663 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/9e1f3663 Branch: refs/heads/master Commit: 9e1f366374827d5471a84c9ece438e89e814b7f8 Parents: b6a0eee Author: Suma Shivaprasad <[email protected]> Authored: Fri May 13 09:41:14 2016 -0700 Committer: Suma Shivaprasad <[email protected]> Committed: Fri May 13 09:41:14 2016 -0700 ---------------------------------------------------------------------- .../apache/atlas/falcon/hook/FalconHook.java | 6 +- .../falcon/model/FalconDataModelGenerator.java | 3 - .../apache/atlas/falcon/hook/FalconHookIT.java | 8 +- addons/hive-bridge/pom.xml | 7 + .../org/apache/atlas/hive/hook/HiveHook.java | 294 +++++++++++-------- .../hive/model/HiveDataModelGenerator.java | 2 + .../apache/atlas/hive/rewrite/ASTRewriter.java | 26 ++ .../atlas/hive/rewrite/HiveASTRewriter.java | 95 ++++++ .../atlas/hive/rewrite/LiteralRewriter.java | 76 +++++ .../atlas/hive/rewrite/RewriteContext.java | 48 +++ .../atlas/hive/rewrite/RewriteException.java | 26 ++ .../hive/bridge/HiveLiteralRewriterTest.java | 67 +++++ .../org/apache/atlas/hive/hook/HiveHookIT.java | 219 +++++++++----- .../src/test/resources/hive-site.xml | 10 + .../org/apache/atlas/sqoop/hook/SqoopHook.java | 4 +- .../apache/atlas/sqoop/hook/SqoopHookIT.java | 4 +- .../apache/atlas/storm/hook/StormAtlasHook.java | 3 +- .../atlas/storm/model/StormDataModel.scala | 3 +- release-log.txt | 1 + .../atlas/services/DefaultMetadataService.java | 19 +- .../apache/atlas/BaseHiveRepositoryTest.java | 3 +- .../org/apache/atlas/examples/QuickStart.java | 3 +- .../org/apache/atlas/examples/QuickStartIT.java | 12 +- .../resources/HiveLineageJerseyResourceIT.java | 6 +- 24 files changed, 719 insertions(+), 226 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java index 8fced05..97ee1a2 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java @@ -211,10 +211,10 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher { if (!inputs.isEmpty() || !outputs.isEmpty()) { Referenceable processEntity = new Referenceable(FalconDataTypes.FALCON_PROCESS_ENTITY.getName()); - processEntity.set(FalconDataModelGenerator.NAME, String.format("%s@%s", process.getName(), + processEntity.set(FalconDataModelGenerator.NAME, String.format("%s", process.getName(), cluster.getName())); - processEntity.set(FalconDataModelGenerator.PROCESS_NAME, process.getName()); - + processEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, String.format("%s@%s", process.getName(), + cluster.getName())); processEntity.set(FalconDataModelGenerator.TIMESTAMP, timestamp); if (!inputs.isEmpty()) { processEntity.set(FalconDataModelGenerator.INPUTS, inputs); 
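The substance of ATLAS-619 is in the hive-bridge changes below: two Hive queries that differ only in literal values should now canonicalize to the same text and therefore register as the same lineage process. A minimal sketch of the intended effect, using the HiveASTRewriter added in this commit (the table and column names are hypothetical, and constructing the rewriter may additionally require a started SessionState and local scratch directories, as HiveLiteralRewriterTest further down sets up):

    import org.apache.atlas.hive.rewrite.HiveASTRewriter;
    import org.apache.hadoop.hive.conf.HiveConf;

    public class CanonicalizeDemo {
        public static void main(String[] args) throws Exception {
            // A HiveConf that can create a ql Context; the test below also
            // starts a SessionState first, which may be required in practice.
            HiveConf conf = new HiveConf();
            HiveASTRewriter rewriter = new HiveASTRewriter(conf);

            // Same query shape, different literal values.
            String q1 = "select * from sales where dt = '2014-01-01' and qty = 10";
            String q2 = "select * from sales where dt = '2014-01-02' and qty = 42";

            // Both rewrite to the same canonical text:
            //   select * from sales where dt = 'STRING_LITERAL' and qty = NUMBER_LITERAL
            System.out.println(rewriter.rewrite(q1).equals(rewriter.rewrite(q2))); // true
        }
    }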
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java index 2494675..397dea4 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java @@ -57,7 +57,6 @@ public class FalconDataModelGenerator { private final Map<String, StructTypeDefinition> structTypeDefinitionMap; public static final String NAME = "name"; - public static final String PROCESS_NAME = "processName"; public static final String TIMESTAMP = "timestamp"; public static final String USER = "owned-by"; public static final String TAGS = "tag-classification"; @@ -107,8 +106,6 @@ public class FalconDataModelGenerator { private void createProcessEntityClass() throws AtlasException { AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ - new AttributeDefinition(PROCESS_NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, - null), new AttributeDefinition(TIMESTAMP, DataTypes.LONG_TYPE.getName(), Multiplicity.REQUIRED, false, null), new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java index 4e2a06f..9b356a2 100644 --- a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java +++ b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java @@ -150,7 +150,7 @@ public class FalconHookIT { String pid = assertProcessIsRegistered(cluster.getName(), process.getName()); Referenceable processEntity = atlasClient.getEntity(pid); assertNotNull(processEntity); - assertEquals(processEntity.get("processName"), process.getName()); + assertEquals(processEntity.get(AtlasClient.NAME), process.getName()); Id inId = (Id) ((List)processEntity.get("inputs")).get(0); Referenceable inEntity = atlasClient.getEntity(inId._getId()); @@ -207,7 +207,7 @@ public class FalconHookIT { String pid = assertProcessIsRegistered(cluster.getName(), process.getName()); Referenceable processEntity = atlasClient.getEntity(pid); - assertEquals(processEntity.get("processName"), process.getName()); + assertEquals(processEntity.get(AtlasClient.NAME), process.getName()); assertNull(processEntity.get("inputs")); Id outId = (Id) ((List)processEntity.get("outputs")).get(0); @@ -233,8 +233,8 @@ public class FalconHookIT { private String assertProcessIsRegistered(String clusterName, String processName) throws Exception { String name = processName + "@" + clusterName; LOG.debug("Searching for process {}", name); - String query = String.format("%s as t where name = '%s' select t", - FalconDataTypes.FALCON_PROCESS_ENTITY.getName(), name); + String query = String.format("%s as t where %s = '%s' select t", + 
FalconDataTypes.FALCON_PROCESS_ENTITY.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name); return assertEntityIsRegistered(query); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/hive-bridge/pom.xml ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml index eeb2aa4..47e72e8 100755 --- a/addons/hive-bridge/pom.xml +++ b/addons/hive-bridge/pom.xml @@ -72,6 +72,13 @@ <dependency> <groupId>org.apache.hive</groupId> + <artifactId>hive-jdbc</artifactId> + <version>${hive.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hive</groupId> <artifactId>hive-cli</artifactId> <version>${hive.version}</version> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java index 5a1a36e..418e755 100755 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java @@ -19,11 +19,14 @@ package org.apache.atlas.hive.hook; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.atlas.AtlasClient; +import org.apache.atlas.AtlasConstants; import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; import org.apache.atlas.hive.model.HiveDataModelGenerator; import org.apache.atlas.hive.model.HiveDataTypes; +import org.apache.atlas.hive.rewrite.HiveASTRewriter; import org.apache.atlas.hook.AtlasHook; import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.typesystem.Referenceable; @@ -92,110 +95,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { private static final long keepAliveTimeDefault = 10; private static final int queueSizeDefault = 10000; - static class HiveEventContext { - private Set<ReadEntity> inputs; - private Set<WriteEntity> outputs; - - private String user; - private UserGroupInformation ugi; - private HiveOperation operation; - private HookContext.HookType hookType; - private org.json.JSONObject jsonPlan; - private String queryId; - private String queryStr; - private Long queryStartTime; - - private String queryType; - - public void setInputs(Set<ReadEntity> inputs) { - this.inputs = inputs; - } - - public void setOutputs(Set<WriteEntity> outputs) { - this.outputs = outputs; - } - - public void setUser(String user) { - this.user = user; - } - - public void setUgi(UserGroupInformation ugi) { - this.ugi = ugi; - } - - public void setOperation(HiveOperation operation) { - this.operation = operation; - } - - public void setHookType(HookContext.HookType hookType) { - this.hookType = hookType; - } - - public void setJsonPlan(JSONObject jsonPlan) { - this.jsonPlan = jsonPlan; - } - - public void setQueryId(String queryId) { - this.queryId = queryId; - } - - public void setQueryStr(String queryStr) { - this.queryStr = queryStr; - } - - public void setQueryStartTime(Long queryStartTime) { - this.queryStartTime = queryStartTime; - } - - public void setQueryType(String queryType) { - this.queryType = queryType; - } - - public Set<ReadEntity> getInputs() { - return inputs; - } - - 
public Set<WriteEntity> getOutputs() { - return outputs; - } - - public String getUser() { - return user; - } - - public UserGroupInformation getUgi() { - return ugi; - } - - public HiveOperation getOperation() { - return operation; - } - - public HookContext.HookType getHookType() { - return hookType; - } - - public org.json.JSONObject getJsonPlan() { - return jsonPlan; - } - - public String getQueryId() { - return queryId; - } - - public String getQueryStr() { - return queryStr; - } - - public Long getQueryStartTime() { - return queryStartTime; - } - - public String getQueryType() { - return queryType; - } - } - private List<HookNotification.HookNotificationMessage> messages = new ArrayList<>(); private static final HiveConf hiveConf; @@ -362,7 +261,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { } private void deleteTable(HiveMetaStoreBridge dgiBridge, HiveEventContext event) { - for (WriteEntity output : event.outputs) { + for (WriteEntity output : event.getOutputs()) { if (Type.TABLE.equals(output.getType())) { deleteTable(dgiBridge, event, output); } @@ -380,11 +279,11 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { } private void deleteDatabase(HiveMetaStoreBridge dgiBridge, HiveEventContext event) { - if (event.outputs.size() > 1) { - LOG.info("Starting deletion of tables and databases with cascade {} " , event.queryStr); + if (event.getOutputs().size() > 1) { + LOG.info("Starting deletion of tables and databases with cascade {} " , event.getQueryStr()); } - for (WriteEntity output : event.outputs) { + for (WriteEntity output : event.getOutputs()) { if (Type.TABLE.equals(output.getType())) { deleteTable(dgiBridge, event, output); } else if (Type.DATABASE.equals(output.getType())) { @@ -552,13 +451,28 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { return entitiesCreatedOrUpdated; } - public static String normalize(String str) { + public static String lower(String str) { if (StringUtils.isEmpty(str)) { return null; } return str.toLowerCase().trim(); } + public static String normalize(String queryStr) { + String result = null; + if (queryStr != null) { + try { + HiveASTRewriter rewriter = new HiveASTRewriter(hiveConf); + result = rewriter.rewrite(queryStr); + } catch (Exception e) { + LOG.warn("Could not rewrite query due to error. 
Proceeding with original query {}", queryStr, e); + } + } + + result = lower(result); + return result; + } + private void registerProcess(HiveMetaStoreBridge dgiBridge, HiveEventContext event) throws Exception { Set<ReadEntity> inputs = event.getInputs(); Set<WriteEntity> outputs = event.getOutputs(); @@ -589,7 +503,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { } if (source.size() > 0 || target.size() > 0) { - Referenceable processReferenceable = getProcessReferenceable(event, + Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event, new ArrayList<Referenceable>() {{ addAll(source.values()); }}, @@ -613,7 +527,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { dataSets.put(tblQFName, inTable); } } else if (entity.getType() == Type.DFS_DIR) { - final String pathUri = normalize(new Path(entity.getLocation()).toString()); + final String pathUri = lower(new Path(entity.getLocation()).toString()); LOG.info("Registering DFS Path {} ", pathUri); Referenceable hdfsPath = dgiBridge.fillHDFSDataSet(pathUri); dataSets.put(pathUri, hdfsPath); @@ -657,7 +571,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { //Refresh to get the correct location hiveTable = dgiBridge.hiveClient.getTable(hiveTable.getDbName(), hiveTable.getTableName()); - final String location = normalize(hiveTable.getDataLocation().toString()); + final String location = lower(hiveTable.getDataLocation().toString()); if (hiveTable != null && TableType.EXTERNAL_TABLE.equals(hiveTable.getTableType())) { LOG.info("Registering external table process {} ", event.getQueryStr()); List<Referenceable> inputs = new ArrayList<Referenceable>() {{ @@ -668,15 +582,33 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { add(tblRef); }}; - Referenceable processReferenceable = getProcessReferenceable(event, inputs, outputs); + Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event, inputs, outputs); messages.add(new HookNotification.EntityCreateRequest(event.getUser(), processReferenceable)); } } - private Referenceable getProcessReferenceable(HiveEventContext hiveEvent, List<Referenceable> sourceList, List<Referenceable> targetList) { + private boolean isCreateOp(HiveEventContext hiveEvent) { + if (HiveOperation.CREATETABLE.equals(hiveEvent.getOperation()) + || HiveOperation.CREATEVIEW.equals(hiveEvent.getOperation()) + || HiveOperation.ALTERVIEW_AS.equals(hiveEvent.getOperation()) + || HiveOperation.CREATETABLE_AS_SELECT.equals(hiveEvent.getOperation())) { + return true; + } + return false; + } + + private Referenceable getProcessReferenceable(HiveMetaStoreBridge dgiBridge, HiveEventContext hiveEvent, List<Referenceable> sourceList, List<Referenceable> targetList) { Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName()); - String queryStr = normalize(hiveEvent.getQueryStr()); + String queryStr = hiveEvent.getQueryStr(); + if (!isCreateOp(hiveEvent)) { + queryStr = normalize(queryStr); + processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getProcessQualifiedName(queryStr, sourceList, targetList)); + } else { + queryStr = lower(queryStr); + processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, queryStr); + } + LOG.debug("Registering query: {}", queryStr); //The serialization code expected a list @@ -686,15 +618,145 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { if (targetList != null || 
!targetList.isEmpty()) { processReferenceable.set("outputs", targetList); } - processReferenceable.set("name", queryStr); + processReferenceable.set(AtlasClient.NAME, queryStr); + processReferenceable.set("operationType", hiveEvent.getOperation().getOperationName()); processReferenceable.set("startTime", new Date(hiveEvent.getQueryStartTime())); processReferenceable.set("userName", hiveEvent.getUser()); processReferenceable.set("queryText", queryStr); processReferenceable.set("queryId", hiveEvent.getQueryId()); processReferenceable.set("queryPlan", hiveEvent.getJsonPlan()); + processReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, dgiBridge.getClusterName()); + + List<String> recentQueries = new ArrayList<>(1); + recentQueries.add(hiveEvent.getQueryStr()); + processReferenceable.set("recentQueries", recentQueries); processReferenceable.set("endTime", new Date(System.currentTimeMillis())); //TODO set queryGraph return processReferenceable; } + + @VisibleForTesting + static String getProcessQualifiedName(String normalizedQuery, List<Referenceable> inputs, List<Referenceable> outputs) { + StringBuilder buffer = new StringBuilder(normalizedQuery); + addDatasets(buffer, inputs); + addDatasets(buffer, outputs); + return buffer.toString(); + } + + private static void addDatasets(StringBuilder buffer, List<Referenceable> refs) { + if (refs != null) { + for (Referenceable input : refs) { + //TODO - Change to qualifiedName later + buffer.append(":"); + String dataSetQlfdName = (String) input.get(AtlasClient.NAME); + buffer.append(dataSetQlfdName.toLowerCase().replaceAll("/", "")); + } + } + } + + public static class HiveEventContext { + private Set<ReadEntity> inputs; + private Set<WriteEntity> outputs; + + private String user; + private UserGroupInformation ugi; + private HiveOperation operation; + private HookContext.HookType hookType; + private JSONObject jsonPlan; + private String queryId; + private String queryStr; + private Long queryStartTime; + + private String queryType; + + public void setInputs(Set<ReadEntity> inputs) { + this.inputs = inputs; + } + + public void setOutputs(Set<WriteEntity> outputs) { + this.outputs = outputs; + } + + public void setUser(String user) { + this.user = user; + } + + public void setUgi(UserGroupInformation ugi) { + this.ugi = ugi; + } + + public void setOperation(HiveOperation operation) { + this.operation = operation; + } + + public void setHookType(HookContext.HookType hookType) { + this.hookType = hookType; + } + + public void setJsonPlan(JSONObject jsonPlan) { + this.jsonPlan = jsonPlan; + } + + public void setQueryId(String queryId) { + this.queryId = queryId; + } + + public void setQueryStr(String queryStr) { + this.queryStr = queryStr; + } + + public void setQueryStartTime(Long queryStartTime) { + this.queryStartTime = queryStartTime; + } + + public void setQueryType(String queryType) { + this.queryType = queryType; + } + + public Set<ReadEntity> getInputs() { + return inputs; + } + + public Set<WriteEntity> getOutputs() { + return outputs; + } + + public String getUser() { + return user; + } + + public UserGroupInformation getUgi() { + return ugi; + } + + public HiveOperation getOperation() { + return operation; + } + + public HookContext.HookType getHookType() { + return hookType; + } + + public JSONObject getJsonPlan() { + return jsonPlan; + } + + public String getQueryId() { + return queryId; + } + + public String getQueryStr() { + return queryStr; + } + + public Long getQueryStartTime() { + return queryStartTime; + } + + public String 
getQueryType() { + return queryType; + } + + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java index 7cbb1df..347405c 100755 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java @@ -288,6 +288,8 @@ public class HiveDataModelGenerator { new AttributeDefinition("queryPlan", DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null), new AttributeDefinition("queryId", DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null), + new AttributeDefinition("recentQueries", String.format("array<%s>", DataTypes.STRING_TYPE.getName()), Multiplicity.OPTIONAL, false, null), + new AttributeDefinition(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null), new AttributeDefinition("queryGraph", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),}; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/ASTRewriter.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/ASTRewriter.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/ASTRewriter.java new file mode 100644 index 0000000..3a2506b --- /dev/null +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/ASTRewriter.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.hive.rewrite; + + +import org.apache.hadoop.hive.ql.parse.ASTNode; + +public interface ASTRewriter { + + void rewrite(RewriteContext ctx, ASTNode node) throws RewriteException; +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/HiveASTRewriter.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/HiveASTRewriter.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/HiveASTRewriter.java new file mode 100644 index 0000000..4cd219f --- /dev/null +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/HiveASTRewriter.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.hive.rewrite; + + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.parse.ASTNode; +import org.apache.hadoop.hive.ql.parse.ParseDriver; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.parse.ParseException; +import org.apache.hadoop.hive.ql.parse.ParseUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class HiveASTRewriter { + + private Context queryContext; + private RewriteContext rwCtx; + private List<ASTRewriter> rewriters = new ArrayList<>(); + + private static final Logger LOG = LoggerFactory.getLogger(HiveASTRewriter.class); + + public HiveASTRewriter(HiveConf conf) throws RewriteException { + try { + queryContext = new Context(conf); + setUpRewriters(); + } catch (IOException e) { + throw new RewriteException("Exception while rewriting query : " , e); + } + } + + private void setUpRewriters() throws RewriteException { + ASTRewriter rewriter = new LiteralRewriter(); + rewriters.add(rewriter); + } + + public String rewrite(String sourceQry) throws RewriteException { + String result = sourceQry; + ASTNode tree = null; + try { + ParseDriver pd = new ParseDriver(); + tree = pd.parse(sourceQry, queryContext, true); + tree = ParseUtils.findRootNonNullToken(tree); + this.rwCtx = new RewriteContext(sourceQry, tree, queryContext.getTokenRewriteStream()); + rewrite(tree); + result = toSQL(); + } catch (ParseException e) { + LOG.error("Could not parse the query {} ", sourceQry, e); + throw new RewriteException("Could not parse query : " , e); + } + return result; + } + + private void rewrite(ASTNode origin) throws RewriteException { + ASTNode node = origin; + if (node != null) { + for(ASTRewriter rewriter : rewriters) { + rewriter.rewrite(rwCtx, node); + } + if (node.getChildren() != null) { + for (int i = 0; i < 
node.getChildren().size(); i++) { + rewrite((ASTNode) node.getChild(i)); + } + } + } + } + + public String toSQL() { + return rwCtx.getTokenRewriteStream().toString(); + } + + public String printAST() { + return rwCtx.getOriginNode().dump(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/LiteralRewriter.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/LiteralRewriter.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/LiteralRewriter.java new file mode 100644 index 0000000..789b981 --- /dev/null +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/LiteralRewriter.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.hive.rewrite; + +import org.apache.hadoop.hive.ql.parse.ASTNode; +import org.apache.hadoop.hive.ql.parse.HiveParser; + +import java.util.HashMap; +import java.util.Map; + +public class LiteralRewriter implements ASTRewriter { + + public static Map<Integer, String> LITERAL_TOKENS = new HashMap<Integer, String>() {{ + put(HiveParser.Number, "NUMBER_LITERAL"); + put(HiveParser.Digit, "DIGIT_LITERAL"); + put(HiveParser.HexDigit, "HEX_LITERAL"); + put(HiveParser.Exponent, "EXPONENT_LITERAL"); + put(HiveParser.StringLiteral, "'STRING_LITERAL'"); + put(HiveParser.BigintLiteral, "BIGINT_LITERAL"); + put(HiveParser.SmallintLiteral, "SMALLINT_LITERAL"); + put(HiveParser.TinyintLiteral, "TINYINT_LITERAL"); + put(HiveParser.DecimalLiteral, "DECIMAL_LITERAL"); + put(HiveParser.ByteLengthLiteral, "BYTE_LENGTH_LITERAL"); + put(HiveParser.TOK_STRINGLITERALSEQUENCE, "'STRING_LITERAL_SEQ'"); + put(HiveParser.TOK_CHARSETLITERAL, "'CHARSET_LITERAL'"); + put(HiveParser.KW_TRUE, "BOOLEAN_LITERAL"); + put(HiveParser.KW_FALSE, "BOOLEAN_LITERAL"); + }}; + + + @Override + public void rewrite(RewriteContext ctx, final ASTNode node) throws RewriteException { + try { + processLiterals(ctx, node); + } catch(Exception e) { + throw new RewriteException("Could not normalize query", e); + } + } + + + private void processLiterals(final RewriteContext ctx, final ASTNode node) { + // Take child ident.totext + if (isLiteral(node)) { + replaceLiteral(ctx, node); + } + } + + private boolean isLiteral(ASTNode node) { + if (LITERAL_TOKENS.containsKey(node.getType())) { + return true; + } + return false; + } + + void replaceLiteral(RewriteContext ctx, ASTNode valueNode) { + //Reset the token stream + String literalVal = LITERAL_TOKENS.get(valueNode.getType()); + ctx.getTokenRewriteStream().replace(valueNode.getTokenStartIndex(), + valueNode.getTokenStopIndex(), literalVal); + } +} 
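LiteralRewriter does not regenerate SQL from the AST. It splices placeholder text over each literal token's start/stop range in ANTLR's TokenRewriteStream, so everything outside the literals survives verbatim. The ASTRewriter interface keeps the walk pluggable: HiveASTRewriter.setUpRewriters() currently chains only LiteralRewriter, and another pass would be registered the same way. A hypothetical example of such a pass (whether DATE literals need masking beyond what LITERAL_TOKENS already covers is an assumption, not something this commit does):

    package org.apache.atlas.hive.rewrite;

    import org.apache.hadoop.hive.ql.parse.ASTNode;
    import org.apache.hadoop.hive.ql.parse.HiveParser;

    // Hypothetical extra pass: masks DATE literals the same way
    // LiteralRewriter.replaceLiteral() masks strings and numbers, by
    // splicing over the node's token range in the rewrite stream.
    public class DateLiteralRewriter implements ASTRewriter {
        @Override
        public void rewrite(RewriteContext ctx, ASTNode node) throws RewriteException {
            if (node.getType() == HiveParser.TOK_DATELITERAL) {
                ctx.getTokenRewriteStream().replace(node.getTokenStartIndex(),
                        node.getTokenStopIndex(), "'DATE_LITERAL'");
            }
        }
    }

Rewriting the token stream rather than pretty-printing the AST is what keeps the canonical text stable enough to serve as part of a unique entity name.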
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/RewriteContext.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/RewriteContext.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/RewriteContext.java new file mode 100644 index 0000000..505616e --- /dev/null +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/RewriteContext.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.hive.rewrite; + +import org.antlr.runtime.TokenRewriteStream; +import org.apache.hadoop.hive.ql.parse.ASTNode; + +public class RewriteContext { + + private String origQuery; + + private TokenRewriteStream rewriteStream; + + private ASTNode origin; + + RewriteContext(String origQuery, ASTNode origin, TokenRewriteStream rewriteStream) { + this.origin = origin; + this.rewriteStream = rewriteStream; + } + + public TokenRewriteStream getTokenRewriteStream() { + return rewriteStream; + } + + public ASTNode getOriginNode() { + return origin; + } + + public String getOriginalQuery() { + return origQuery; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/RewriteException.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/RewriteException.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/RewriteException.java new file mode 100644 index 0000000..79a1afe --- /dev/null +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/RewriteException.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.hive.rewrite; + +import org.apache.hadoop.hive.ql.parse.ParseException; + +public class RewriteException extends Exception { + public RewriteException(final String message, final Exception exception) { + super(message, exception); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveLiteralRewriterTest.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveLiteralRewriterTest.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveLiteralRewriterTest.java new file mode 100644 index 0000000..2840457 --- /dev/null +++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveLiteralRewriterTest.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.hive.bridge; + +import org.apache.atlas.hive.hook.HiveHook; +import org.apache.atlas.hive.rewrite.HiveASTRewriter; +import org.apache.atlas.hive.rewrite.RewriteException; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class HiveLiteralRewriterTest { + + private HiveConf conf; + + @BeforeClass + public void setup() { + conf = new HiveConf(); + conf.addResource("/hive-site.xml"); + SessionState ss = new SessionState(conf, "testuser"); + SessionState.start(ss); + conf.set("hive.lock.manager", "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager"); + } + + @Test + public void testLiteralRewrite() throws RewriteException { + HiveHook.HiveEventContext ctx = new HiveHook.HiveEventContext(); + ctx.setQueryStr("insert into table testTable partition(dt='2014-01-01') select * from test1 where dt = '2014-01-01'" + + " and intColumn = 10" + + " and decimalColumn = 1.10" + + " and charColumn = 'a'" + + " and hexColumn = unhex('\\0xFF')" + + " and expColumn = cast('-1.5e2' as int)" + + " and boolCol = true"); + + HiveASTRewriter queryRewriter = new HiveASTRewriter(conf); + String result = queryRewriter.rewrite(ctx.getQueryStr()); + System.out.println("normlized sql : " + result); + + final String normalizedSQL = "insert into table testTable partition(dt='STRING_LITERAL') " + + "select * from test1 where dt = 'STRING_LITERAL' " + + "and intColumn = NUMBER_LITERAL " + + "and decimalColumn = NUMBER_LITERAL and " + + "charColumn = 'STRING_LITERAL' and " + + "hexColumn = unhex('STRING_LITERAL') and " + + "expColumn = cast('STRING_LITERAL' as int) and " + + "boolCol = BOOLEAN_LITERAL"; + Assert.assertEquals(result, normalizedSQL); + } +} 
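The canonical query alone is still not unique, since the same statement text can run against different datasets, so HiveHook.getProcessQualifiedName() (above) appends every input and output dataset name, colon-separated, lowercased, with slashes stripped. A sketch of the composition, callable from test code in the hook's package since the method is package-private (the dataset names are hypothetical, in the dbName.tableName@clusterName form that HiveMetaStoreBridge.getTableQualifiedName() produces):

    import java.util.Arrays;
    import org.apache.atlas.AtlasClient;
    import org.apache.atlas.hive.model.HiveDataTypes;
    import org.apache.atlas.typesystem.Referenceable;

    static void qualifiedNameExample() {
        Referenceable in = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
        in.set(AtlasClient.NAME, "default.s@primary");
        Referenceable out = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
        out.set(AtlasClient.NAME, "default.t@primary");

        String qfName = HiveHook.getProcessQualifiedName(
                "insert into table t select * from s where id = NUMBER_LITERAL",
                Arrays.asList(in), Arrays.asList(out));
        // qfName == "insert into table t select * from s where id = NUMBER_LITERAL"
        //           + ":default.s@primary:default.t@primary"
    }

This is why HiveHookIT below expects a rerun of the same insert to resolve to the same process entity, while testInsertIntoDFSDir expects a new process when only the target directory changes: the masked query text matches, but the output dataset name differs.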
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java index 43bba0e..70100f1 100755 --- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java +++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java @@ -28,6 +28,8 @@ import org.apache.atlas.fs.model.FSDataTypes; import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; import org.apache.atlas.hive.model.HiveDataModelGenerator; import org.apache.atlas.hive.model.HiveDataTypes; +import org.apache.atlas.hive.rewrite.HiveASTRewriter; +import org.apache.atlas.hive.rewrite.RewriteException; import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Struct; import org.apache.atlas.typesystem.persistence.Id; @@ -56,11 +58,13 @@ import org.testng.annotations.Test; import java.io.File; import java.text.ParseException; +import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.atlas.hive.hook.HiveHook.lower; import static org.apache.atlas.hive.hook.HiveHook.normalize; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; @@ -76,6 +80,8 @@ public class HiveHookIT { private AtlasClient atlasClient; private HiveMetaStoreBridge hiveMetaStoreBridge; private SessionState ss; + + private HiveConf conf; private static final String INPUTS = AtlasClient.PROCESS_ATTRIBUTE_INPUTS; private static final String OUTPUTS = AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS; @@ -83,10 +89,7 @@ public class HiveHookIT { @BeforeClass public void setUp() throws Exception { //Set-up hive session - HiveConf conf = new HiveConf(); - //Run in local mode - conf.set("mapreduce.framework.name", "local"); - conf.set("fs.default.name", "file:///'"); + conf = new HiveConf(); conf.setClassLoader(Thread.currentThread().getContextClassLoader()); driver = new Driver(conf); ss = new SessionState(conf); @@ -98,7 +101,6 @@ public class HiveHookIT { hiveMetaStoreBridge = new HiveMetaStoreBridge(conf, atlasClient); hiveMetaStoreBridge.registerHiveDataModel(); - } private void runCommand(String cmd) throws Exception { @@ -231,36 +233,36 @@ public class HiveHookIT { @Test public void testCreateExternalTable() throws Exception { String tableName = tableName(); - String dbName = createDatabase(); String colName = columnName(); String pFile = createTestDFSPath("parentPath"); - final String query = String.format("create TEMPORARY EXTERNAL table %s.%s( %s, %s) location '%s'", dbName , tableName , colName + " int", "name string", pFile); + final String query = String.format("create TEMPORARY EXTERNAL table %s.%s( %s, %s) location '%s'", DEFAULT_DB , tableName , colName + " int", "name string", pFile); runCommand(query); - String tableId = assertTableIsRegistered(dbName, tableName, null, true); + assertTableIsRegistered(DEFAULT_DB, tableName, null, true); - Referenceable processReference = validateProcess(query, 1, 1); + String processId = assertProcessIsRegistered(query); + Referenceable processReference = atlasClient.getEntity(processId); assertEquals(processReference.get("userName"), UserGroupInformation.getCurrentUser().getShortUserName()); verifyTimestamps(processReference, 
"startTime"); verifyTimestamps(processReference, "endTime"); validateHDFSPaths(processReference, pFile, INPUTS); - validateOutputTables(processReference, tableId); } - private void validateOutputTables(Referenceable processReference, String... expectedTableGuids) throws Exception { - validateTables(processReference, OUTPUTS, expectedTableGuids); + private void validateOutputTables(Referenceable processReference, String... expectedTableNames) throws Exception { + validateTables(processReference, OUTPUTS, expectedTableNames); } - private void validateInputTables(Referenceable processReference, String... expectedTableGuids) throws Exception { - validateTables(processReference, INPUTS, expectedTableGuids); + private void validateInputTables(Referenceable processReference, String... expectedTableNames) throws Exception { + validateTables(processReference, INPUTS, expectedTableNames); } - private void validateTables(Referenceable processReference, String attrName, String... expectedTableGuids) throws Exception { + private void validateTables(Referenceable processReference, String attrName, String... expectedTableNames) throws Exception { List<Id> tableRef = (List<Id>) processReference.get(attrName); - for(int i = 0; i < expectedTableGuids.length; i++) { - Assert.assertEquals(tableRef.get(i)._getId(), expectedTableGuids[i]); + for(int i = 0; i < expectedTableNames.length; i++) { + Referenceable entity = atlasClient.getEntity(tableRef.get(i)._getId()); + Assert.assertEquals(entity.get(AtlasClient.NAME), expectedTableNames[i]); } } @@ -371,7 +373,7 @@ public class HiveHookIT { String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName; runCommand(query); - assertProcessIsRegistered(query); + assertProcessIsRegistered(query, null, getQualifiedTblName(tableName)); } @Test @@ -382,7 +384,7 @@ public class HiveHookIT { String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName + " partition(dt = '2015-01-01')"; runCommand(query); - validateProcess(query, 0, 1); + validateProcess(query, null, getQualifiedTblName(tableName)); } @Test @@ -392,49 +394,42 @@ public class HiveHookIT { String tableId = assertTableIsRegistered(DEFAULT_DB, tableName); String loadFile = createTestDFSFile("loadDFSFile"); + final String testPathNormed = lower(new Path(loadFile).toString()); String query = "load data inpath '" + loadFile + "' into table " + tableName + " partition(dt = '2015-01-01')"; runCommand(query); - Referenceable processReference = validateProcess(query, 1, 1); + final String tblQlfdName = getQualifiedTblName(tableName); + Referenceable processReference = validateProcess(query, testPathNormed, tblQlfdName); validateHDFSPaths(processReference, loadFile, INPUTS); - validateOutputTables(processReference, tableId); + validateOutputTables(processReference, tblQlfdName); } - private Referenceable validateProcess(String query, int numInputs, int numOutputs) throws Exception { - String processId = assertProcessIsRegistered(query); - Referenceable process = atlasClient.getEntity(processId); - if (numInputs == 0) { - Assert.assertNull(process.get(INPUTS)); - } else { - Assert.assertEquals(((List<Referenceable>) process.get(INPUTS)).size(), numInputs); - } + private String getQualifiedTblName(String inputTable) { + String inputtblQlfdName = inputTable; - if (numOutputs == 0) { - Assert.assertNull(process.get(OUTPUTS)); - } else { - Assert.assertEquals(((List<Id>) process.get(OUTPUTS)).size(), numOutputs); + if (inputTable != null && !inputTable.contains(".")) 
{ + inputtblQlfdName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, inputTable); } - - return process; + return inputtblQlfdName; } - private Referenceable validateProcess(String query, String[] inputs, String[] outputs) throws Exception { - String processId = assertProcessIsRegistered(query); + private Referenceable validateProcess(String query, String inputTable, String outputTable) throws Exception { + String processId = assertProcessIsRegistered(query, inputTable, outputTable); Referenceable process = atlasClient.getEntity(processId); - if (inputs == null) { + if (inputTable == null) { Assert.assertNull(process.get(INPUTS)); } else { - Assert.assertEquals(((List<Referenceable>) process.get(INPUTS)).size(), inputs.length); - validateInputTables(process, inputs); + Assert.assertEquals(((List<Referenceable>) process.get(INPUTS)).size(), 1); + validateInputTables(process, inputTable); } - if (outputs == null) { + if (outputTable == null) { Assert.assertNull(process.get(OUTPUTS)); } else { - Assert.assertEquals(((List<Id>) process.get(OUTPUTS)).size(), outputs.length); - validateOutputTables(process, outputs); + Assert.assertEquals(((List<Id>) process.get(OUTPUTS)).size(), 1 ); + validateOutputTables(process, outputTable); } return process; @@ -452,7 +447,14 @@ public class HiveHookIT { String inputTableId = assertTableIsRegistered(DEFAULT_DB, tableName); String opTableId = assertTableIsRegistered(DEFAULT_DB, insertTableName); - validateProcess(query, new String[]{inputTableId}, new String[]{opTableId}); + Referenceable processRef1 = validateProcess(query, getQualifiedTblName(tableName), getQualifiedTblName(insertTableName)); + + //Rerun same query. Should result in same process + runCommand(query); + + Referenceable processRef2 = validateProcess(query, getQualifiedTblName(tableName), getQualifiedTblName(insertTableName)); + Assert.assertEquals(processRef1.getId()._getId(), processRef2.getId()._getId()); + } @Test @@ -463,7 +465,7 @@ public class HiveHookIT { "insert overwrite LOCAL DIRECTORY '" + randomLocalPath.getAbsolutePath() + "' select id, name from " + tableName; runCommand(query); - validateProcess(query, 1, 0); + validateProcess(query, getQualifiedTblName(tableName), null); assertTableIsRegistered(DEFAULT_DB, tableName); } @@ -471,17 +473,33 @@ public class HiveHookIT { @Test public void testInsertIntoDFSDir() throws Exception { String tableName = createTable(); - String pFile = createTestDFSPath("somedfspath"); + String pFile1 = createTestDFSPath("somedfspath1"); + String testPathNormed = lower(new Path(pFile1).toString()); String query = - "insert overwrite DIRECTORY '" + pFile + "' select id, name from " + tableName; + "insert overwrite DIRECTORY '" + pFile1 + "' select id, name from " + tableName; runCommand(query); - Referenceable processReference = validateProcess(query, 1, 1); - validateHDFSPaths(processReference, pFile, OUTPUTS); + String tblQlfdname = getQualifiedTblName(tableName); + Referenceable processReference = validateProcess(query, tblQlfdname, testPathNormed); + validateHDFSPaths(processReference, pFile1, OUTPUTS); String tableId = assertTableIsRegistered(DEFAULT_DB, tableName); - validateInputTables(processReference, tableId); + validateInputTables(processReference, tblQlfdname); + + //Rerun same query with different HDFS path + + String pFile2 = createTestDFSPath("somedfspath2"); + testPathNormed = lower(new Path(pFile2).toString()); + query = + "insert overwrite DIRECTORY '" + pFile2 + "' select id, name from " + tableName; + + 
runCommand(query); + tblQlfdname = getQualifiedTblName(tableName); + Referenceable process2Reference = validateProcess(query, tblQlfdname, testPathNormed); + validateHDFSPaths(process2Reference, pFile2, OUTPUTS); + + Assert.assertNotEquals(process2Reference.getId()._getId(), processReference.getId()._getId()); } @Test @@ -495,11 +513,10 @@ public class HiveHookIT { "insert into " + insertTableName + " select id, name from " + tableName; runCommand(query); - validateProcess(query, 1, 1); + validateProcess(query, getQualifiedTblName(tableName), getQualifiedTblName(insertTableName + HiveMetaStoreBridge.TEMP_TABLE_PREFIX + SessionState.get().getSessionId())); - String ipTableId = assertTableIsRegistered(DEFAULT_DB, tableName); - String opTableId = assertTableIsRegistered(DEFAULT_DB, insertTableName, null, true); - validateProcess(query, new String[] {ipTableId}, new String[] {opTableId}); + assertTableIsRegistered(DEFAULT_DB, tableName); + assertTableIsRegistered(DEFAULT_DB, insertTableName, null, true); } @Test @@ -510,11 +527,10 @@ public class HiveHookIT { "insert into " + insertTableName + " partition(dt = '2015-01-01') select id, name from " + tableName + " where dt = '2015-01-01'"; runCommand(query); - validateProcess(query, 1, 1); + validateProcess(query, getQualifiedTblName(tableName) , getQualifiedTblName(insertTableName)); - String ipTableId = assertTableIsRegistered(DEFAULT_DB, tableName); - String opTableId = assertTableIsRegistered(DEFAULT_DB, insertTableName); - validateProcess(query, new String[]{ipTableId}, new String[]{opTableId}); + assertTableIsRegistered(DEFAULT_DB, tableName); + assertTableIsRegistered(DEFAULT_DB, insertTableName); } private String random() { @@ -543,10 +559,12 @@ public class HiveHookIT { String filename = "pfile://" + mkdir("export"); String query = "export table " + tableName + " to \"" + filename + "\""; + final String testPathNormed = lower(new Path(filename).toString()); runCommand(query); - Referenceable processReference = validateProcess(query, 1, 1); + String tblQlfName = getQualifiedTblName(tableName); + Referenceable processReference = validateProcess(query, tblQlfName, testPathNormed); validateHDFSPaths(processReference, filename, OUTPUTS); - validateInputTables(processReference, tableId); + validateInputTables(processReference, tblQlfName); //Import tableName = createTable(false); @@ -554,10 +572,11 @@ public class HiveHookIT { query = "import table " + tableName + " from '" + filename + "'"; runCommand(query); - processReference = validateProcess(query, 1, 1); + tblQlfName = getQualifiedTblName(tableName); + processReference = validateProcess(query, testPathNormed, tblQlfName); validateHDFSPaths(processReference, filename, INPUTS); - validateOutputTables(processReference, tableId); + validateOutputTables(processReference, tblQlfName); } @Test @@ -571,12 +590,14 @@ public class HiveHookIT { runCommand(query); String filename = "pfile://" + mkdir("export"); + final String testPathNormed = lower(new Path(filename).toString()); query = "export table " + tableName + " to \"" + filename + "\""; runCommand(query); - Referenceable processReference = validateProcess(query, 1, 1); + String tblQlfdName = getQualifiedTblName(tableName); + Referenceable processReference = validateProcess(query, tblQlfdName, testPathNormed); validateHDFSPaths(processReference, filename, OUTPUTS); - validateInputTables(processReference, tableId); + validateInputTables(processReference, tblQlfdName); //Import tableName = createTable(true); @@ -584,10 +605,11 @@ public class 
HiveHookIT { query = "import table " + tableName + " from '" + filename + "'"; runCommand(query); - processReference = validateProcess(query, 1, 1); + tblQlfdName = getQualifiedTblName(tableName); + processReference = validateProcess(query, testPathNormed, tblQlfdName); validateHDFSPaths(processReference, filename, INPUTS); - validateOutputTables(processReference, tableId); + validateOutputTables(processReference, tblQlfdName); } @Test @@ -750,7 +772,7 @@ public class HiveHookIT { }); assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName( - HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName)); + HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName)); //Change name and add comment oldColName = "name2"; @@ -847,8 +869,9 @@ public class HiveHookIT { String query = String.format("truncate table %s", tableName); runCommand(query); + String tableId = assertTableIsRegistered(DEFAULT_DB, tableName); - validateProcess(query, 0, 1); + validateProcess(query, null, getQualifiedTblName(tableName)); //Check lineage String datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName); @@ -916,16 +939,17 @@ public class HiveHookIT { } }); - Referenceable processReference = validateProcess(query, 1, 1); - validateHDFSPaths(processReference, testPath, INPUTS); + final String tblQlfdName = getQualifiedTblName(tableName); - validateOutputTables(processReference, tableId); + final String testPathNormed = lower(new Path(testPath).toString()); + Referenceable processReference = validateProcess(query, testPathNormed, tblQlfdName); + validateHDFSPaths(processReference, testPath, INPUTS); } private String validateHDFSPaths(Referenceable processReference, String testPath, String attributeName) throws Exception { List<Id> hdfsPathRefs = (List<Id>) processReference.get(attributeName); - final String testPathNormed = normalize(new Path(testPath).toString()); + final String testPathNormed = lower(new Path(testPath).toString()); String hdfsPathId = assertHDFSPathIsRegistered(testPathNormed); Assert.assertEquals(hdfsPathRefs.get(0)._getId(), hdfsPathId); @@ -1083,7 +1107,7 @@ public class HiveHookIT { //Verify columns are not registered for one of the tables assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName( - HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableNames[0]), "id")); + HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableNames[0]), "id")); assertColumnIsNotRegistered(HiveMetaStoreBridge .getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableNames[0]), HiveDataModelGenerator.NAME)); @@ -1316,14 +1340,55 @@ public class HiveHookIT { } } - private String assertProcessIsRegistered(String queryStr) throws Exception { - LOG.debug("Searching for process with query {}", queryStr); - return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.NAME, normalize(queryStr), null); + private String assertProcessIsRegistered(final String queryStr, final String inputTblName, final String outputTblName) throws Exception { + + HiveASTRewriter astRewriter = new HiveASTRewriter(conf); + String normalizedQuery = normalize(astRewriter.rewrite(queryStr)); + + List<Referenceable> inputs = null; + + if (inputTblName != null) { + Referenceable inputTableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.name(), new HashMap<String, Object>() {{ + put(HiveDataModelGenerator.NAME, inputTblName); + }}); + 
inputs = new ArrayList<Referenceable>(); + inputs.add(inputTableRef); + } + List<Referenceable> outputs = null; + if (outputTblName != null) { + Referenceable outputTableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.name(), new HashMap<String, Object>() {{ + put(HiveDataModelGenerator.NAME, outputTblName); + }}); + + outputs = new ArrayList<Referenceable>(); + outputs.add(outputTableRef); + } + String processQFName = HiveHook.getProcessQualifiedName(normalizedQuery, inputs, outputs); + LOG.debug("Searching for process with query {}", processQFName); + return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName, new AssertPredicate() { + @Override + public void assertOnEntity(final Referenceable entity) throws Exception { + List<String> recentQueries = (List<String>) entity.get("recentQueries"); + Assert.assertEquals(recentQueries.get(0), queryStr); + } + }); + } + + private String assertProcessIsRegistered(final String queryStr) throws Exception { + String lowerQryStr = lower(queryStr); + LOG.debug("Searching for process with query {}", lowerQryStr); + return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, lowerQryStr, new AssertPredicate() { + @Override + public void assertOnEntity(final Referenceable entity) throws Exception { + List<String> recentQueries = (List<String>) entity.get("recentQueries"); + Assert.assertEquals(recentQueries.get(0), queryStr); + } + }); } private void assertProcessIsNotRegistered(String queryStr) throws Exception { LOG.debug("Searching for process with query {}", queryStr); - assertEntityIsNotRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.NAME, normalize(queryStr)); + assertEntityIsNotRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, normalize(queryStr)); } private void assertTableIsNotRegistered(String dbName, String tableName, boolean isTemporaryTable) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/hive-bridge/src/test/resources/hive-site.xml ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/test/resources/hive-site.xml b/addons/hive-bridge/src/test/resources/hive-site.xml index f1facb8..058e546 100644 --- a/addons/hive-bridge/src/test/resources/hive-site.xml +++ b/addons/hive-bridge/src/test/resources/hive-site.xml @@ -17,6 +17,16 @@ <configuration> <property> + <name>mapreduce.framework.name</name> + <value>local</value> + </property> + + <property> + <name>fs.default.name</name> + <value>file:///</value> + </property> + + <property> <name>hive.exec.post.hooks</name> <value>org.apache.atlas.hive.hook.HiveHook</value> </property> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java ---------------------------------------------------------------------- diff --git a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java index ab7e6ee..18474ad 100644 --- a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java +++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java @@ -105,7 +105,9 @@ public class SqoopHook extends SqoopJobDataPublisher { private Referenceable createSqoopProcessInstance(Referenceable dbStoreRef, Referenceable hiveTableRef, 
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java
----------------------------------------------------------------------
diff --git a/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java b/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java
index 2820169..a81ee15 100644
--- a/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java
+++ b/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java
@@ -107,8 +107,8 @@ public class SqoopHookIT {
     private String assertSqoopProcessIsRegistered(String processName) throws Exception {
         LOG.debug("Searching for sqoop process {}", processName);
         String query = String.format(
-                "%s as t where name = '%s' select t",
-                SqoopDataTypes.SQOOP_PROCESS.getName(), processName);
+                "%s as t where %s = '%s' select t",
+                SqoopDataTypes.SQOOP_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processName);
         return assertEntityIsRegistered(query);
     }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
index 267e228..4448105 100644
--- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
+++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
@@ -110,7 +110,8 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
         Referenceable topologyReferenceable = new Referenceable(
                 StormDataTypes.STORM_TOPOLOGY.getName());
         topologyReferenceable.set("id", topologyInfo.get_id());
-        topologyReferenceable.set("name", topologyInfo.get_name());
+        topologyReferenceable.set(AtlasClient.NAME, topologyInfo.get_name());
+        topologyReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, topologyInfo.get_name());
         String owner = topologyInfo.get_owner();
         if (StringUtils.isEmpty(owner)) {
             owner = ANONYMOUS_OWNER;

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/storm-bridge/src/main/scala/org/apache/atlas/storm/model/StormDataModel.scala
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/src/main/scala/org/apache/atlas/storm/model/StormDataModel.scala b/addons/storm-bridge/src/main/scala/org/apache/atlas/storm/model/StormDataModel.scala
index de67c39..a982e61 100644
--- a/addons/storm-bridge/src/main/scala/org/apache/atlas/storm/model/StormDataModel.scala
+++ b/addons/storm-bridge/src/main/scala/org/apache/atlas/storm/model/StormDataModel.scala
@@ -18,6 +18,7 @@ package org.apache.atlas.storm.model

+import org.apache.atlas.AtlasClient
 import org.apache.atlas.typesystem.TypesDef
 import org.apache.atlas.typesystem.builders.TypesBuilder
 import org.apache.atlas.typesystem.json.TypesSerialization
@@ -42,7 +43,7 @@ object StormDataModel extends App {
      * Also, Topology contains the Graph of Nodes
      * Topology => Node(s) -> Spouts/Bolts
      */
-    _class(StormDataTypes.STORM_TOPOLOGY.getName, List("Process")) {
+    _class(StormDataTypes.STORM_TOPOLOGY.getName, List(AtlasClient.PROCESS_SUPER_TYPE)) {
        "id" ~ (string, required, indexed, unique)
        "description" ~ (string, optional, indexed)
        "owner" ~ (string, required, indexed)
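With qualifiedName now set on every process-like entity, the integration tests converge on a single DSL shape for lookups, as in assertSqoopProcessIsRegistered above. A small sketch of that query construction follows; the type name and value are illustrative, not taken from the patch.

    import org.apache.atlas.AtlasClient;

    // Builds the DSL query shape the ITs use to find an entity by its
    // unique qualifiedName. Type name and value below are examples only.
    public class DslQueryExample {
        public static void main(String[] args) {
            String typeName = "storm_topology";      // hypothetical type name
            String qualifiedName = "my-topology";    // hypothetical value
            String query = String.format("%s as t where %s = '%s' select t",
                    typeName, AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, qualifiedName);
            System.out.println(query);
            // prints: storm_topology as t where qualifiedName = 'my-topology' select t
        }
    }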
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index fe79005..dcaeecd 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -3,6 +3,7 @@ Apache Atlas Release Notes

 --trunk - unreleased

 INCOMPATIBLE CHANGES:
+ATLAS-619 Canonicalize hive queries (sumasai)
 ATLAS-497 Simple Authorization (saqeeb.s via yhemanth)
 ATLAS-661 REST API Authentication (nixonrodrigues via yhemanth)
 ATLAS-672 UI: Make dashboard v2 the default UI implementation (bergenholtz via yhemanth)

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
index 9f69940..5195cbe 100755
--- a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
+++ b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
@@ -182,20 +182,21 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang
                 DESCRIPTION_ATTRIBUTE);
         createType(datasetType);

-        HierarchicalTypeDefinition<ClassType> processType = TypesUtil
-                .createClassTypeDef(AtlasClient.PROCESS_SUPER_TYPE, ImmutableSet.<String>of(), NAME_ATTRIBUTE,
-                        DESCRIPTION_ATTRIBUTE,
-                        new AttributeDefinition("inputs", DataTypes.arrayTypeName(AtlasClient.DATA_SET_SUPER_TYPE),
-                                Multiplicity.OPTIONAL, false, null),
-                        new AttributeDefinition("outputs", DataTypes.arrayTypeName(AtlasClient.DATA_SET_SUPER_TYPE),
-                                Multiplicity.OPTIONAL, false, null));
-        createType(processType);
-
         HierarchicalTypeDefinition<ClassType> referenceableType = TypesUtil
                 .createClassTypeDef(AtlasClient.REFERENCEABLE_SUPER_TYPE, ImmutableSet.<String>of(),
                         TypesUtil.createUniqueRequiredAttrDef(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, DataTypes.STRING_TYPE));
         createType(referenceableType);
+
+        HierarchicalTypeDefinition<ClassType> processType = TypesUtil
+                .createClassTypeDef(AtlasClient.PROCESS_SUPER_TYPE, ImmutableSet.<String>of(AtlasClient.REFERENCEABLE_SUPER_TYPE),
+                        TypesUtil.createRequiredAttrDef(AtlasClient.NAME, DataTypes.STRING_TYPE),
+                        DESCRIPTION_ATTRIBUTE,
+                        new AttributeDefinition("inputs", DataTypes.arrayTypeName(AtlasClient.DATA_SET_SUPER_TYPE),
+                                Multiplicity.OPTIONAL, false, null),
+                        new AttributeDefinition("outputs", DataTypes.arrayTypeName(AtlasClient.DATA_SET_SUPER_TYPE),
+                                Multiplicity.OPTIONAL, false, null));
+        createType(processType);
     }

     private void createType(HierarchicalTypeDefinition<ClassType> type) throws AtlasException {
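The reordering above matters: Referenceable must be registered before Process can list it as a supertype, and every subtype of Process now inherits the unique, required qualifiedName attribute instead of relying on a non-unique name. A sketch of what a custom subtype definition could look like under the new hierarchy; the "ETLProcess" type and its attribute are hypothetical, not part of the patch.

    import com.google.common.collect.ImmutableSet;
    import org.apache.atlas.AtlasClient;
    import org.apache.atlas.typesystem.types.AttributeDefinition;
    import org.apache.atlas.typesystem.types.ClassType;
    import org.apache.atlas.typesystem.types.DataTypes;
    import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
    import org.apache.atlas.typesystem.types.Multiplicity;
    import org.apache.atlas.typesystem.types.utils.TypesUtil;

    // Hypothetical subtype: because Process now extends Referenceable, the
    // subtype inherits 'qualifiedName' (unique, required) and 'name', so it
    // only needs to declare its own attributes.
    public class EtlProcessTypeSketch {
        static HierarchicalTypeDefinition<ClassType> etlProcessType() {
            return TypesUtil.createClassTypeDef("ETLProcess",
                    ImmutableSet.of(AtlasClient.PROCESS_SUPER_TYPE),
                    new AttributeDefinition("frequency", DataTypes.STRING_TYPE.getName(),
                            Multiplicity.OPTIONAL, false, null));
        }
    }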
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java b/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java
index 66e1365..40f0d91 100644
--- a/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java
+++ b/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java
@@ -331,7 +331,8 @@ public class BaseHiveRepositoryTest {
             String queryText, String queryPlan, String queryId, String queryGraph, String... traitNames) throws Exception {
         Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames);
-        referenceable.set("name", name);
+        referenceable.set(AtlasClient.NAME, name);
+        referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
         referenceable.set("description", description);
         referenceable.set("user", user);
         referenceable.set("startTime", System.currentTimeMillis());

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/webapp/src/main/java/org/apache/atlas/examples/QuickStart.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/examples/QuickStart.java b/webapp/src/main/java/org/apache/atlas/examples/QuickStart.java
index 70dce6b..79feb39 100755
--- a/webapp/src/main/java/org/apache/atlas/examples/QuickStart.java
+++ b/webapp/src/main/java/org/apache/atlas/examples/QuickStart.java
@@ -376,7 +376,8 @@ public class QuickStart {
             throws Exception {
         Referenceable referenceable = new Referenceable(LOAD_PROCESS_TYPE, traitNames);
         // super type attributes
-        referenceable.set("name", name);
+        referenceable.set(AtlasClient.NAME, name);
+        referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
         referenceable.set("description", description);
         referenceable.set(INPUTS_ATTRIBUTE, inputTables);
         referenceable.set(OUTPUTS_ATTRIBUTE, outputTables);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/webapp/src/test/java/org/apache/atlas/examples/QuickStartIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/examples/QuickStartIT.java b/webapp/src/test/java/org/apache/atlas/examples/QuickStartIT.java
index cdf6049..2912464 100644
--- a/webapp/src/test/java/org/apache/atlas/examples/QuickStartIT.java
+++ b/webapp/src/test/java/org/apache/atlas/examples/QuickStartIT.java
@@ -18,6 +18,8 @@ package org.apache.atlas.examples;

+import org.apache.atlas.Atlas;
+import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.persistence.Id;
@@ -94,10 +96,10 @@ public class QuickStartIT extends BaseResourceIT {

     @Test
     public void testProcessIsAdded() throws AtlasServiceException, JSONException {
-        Referenceable loadProcess = serviceClient.getEntity(QuickStart.LOAD_PROCESS_TYPE, "name",
+        Referenceable loadProcess = serviceClient.getEntity(QuickStart.LOAD_PROCESS_TYPE, AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
                 QuickStart.LOAD_SALES_DAILY_PROCESS);

-        assertEquals(QuickStart.LOAD_SALES_DAILY_PROCESS, loadProcess.get("name"));
+        assertEquals(QuickStart.LOAD_SALES_DAILY_PROCESS, loadProcess.get(AtlasClient.NAME));
         assertEquals(QuickStart.LOAD_SALES_DAILY_PROCESS_DESCRIPTION, loadProcess.get("description"));

         List<Id> inputs = (List<Id>)loadProcess.get(QuickStart.INPUTS_ATTRIBUTE);
@@ -141,12 +143,12 @@ public class QuickStartIT extends BaseResourceIT {

     @Test
     public void testViewIsAdded() throws AtlasServiceException, JSONException {
-        Referenceable view = serviceClient.getEntity(QuickStart.VIEW_TYPE, "name", QuickStart.PRODUCT_DIM_VIEW);
+        Referenceable view = serviceClient.getEntity(QuickStart.VIEW_TYPE, AtlasClient.NAME, QuickStart.PRODUCT_DIM_VIEW);

-        assertEquals(QuickStart.PRODUCT_DIM_VIEW, view.get("name"));
+        assertEquals(QuickStart.PRODUCT_DIM_VIEW, view.get(AtlasClient.NAME));
         Id productDimId = getTable(QuickStart.PRODUCT_DIM_TABLE).getId();
-        Id inputTableId = ((List<Id>)view.get(QuickStart.INPUT_TABLES_ATTRIBUTE)).get(0);
+        Id inputTableId = ((List<Id>) view.get(QuickStart.INPUT_TABLES_ATTRIBUTE)).get(0);
         assertEquals(productDimId, inputTableId);
     }
 }
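QuickStartIT above now resolves the load process through the unique attribute rather than the non-unique display name. A minimal round-trip sketch, assuming a local Atlas server at the default port and that the QuickStart example has already been run; the URL, type name literal, and process name are assumptions.

    import org.apache.atlas.AtlasClient;
    import org.apache.atlas.typesystem.Referenceable;

    // Minimal sketch, assuming a running Atlas at localhost:21000 and the
    // QuickStart types/entities already created. Literals are assumptions.
    public class FetchByQualifiedName {
        public static void main(String[] args) throws Exception {
            AtlasClient client = new AtlasClient("http://localhost:21000");
            Referenceable proc = client.getEntity("LoadProcess",
                    AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, "loadSalesDaily");
            // qualifiedName is the unique key; 'name' stays the display label.
            System.out.println(proc.get(AtlasClient.NAME));
        }
    }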
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/webapp/src/test/java/org/apache/atlas/web/resources/HiveLineageJerseyResourceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/HiveLineageJerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/HiveLineageJerseyResourceIT.java
index bc02f90..0fb5ea2 100644
--- a/webapp/src/test/java/org/apache/atlas/web/resources/HiveLineageJerseyResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/HiveLineageJerseyResourceIT.java
@@ -184,7 +184,8 @@ public class HiveLineageJerseyResourceIT extends BaseResourceIT {
         table("sales_fact_daily_mv" + randomString(), "sales fact daily materialized view", reportingDB, "Joe BI",
                 "MANAGED", salesFactColumns, "Metric");

-        loadProcess("loadSalesDaily" + randomString(), "John ETL", ImmutableList.of(salesFact, timeDim),
+        String procName = "loadSalesDaily" + randomString();
+        loadProcess(procName, "John ETL", ImmutableList.of(salesFact, timeDim),
                 ImmutableList.of(salesFactDaily), "create table as select ", "plan", "id", "graph", "ETL");

         salesMonthlyTable = "sales_fact_monthly_mv" + randomString();
@@ -237,7 +238,8 @@ public class HiveLineageJerseyResourceIT extends BaseResourceIT {
     Id loadProcess(String name, String user, List<Id> inputTables, List<Id> outputTables, String queryText,
             String queryPlan, String queryId, String queryGraph, String... traitNames) throws Exception {
         Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames);
-        referenceable.set("name", name);
+        referenceable.set(AtlasClient.NAME, name);
+        referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
         referenceable.set("user", user);
         referenceable.set("startTime", System.currentTimeMillis());
         referenceable.set("endTime", System.currentTimeMillis() + 10000);
