This is an automated email from the ASF dual-hosted git repository. madhan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new f103e14 ATLAS-3056: updated rdbms types to remove use of ownedRef/inverseRef f103e14 is described below commit f103e1438bbfc0c61912ded3551e22578876fbbe Author: Madhan Neethiraj <mad...@apache.org> AuthorDate: Thu Feb 21 18:16:34 2019 -0800 ATLAS-3056: updated rdbms types to remove use of ownedRef/inverseRef --- addons/models/2000-RDBMS/2010-rdbms_model.json | 173 +++------------------ .../notification/NotificationHookConsumer.java | 28 +++- .../preprocessor/EntityPreprocessor.java | 39 ++++- .../preprocessor/RdbmsPreprocessor.java | 139 +++++++++++++++++ 4 files changed, 215 insertions(+), 164 deletions(-) diff --git a/addons/models/2000-RDBMS/2010-rdbms_model.json b/addons/models/2000-RDBMS/2010-rdbms_model.json index 386446c..d4e1805 100644 --- a/addons/models/2000-RDBMS/2010-rdbms_model.json +++ b/addons/models/2000-RDBMS/2010-rdbms_model.json @@ -21,7 +21,7 @@ { "name": "platform", "typeName": "string", - "isOptional": false, + "isOptional": true, "cardinality": "SINGLE", "isUnique": false, "isIndexable": true @@ -73,19 +73,6 @@ "cardinality": "SINGLE", "isUnique": false, "isIndexable": false - }, - { - "name": "databases", - "typeName": "array<rdbms_db>", - "isOptional": true, - "cardinality": "SET", - "isUnique": false, - "isIndexable": false, - "constraints": [ - { - "type": "ownedRef" - } - ] } ] }, @@ -99,26 +86,10 @@ { "name": "prodOrOther", "typeName": "string", - "isOptional": false, - "cardinality": "SINGLE", - "isUnique": false, - "isIndexable": true - }, - { - "name": "instance", - "typeName": "rdbms_instance", "isOptional": true, "cardinality": "SINGLE", "isUnique": false, - "isIndexable": false, - "constraints": [ - { - "type": "inverseRef", - "params": { - "attribute": "databases" - } - } - ] + "isIndexable": true }, { "name": "contact_info", @@ -127,19 +98,6 @@ "cardinality": "SINGLE", "isUnique": false, "isIndexable": false - }, - { - "name": "tables", - "typeName": "array<rdbms_table>", - "isOptional": true, - "cardinality": "SET", - "isUnique": false, - "isIndexable": false, - "constraints": [ - { - "type": "ownedRef" - } - ] } ] }, @@ -151,22 +109,6 @@ "typeVersion": "1.1", "attributeDefs": [ { - "name": "db", - "typeName": "rdbms_db", - "isOptional": true, - "cardinality": "SINGLE", - "isUnique": false, - "isIndexable": false, - "constraints": [ - { - "type": "inverseRef", - "params": { - "attribute": "tables" - } - } - ] - }, - { "name": "name_path", "typeName": "string", "isOptional": true, @@ -207,45 +149,6 @@ "valuesMaxCount": 1, "isUnique": false, "isIndexable": false - }, - { - "name": "columns", - "typeName": "array<rdbms_column>", - "isOptional": true, - "cardinality": "SET", - "isUnique": false, - "isIndexable": false, - "constraints": [ - { - "type": "ownedRef" - } - ] - }, - { - "name": "indexes", - "typeName": "array<rdbms_index>", - "isOptional": true, - "cardinality": "SET", - "isUnique": false, - "isIndexable": false, - "constraints": [ - { - "type": "ownedRef" - } - ] - }, - { - "name": "foreign_keys", - "typeName": "array<rdbms_foreign_key>", - "isOptional": true, - "cardinality": "SET", - "isUnique": false, - "isIndexable": false, - "constraints": [ - { - "type": "ownedRef" - } - ] } ] }, @@ -259,7 +162,7 @@ { "name": "data_type", "typeName": "string", - "isOptional": false, + "isOptional": true, "cardinality": "SINGLE", "isUnique": false, "isIndexable": true @@ -289,22 +192,6 @@ "isIndexable": false }, { - "name": "table", - "typeName": "rdbms_table", - "isOptional": true, - "cardinality": "SINGLE", - "isUnique": false, - "isIndexable": false, - "constraints": [ - { - "type": "inverseRef", - "params": { - "attribute": "columns" - } - } - ] - }, - { "name": "isNullable", "typeName": "boolean", "isOptional": true, @@ -330,22 +217,6 @@ "typeVersion": "1.1", "attributeDefs": [ { - "name": "table", - "typeName": "rdbms_table", - "isOptional": true, - "cardinality": "SINGLE", - "isUnique": false, - "isIndexable": false, - "constraints": [ - { - "type": "inverseRef", - "params": { - "attribute": "indexes" - } - } - ] - }, - { "name": "index_type", "typeName": "string", "isOptional": true, @@ -387,14 +258,6 @@ "typeVersion": "1.1", "attributeDefs": [ { - "name": "table", - "typeName": "rdbms_table", - "isOptional": true, - "cardinality": "SINGLE", - "isUnique": false, - "isIndexable": false - }, - { "name": "key_columns", "typeName": "array<rdbms_column>", "isOptional": true, @@ -426,20 +289,20 @@ "name": "rdbms_instance_databases", "serviceType": "rdbms", "typeVersion": "1.0", - "relationshipCategory": "AGGREGATION", + "relationshipCategory": "COMPOSITION", "endDef1": { "type": "rdbms_instance", "name": "databases", "isContainer": true, "cardinality": "SET", - "isLegacyAttribute": true + "isLegacyAttribute": false }, "endDef2": { "type": "rdbms_db", "name": "instance", "isContainer": false, "cardinality": "SINGLE", - "isLegacyAttribute": true + "isLegacyAttribute": false }, "propagateTags": "NONE" }, @@ -447,20 +310,20 @@ "name": "rdbms_db_tables", "serviceType": "rdbms", "typeVersion": "1.0", - "relationshipCategory": "AGGREGATION", + "relationshipCategory": "COMPOSITION", "endDef1": { "type": "rdbms_db", "name": "tables", "isContainer": true, "cardinality": "SET", - "isLegacyAttribute": true + "isLegacyAttribute": false }, "endDef2": { "type": "rdbms_table", "name": "db", "isContainer": false, "cardinality": "SINGLE", - "isLegacyAttribute": true + "isLegacyAttribute": false }, "propagateTags": "NONE" }, @@ -468,20 +331,20 @@ "name": "rdbms_table_columns", "serviceType": "rdbms", "typeVersion": "1.0", - "relationshipCategory": "AGGREGATION", + "relationshipCategory": "COMPOSITION", "endDef1": { "type": "rdbms_table", "name": "columns", "isContainer": true, "cardinality": "SET", - "isLegacyAttribute": true + "isLegacyAttribute": false }, "endDef2": { "type": "rdbms_column", "name": "table", "isContainer": false, "cardinality": "SINGLE", - "isLegacyAttribute": true + "isLegacyAttribute": false }, "propagateTags": "NONE" }, @@ -489,20 +352,20 @@ "name": "rdbms_table_indexes", "serviceType": "rdbms", "typeVersion": "1.0", - "relationshipCategory": "AGGREGATION", + "relationshipCategory": "COMPOSITION", "endDef1": { "type": "rdbms_table", "name": "indexes", "isContainer": true, "cardinality": "SET", - "isLegacyAttribute": true + "isLegacyAttribute": false }, "endDef2": { "type": "rdbms_index", "name": "table", "isContainer": false, "cardinality": "SINGLE", - "isLegacyAttribute": true + "isLegacyAttribute": false }, "propagateTags": "NONE" }, @@ -531,20 +394,20 @@ "name": "rdbms_table_foreign_key", "serviceType": "rdbms", "typeVersion": "1.0", - "relationshipCategory": "AGGREGATION", + "relationshipCategory": "COMPOSITION", "endDef1": { "type": "rdbms_table", "name": "foreign_keys", "isContainer": true, "cardinality": "SET", - "isLegacyAttribute": true + "isLegacyAttribute": false }, "endDef2": { "type": "rdbms_foreign_key", "name": "table", "isContainer": false, "cardinality": "SINGLE", - "isLegacyAttribute": true + "isLegacyAttribute": false }, "propagateTags": "NONE" }, diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 6d03ba4..e9a0151 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -123,6 +123,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.ignore.pattern"; public static final String CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.prune.pattern"; public static final String CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE = "atlas.notification.consumer.preprocess.hive_table.cache.size"; + public static final String CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs"; public static final int SERVER_READY_WAIT_TIME_MS = 1000; @@ -142,6 +143,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private final List<Pattern> hiveTablesToIgnore = new ArrayList<>(); private final List<Pattern> hiveTablesToPrune = new ArrayList<>(); private final Map<String, PreprocessAction> hiveTablesCache; + private final boolean rdbmsTypesRemoveOwnedRefAttrs; private final boolean preprocessEnabled; private NotificationInterface notificationInterface; @@ -212,7 +214,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl hiveTablesCache = Collections.emptyMap(); } - preprocessEnabled = !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633; + rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, true); + preprocessEnabled = !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633 || rdbmsTypesRemoveOwnedRefAttrs; LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633); LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold); @@ -799,16 +802,35 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl skipHiveColumnLineage(context); } + if (rdbmsTypesRemoveOwnedRefAttrs) { + rdbmsTypeRemoveOwnedRefAttrs(context); + } + context.moveRegisteredReferredEntities(); } + private void rdbmsTypeRemoveOwnedRefAttrs(PreprocessorContext context) { + List<AtlasEntity> entities = context.getEntities(); + + if (entities != null) { + for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) { + AtlasEntity entity = iter.next(); + EntityPreprocessor preprocessor = EntityPreprocessor.getRdbmsPreprocessor(entity.getTypeName()); + + if (preprocessor != null) { + preprocessor.preprocess(entity, context); + } + } + } + } + private void ignoreOrPruneHiveTables(PreprocessorContext context) { List<AtlasEntity> entities = context.getEntities(); if (entities != null) { for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) { AtlasEntity entity = iter.next(); - EntityPreprocessor preprocessor = EntityPreprocessor.getPreprocessor(entity.getTypeName()); + EntityPreprocessor preprocessor = EntityPreprocessor.getHivePreprocessor(entity.getTypeName()); if (preprocessor != null) { preprocessor.preprocess(entity, context); @@ -824,7 +846,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl if (referredEntities != null) { for (Iterator<Map.Entry<String, AtlasEntity>> iter = referredEntities.entrySet().iterator(); iter.hasNext(); ) { AtlasEntity entity = iter.next().getValue(); - EntityPreprocessor preprocessor = EntityPreprocessor.getPreprocessor(entity.getTypeName()); + EntityPreprocessor preprocessor = EntityPreprocessor.getHivePreprocessor(entity.getTypeName()); if (preprocessor != null) { preprocessor.preprocess(entity, context); diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java index bdea14a..7eba27a 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java @@ -32,25 +32,38 @@ public abstract class EntityPreprocessor { public static final String TYPE_HIVE_PROCESS = "hive_process"; public static final String TYPE_HIVE_STORAGEDESC = "hive_storagedesc"; public static final String TYPE_HIVE_TABLE = "hive_table"; + public static final String TYPE_RDBMS_INSTANCE = "rdbms_instance"; + public static final String TYPE_RDBMS_DB = "rdbms_db"; + public static final String TYPE_RDBMS_TABLE = "rdbms_table"; + public static final String TYPE_RDBMS_COLUMN = "rdbms_column"; + public static final String TYPE_RDBMS_INDEX = "rdbms_index"; + public static final String TYPE_RDBMS_FOREIGN_KEY = "rdbms_foreign_key"; public static final String ATTRIBUTE_COLUMNS = "columns"; public static final String ATTRIBUTE_INPUTS = "inputs"; public static final String ATTRIBUTE_OUTPUTS = "outputs"; public static final String ATTRIBUTE_PARTITION_KEYS = "partitionKeys"; public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; + public static final String ATTRIBUTE_NAME = "name"; public static final String ATTRIBUTE_SD = "sd"; + public static final String ATTRIBUTE_DB = "db"; + public static final String ATTRIBUTE_DATABASES = "databases"; + public static final String ATTRIBUTE_TABLES = "tables"; + public static final String ATTRIBUTE_INDEXES = "indexes"; + public static final String ATTRIBUTE_FOREIGN_KEYS = "foreign_keys"; public static final char QNAME_SEP_CLUSTER_NAME = '@'; public static final char QNAME_SEP_ENTITY_NAME = '.'; public static final String QNAME_SD_SUFFIX = "_storage"; - private static final Map<String, EntityPreprocessor> PREPROCESSOR_MAP = new HashMap<>(); + private static final Map<String, EntityPreprocessor> HIVE_PREPROCESSOR_MAP = new HashMap<>(); + private static final Map<String, EntityPreprocessor> RDBMS_PREPROCESSOR_MAP = new HashMap<>(); private final String typeName; static { - EntityPreprocessor[] preprocessors = new EntityPreprocessor[] { + EntityPreprocessor[] hivePreprocessors = new EntityPreprocessor[] { new HivePreprocessor.HiveTablePreprocessor(), new HivePreprocessor.HiveColumnPreprocessor(), new HivePreprocessor.HiveProcessPreprocessor(), @@ -58,8 +71,18 @@ public abstract class EntityPreprocessor { new HivePreprocessor.HiveStorageDescPreprocessor() }; - for (EntityPreprocessor preprocessor : preprocessors) { - PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor); + EntityPreprocessor[] rdbmsPreprocessors = new EntityPreprocessor[] { + new RdbmsPreprocessor.RdbmsInstancePreprocessor(), + new RdbmsPreprocessor.RdbmsDbPreprocessor(), + new RdbmsPreprocessor.RdbmsTablePreprocessor() + }; + + for (EntityPreprocessor preprocessor : hivePreprocessors) { + HIVE_PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor); + } + + for (EntityPreprocessor preprocessor : rdbmsPreprocessors) { + RDBMS_PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor); } } @@ -74,8 +97,12 @@ public abstract class EntityPreprocessor { public abstract void preprocess(AtlasEntity entity, PreprocessorContext context); - public static EntityPreprocessor getPreprocessor(String typeName) { - return typeName != null ? PREPROCESSOR_MAP.get(typeName) : null; + public static EntityPreprocessor getHivePreprocessor(String typeName) { + return typeName != null ? HIVE_PREPROCESSOR_MAP.get(typeName) : null; + } + + public static EntityPreprocessor getRdbmsPreprocessor(String typeName) { + return typeName != null ? RDBMS_PREPROCESSOR_MAP.get(typeName) : null; } public static String getQualifiedName(AtlasEntity entity) { diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/RdbmsPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/RdbmsPreprocessor.java new file mode 100644 index 0000000..3933cc6 --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/RdbmsPreprocessor.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification.preprocessor; + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.commons.collections.MapUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + + +public class RdbmsPreprocessor { + private static final Logger LOG = LoggerFactory.getLogger(RdbmsPreprocessor.class); + + static class RdbmsInstancePreprocessor extends RdbmsTypePreprocessor { + public RdbmsInstancePreprocessor() { + super(TYPE_RDBMS_INSTANCE); + } + } + + static class RdbmsDbPreprocessor extends RdbmsTypePreprocessor { + public RdbmsDbPreprocessor() { + super(TYPE_RDBMS_DB); + } + } + + static class RdbmsTablePreprocessor extends RdbmsTypePreprocessor { + public RdbmsTablePreprocessor() { + super(TYPE_RDBMS_TABLE); + } + @Override + public void preprocess(AtlasEntity entity, PreprocessorContext context) { + super.preprocess(entity, context); + + // try auto-fix when 'db' attribute is not present in relationshipAttribute & attributes + Object db = entity.getRelationshipAttribute(ATTRIBUTE_DB); + + if (db == null) { + db = entity.getAttribute(ATTRIBUTE_DB); + } + + if (db == null) { + String dbQualifiedName = getDbQualifiedName(entity); + + if (dbQualifiedName != null) { + AtlasObjectId dbId = new AtlasObjectId(TYPE_RDBMS_DB, Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, dbQualifiedName)); + + LOG.info("missing attribute {}.{} is set to {}", TYPE_RDBMS_TABLE, ATTRIBUTE_DB, dbId); + + entity.setRelationshipAttribute(ATTRIBUTE_DB, dbId); + } + } + } + + private String getDbQualifiedName(AtlasEntity tableEntity) { + String ret = null; + Object tblQualifiedName = tableEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME); // dbName.tblName@clusterName + Object tblName = tableEntity.getAttribute(ATTRIBUTE_NAME); // tblName + + if (tblQualifiedName != null && tblName != null) { + ret = tblQualifiedName.toString().replace("." + tblName.toString() + "@", "@"); // dbName@clusterName + } + + return ret; + } + + } + + static class RdbmsTypePreprocessor extends EntityPreprocessor { + private static final Set<String> entityTypesToMove = new HashSet<>(); + + static { + entityTypesToMove.add(TYPE_RDBMS_DB); + entityTypesToMove.add(TYPE_RDBMS_TABLE); + entityTypesToMove.add(TYPE_RDBMS_COLUMN); + entityTypesToMove.add(TYPE_RDBMS_INDEX); + entityTypesToMove.add(TYPE_RDBMS_FOREIGN_KEY); + } + + protected RdbmsTypePreprocessor(String typeName) { + super(typeName); + } + + @Override + public void preprocess(AtlasEntity entity, PreprocessorContext context) { + clearRefAttributes(entity, context); + + Map<String, AtlasEntity> referredEntities = context.getReferredEntities(); + + if (MapUtils.isNotEmpty(referredEntities)) { + for (AtlasEntity referredEntity : referredEntities.values()) { + if (entityTypesToMove.contains(referredEntity.getTypeName())) { + clearRefAttributes(referredEntity, context); + + context.addToReferredEntitiesToMove(referredEntity.getGuid()); + } + } + } + } + + private void clearRefAttributes(AtlasEntity entity, PreprocessorContext context) { + switch (entity.getTypeName()) { + case TYPE_RDBMS_INSTANCE: + entity.removeAttribute(ATTRIBUTE_DATABASES); + break; + + case TYPE_RDBMS_DB: + entity.removeAttribute(ATTRIBUTE_TABLES); + break; + + case TYPE_RDBMS_TABLE: + entity.removeAttribute(ATTRIBUTE_COLUMNS); + entity.removeAttribute(ATTRIBUTE_INDEXES); + entity.removeAttribute(ATTRIBUTE_FOREIGN_KEYS); + break; + } + } + } +}