This is an automated email from the ASF dual-hosted git repository. madhan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new 23eacba ATLAS-3333: updated notification pre-process with an option to ignore dummy Hive database/table 23eacba is described below commit 23eacbafcea2e58378271fa6dc7b56be08b7cac7 Author: Madhan Neethiraj <mad...@apache.org> AuthorDate: Fri Jul 12 00:56:49 2019 -0700 ATLAS-3333: updated notification pre-process with an option to ignore dummy Hive database/table --- .../notification/NotificationHookConsumer.java | 79 +++++++++++++- .../preprocessor/EntityPreprocessor.java | 8 ++ .../preprocessor/HivePreprocessor.java | 34 ++++-- .../preprocessor/PreprocessorContext.java | 121 +++++++++++++++++++-- 4 files changed, 219 insertions(+), 23 deletions(-) diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 1c8d72b..f7df6b3 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -71,7 +71,6 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; -import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.DependsOn; @@ -116,6 +115,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private static final String ATTRIBUTE_INPUTS = "inputs"; private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; + // from org.apache.hadoop.hive.ql.parse.SemanticAnalyzer + public static final String DUMMY_DATABASE = "_dummy_database"; + public static final String DUMMY_TABLE = "_dummy_table"; + public static final String VALUES_TMP_TABLE_NAME_PREFIX = "Values__Tmp__Table__"; + private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName(); public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads"; @@ -133,6 +137,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.ignore.pattern"; public static final String CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.prune.pattern"; public static final String CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE = "atlas.notification.consumer.preprocess.hive_table.cache.size"; + public static final String CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_ENABLED = "atlas.notification.consumer.preprocess.hive_db.ignore.dummy.enabled"; + public static final String CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_NAMES = "atlas.notification.consumer.preprocess.hive_db.ignore.dummy.names"; + public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_ENABLED = "atlas.notification.consumer.preprocess.hive_table.ignore.dummy.enabled"; + public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_NAMES = "atlas.notification.consumer.preprocess.hive_table.ignore.dummy.names"; + public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES_ENABLED = "atlas.notification.consumer.preprocess.hive_table.ignore.name.prefixes.enabled"; + public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES = "atlas.notification.consumer.preprocess.hive_table.ignore.name.prefixes"; public static final String CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.hive_types.remove.ownedref.attrs"; public static final String CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs"; @@ -154,6 +164,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private final boolean consumerDisabled; private final List<Pattern> hiveTablesToIgnore = new ArrayList<>(); private final List<Pattern> hiveTablesToPrune = new ArrayList<>(); + private final List<String> hiveDummyDatabasesToIgnore; + private final List<String> hiveDummyTablesToIgnore; + private final List<String> hiveTablePrefixesToIgnore; private final Map<String, PreprocessAction> hiveTablesCache; private final boolean hiveTypesRemoveOwnedRefAttrs; private final boolean rdbmsTypesRemoveOwnedRefAttrs; @@ -228,9 +241,47 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl hiveTablesCache = Collections.emptyMap(); } + boolean hiveDbIgnoreDummyEnabled = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_ENABLED, true); + boolean hiveTableIgnoreDummyEnabled = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_ENABLED, true); + boolean hiveTableIgnoreNamePrefixEnabled = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES_ENABLED, true); + + LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_ENABLED, hiveDbIgnoreDummyEnabled); + LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_ENABLED, hiveTableIgnoreDummyEnabled); + LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES_ENABLED, hiveTableIgnoreNamePrefixEnabled); + + if (hiveDbIgnoreDummyEnabled) { + String[] dummyDatabaseNames = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_NAMES); + + hiveDummyDatabasesToIgnore = trimAndPurge(dummyDatabaseNames, DUMMY_DATABASE); + + LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_DB_IGNORE_DUMMY_NAMES, StringUtils.join(hiveDummyDatabasesToIgnore, ',')); + } else { + hiveDummyDatabasesToIgnore = Collections.emptyList(); + } + + if (hiveTableIgnoreDummyEnabled) { + String[] dummyTableNames = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_NAMES); + + hiveDummyTablesToIgnore = trimAndPurge(dummyTableNames, DUMMY_TABLE); + + LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_NAMES, StringUtils.join(hiveDummyTablesToIgnore, ',')); + } else { + hiveDummyTablesToIgnore = Collections.emptyList(); + } + + if (hiveTableIgnoreNamePrefixEnabled) { + String[] ignoreNamePrefixes = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES); + + hiveTablePrefixesToIgnore = trimAndPurge(ignoreNamePrefixes, VALUES_TMP_TABLE_NAME_PREFIX); + + LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES, StringUtils.join(hiveTablePrefixesToIgnore, ',')); + } else { + hiveTablePrefixesToIgnore = Collections.emptyList(); + } + hiveTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, true); rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, true); - preprocessEnabled = !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633 || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs; + preprocessEnabled = skipHiveColumnLineageHive20633 || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty(); LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633); LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold); @@ -366,6 +417,26 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } } + private List<String> trimAndPurge(String[] values, String defaultValue) { + final List<String> ret; + + if (values != null && values.length > 0) { + ret = new ArrayList<>(values.length); + + for (String val : values) { + if (StringUtils.isNotBlank(val)) { + ret.add(val.trim()); + } + } + } else if (StringUtils.isNotBlank(defaultValue)) { + ret = Collections.singletonList(defaultValue.trim()); + } else { + ret = Collections.emptyList(); + } + + return ret; + } + static class AdaptiveWaiter { private final long increment; private final long maxDuration; @@ -843,7 +914,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl PreprocessorContext context = null; if (preprocessEnabled) { - context = new PreprocessorContext(kafkaMsg, typeRegistry, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs); + context = new PreprocessorContext(kafkaMsg, typeRegistry, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveDummyDatabasesToIgnore, hiveDummyTablesToIgnore, hiveTablePrefixesToIgnore, hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs); if (context.isHivePreprocessEnabled()) { preprocessHiveTypes(context); @@ -878,7 +949,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } if (entities.size() - count > 0) { - LOG.info("moved {} hive_process/hive_column_lineage entities to end of list (listSize={})", entities.size() - count, entities.size()); + LOG.info("preprocess: moved {} hive_process/hive_column_lineage entities to end of list (listSize={}). topic-offset={}, partition={}", entities.size() - count, entities.size(), kafkaMsg.getOffset(), kafkaMsg.getPartition()); } } } diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java index 9b620dd..0cddd41 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java @@ -31,6 +31,7 @@ public abstract class EntityPreprocessor { public static final String TYPE_HIVE_COLUMN_LINEAGE = "hive_column_lineage"; public static final String TYPE_HIVE_PROCESS = "hive_process"; public static final String TYPE_HIVE_STORAGEDESC = "hive_storagedesc"; + public static final String TYPE_HIVE_DB = "hive_db"; public static final String TYPE_HIVE_TABLE = "hive_table"; public static final String TYPE_RDBMS_INSTANCE = "rdbms_instance"; public static final String TYPE_RDBMS_DB = "rdbms_db"; @@ -67,6 +68,7 @@ public abstract class EntityPreprocessor { static { EntityPreprocessor[] hivePreprocessors = new EntityPreprocessor[] { + new HivePreprocessor.HiveDbPreprocessor(), new HivePreprocessor.HiveTablePreprocessor(), new HivePreprocessor.HiveColumnPreprocessor(), new HivePreprocessor.HiveProcessPreprocessor(), @@ -114,6 +116,12 @@ public abstract class EntityPreprocessor { return obj != null ? obj.toString() : null; } + public static String getName(AtlasEntity entity) { + Object obj = entity != null ? entity.getAttribute(ATTRIBUTE_NAME) : null; + + return obj != null ? obj.toString() : null; + } + public String getTypeName(Object obj) { Object ret = null; diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java index cc31032..d31495c 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java @@ -34,6 +34,23 @@ public class HivePreprocessor { private static final String RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS = "hive_table_partitionkeys"; private static final String RELATIONSHIP_TYPE_HIVE_TABLE_STORAGEDESC = "hive_table_storagedesc"; + static class HiveDbPreprocessor extends EntityPreprocessor { + public HiveDbPreprocessor() { + super(TYPE_HIVE_DB); + } + + @Override + public void preprocess(AtlasEntity entity, PreprocessorContext context) { + if (!context.isIgnoredEntity(entity.getGuid())) { + PreprocessAction action = context.getPreprocessActionForHiveDb(getName(entity)); + + if (action == PreprocessAction.IGNORE) { + context.addToIgnoredEntities(entity); + } + } + } + } + static class HiveTablePreprocessor extends EntityPreprocessor { public HiveTablePreprocessor() { super(TYPE_HIVE_TABLE); @@ -147,20 +164,19 @@ public class HivePreprocessor { if (context.isIgnoredEntity(entity.getGuid())) { context.addToIgnoredEntities(entity); // so that this will be logged with typeName and qualifiedName } else { - Object inputs = entity.getAttribute(ATTRIBUTE_INPUTS); - Object outputs = entity.getAttribute(ATTRIBUTE_OUTPUTS); - - int inputsCount = (inputs instanceof Collection) ? ((Collection) inputs).size() : 0; - int outputsCount = (outputs instanceof Collection) ? ((Collection) outputs).size() : 0; + Object inputs = entity.getAttribute(ATTRIBUTE_INPUTS); + Object outputs = entity.getAttribute(ATTRIBUTE_OUTPUTS); + int inputsCount = getCollectionSize(inputs); + int outputsCount = getCollectionSize(outputs); removeIgnoredObjectIds(inputs, context); removeIgnoredObjectIds(outputs, context); boolean isInputsEmpty = isEmpty(inputs); boolean isOutputsEmpty = isEmpty(outputs); + boolean isAnyRemoved = inputsCount > getCollectionSize(inputs) || outputsCount > getCollectionSize(outputs); - // if inputs/outputs became empty due to removal of ignored entities, ignore the process entity as well - if ((inputsCount > 0 && isInputsEmpty) || (outputsCount > 0 && isOutputsEmpty)) { + if (isAnyRemoved && (isInputsEmpty || isOutputsEmpty)) { context.addToIgnoredEntities(entity); // since the process entity is ignored, entities referenced by inputs/outputs of this process entity @@ -186,6 +202,10 @@ public class HivePreprocessor { } } + private int getCollectionSize(Object obj) { + return (obj instanceof Collection) ? ((Collection) obj).size() : 0; + } + private void removeIgnoredObjectIds(Object obj, PreprocessorContext context) { if (obj == null || !(obj instanceof Collection)) { return; diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java index 2db0574..3c58c9f 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java @@ -41,6 +41,8 @@ import java.util.Set; import java.util.regex.Pattern; import static org.apache.atlas.model.instance.AtlasObjectId.KEY_GUID; +import static org.apache.atlas.notification.preprocessor.EntityPreprocessor.QNAME_SEP_CLUSTER_NAME; +import static org.apache.atlas.notification.preprocessor.EntityPreprocessor.QNAME_SEP_ENTITY_NAME; public class PreprocessorContext { @@ -54,8 +56,12 @@ public class PreprocessorContext { private final List<Pattern> hiveTablesToIgnore; private final List<Pattern> hiveTablesToPrune; private final Map<String, PreprocessAction> hiveTablesCache; + private final List<String> hiveDummyDatabasesToIgnore; + private final List<String> hiveDummyTablesToIgnore; + private final List<String> hiveTablePrefixesToIgnore; private final boolean hiveTypesRemoveOwnedRefAttrs; private final boolean rdbmsTypesRemoveOwnedRefAttrs; + private final boolean isHivePreProcessEnabled; private final Set<String> ignoredEntities = new HashSet<>(); private final Set<String> prunedEntities = new HashSet<>(); private final Set<String> referredEntitiesToMove = new HashSet<>(); @@ -64,12 +70,15 @@ public class PreprocessorContext { private final Map<String, String> guidAssignments = new HashMap<>(); private List<AtlasEntity> postUpdateEntities = null; - public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs) { + public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, List<String> hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore, List<String> hiveTablePrefixesToIgnore, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs) { this.kafkaMessage = kafkaMessage; this.typeRegistry = typeRegistry; this.hiveTablesToIgnore = hiveTablesToIgnore; this.hiveTablesToPrune = hiveTablesToPrune; this.hiveTablesCache = hiveTablesCache; + this.hiveDummyDatabasesToIgnore = hiveDummyDatabasesToIgnore; + this.hiveDummyTablesToIgnore = hiveDummyTablesToIgnore; + this.hiveTablePrefixesToIgnore = hiveTablePrefixesToIgnore; this.hiveTypesRemoveOwnedRefAttrs = hiveTypesRemoveOwnedRefAttrs; this.rdbmsTypesRemoveOwnedRefAttrs = rdbmsTypesRemoveOwnedRefAttrs; @@ -88,6 +97,8 @@ public class PreprocessorContext { entitiesWithExtInfo = null; break; } + + this.isHivePreProcessEnabled = hiveTypesRemoveOwnedRefAttrs || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty(); } public AtlasKafkaMessage<HookNotification> getKafkaMessage() { @@ -107,7 +118,7 @@ public class PreprocessorContext { public boolean getRdbmsTypesRemoveOwnedRefAttrs() { return rdbmsTypesRemoveOwnedRefAttrs; } public boolean isHivePreprocessEnabled() { - return !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || hiveTypesRemoveOwnedRefAttrs; + return isHivePreProcessEnabled; } public List<AtlasEntity> getEntities() { @@ -142,22 +153,78 @@ public class PreprocessorContext { public List<AtlasEntity> getPostUpdateEntities() { return postUpdateEntities; } + public PreprocessAction getPreprocessActionForHiveDb(String dbName) { + PreprocessAction ret = PreprocessAction.NONE; + + if (dbName != null) { + for (String dummyDbName : hiveDummyDatabasesToIgnore) { + if (StringUtils.equalsIgnoreCase(dbName, dummyDbName)) { + ret = PreprocessAction.IGNORE; + + break; + } + } + } + + return ret; + } + public PreprocessAction getPreprocessActionForHiveTable(String qualifiedName) { PreprocessAction ret = PreprocessAction.NONE; - if (qualifiedName != null && (CollectionUtils.isNotEmpty(hiveTablesToIgnore) || CollectionUtils.isNotEmpty(hiveTablesToPrune))) { - ret = hiveTablesCache.get(qualifiedName); + if (qualifiedName != null) { + if (CollectionUtils.isNotEmpty(hiveTablesToIgnore) || CollectionUtils.isNotEmpty(hiveTablesToPrune)) { + ret = hiveTablesCache.get(qualifiedName); - if (ret == null) { - if (isMatch(qualifiedName, hiveTablesToIgnore)) { - ret = PreprocessAction.IGNORE; - } else if (isMatch(qualifiedName, hiveTablesToPrune)) { - ret = PreprocessAction.PRUNE; - } else { - ret = PreprocessAction.NONE; + if (ret == null) { + if (isMatch(qualifiedName, hiveTablesToIgnore)) { + ret = PreprocessAction.IGNORE; + } else if (isMatch(qualifiedName, hiveTablesToPrune)) { + ret = PreprocessAction.PRUNE; + } else { + ret = PreprocessAction.NONE; + } + + hiveTablesCache.put(qualifiedName, ret); } + } + + if (ret != PreprocessAction.IGNORE && (CollectionUtils.isNotEmpty(hiveDummyTablesToIgnore) || CollectionUtils.isNotEmpty(hiveTablePrefixesToIgnore))) { + String tblName = getHiveTableNameFromQualifiedName(qualifiedName); - hiveTablesCache.put(qualifiedName, ret); + if (tblName != null) { + for (String dummyTblName : hiveDummyTablesToIgnore) { + if (StringUtils.equalsIgnoreCase(tblName, dummyTblName)) { + ret = PreprocessAction.IGNORE; + + break; + } + } + + if (ret != PreprocessAction.IGNORE) { + for (String tableNamePrefix : hiveTablePrefixesToIgnore) { + if (StringUtils.startsWithIgnoreCase(tblName, tableNamePrefix)) { + ret = PreprocessAction.IGNORE; + + break; + } + } + } + } + + if (ret != PreprocessAction.IGNORE && CollectionUtils.isNotEmpty(hiveDummyDatabasesToIgnore)) { + String dbName = getHiveDbNameFromQualifiedName(qualifiedName); + + if (dbName != null) { + for (String dummyDbName : hiveDummyDatabasesToIgnore) { + if (StringUtils.equalsIgnoreCase(dbName, dummyDbName)) { + ret = PreprocessAction.IGNORE; + + break; + } + } + } + } } } @@ -321,6 +388,36 @@ public class PreprocessorContext { } } + public String getHiveTableNameFromQualifiedName(String qualifiedName) { + String ret = null; + int idxStart = qualifiedName.indexOf(QNAME_SEP_ENTITY_NAME) + 1; + + if (idxStart != 0 && qualifiedName.length() > idxStart) { + int idxEnd = qualifiedName.indexOf(QNAME_SEP_CLUSTER_NAME, idxStart); + + if (idxEnd != -1) { + ret = qualifiedName.substring(idxStart, idxEnd); + } + } + + return ret; + } + + public String getHiveDbNameFromQualifiedName(String qualifiedName) { + String ret = null; + int idxEnd = qualifiedName.indexOf(QNAME_SEP_ENTITY_NAME); // db.table@cluster, db.table.column@cluster + + if (idxEnd == -1) { + idxEnd = qualifiedName.indexOf(QNAME_SEP_CLUSTER_NAME); // db@cluster + } + + if (idxEnd != -1) { + ret = qualifiedName.substring(0, idxEnd); + } + + return ret; + } + public String getTypeName(Object obj) { Object ret = null;