http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java new file mode 100755 index 0000000..64fca66 --- /dev/null +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java @@ -0,0 +1,512 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.hive.bridge; + +import org.apache.atlas.MetadataServiceClient; +import org.apache.atlas.MetadataServiceException; +import org.apache.atlas.hive.model.HiveDataModelGenerator; +import org.apache.atlas.hive.model.HiveDataTypes; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.typesystem.Struct; +import org.apache.atlas.typesystem.json.InstanceSerialization; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.commons.lang.StringEscapeUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Index; +import org.apache.hadoop.hive.metastore.api.Order; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +/** + * A Bridge Utility that imports metadata from the Hive Meta Store + * and registers then in DGI. + */ +public class HiveMetaStoreBridge { + private static final String DEFAULT_DGI_URL = "http://localhost:21000/"; + public static final String HIVE_CLUSTER_NAME = "hive.cluster.name"; + public static final String DEFAULT_CLUSTER_NAME = "primary"; + private final String clusterName; + + public static final String DGI_URL_PROPERTY = "hive.hook.dgi.url"; + + private static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreBridge.class); + + private final Hive hiveClient; + private final MetadataServiceClient metadataServiceClient; + + /** + * Construct a HiveMetaStoreBridge. 
+ * @param hiveConf hive conf + */ + public HiveMetaStoreBridge(HiveConf hiveConf) throws Exception { + clusterName = hiveConf.get(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME); + hiveClient = Hive.get(hiveConf); + metadataServiceClient = new MetadataServiceClient(hiveConf.get(DGI_URL_PROPERTY, DEFAULT_DGI_URL)); + } + + public MetadataServiceClient getMetadataServiceClient() { + return metadataServiceClient; + } + + public void importHiveMetadata() throws Exception { + LOG.info("Importing hive metadata"); + importDatabases(); + } + + private void importDatabases() throws Exception { + List<String> databases = hiveClient.getAllDatabases(); + for (String databaseName : databases) { + Referenceable dbReference = registerDatabase(databaseName); + + importTables(databaseName, dbReference); + } + } + + public Referenceable registerDatabase(String databaseName) throws Exception { + Referenceable dbRef = getDatabaseReference(databaseName, clusterName); + if (dbRef == null) { + LOG.info("Importing objects from databaseName : " + databaseName); + Database hiveDB = hiveClient.getDatabase(databaseName); + + dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName()); + dbRef.set(HiveDataModelGenerator.NAME, hiveDB.getName().toLowerCase()); + dbRef.set(HiveDataModelGenerator.CLUSTER_NAME, clusterName); + dbRef.set("description", hiveDB.getDescription()); + dbRef.set("locationUri", hiveDB.getLocationUri()); + dbRef.set("parameters", hiveDB.getParameters()); + dbRef.set("ownerName", hiveDB.getOwnerName()); + if (hiveDB.getOwnerType() != null) { + dbRef.set("ownerType", hiveDB.getOwnerType().getValue()); + } + + dbRef = createInstance(dbRef); + } else { + LOG.info("Database {} is already registered with id {}", databaseName, dbRef.getId().id); + } + return dbRef; + } + + public Referenceable createInstance(Referenceable referenceable) throws Exception { + String typeName = referenceable.getTypeName(); + LOG.debug("creating instance of type " + typeName); + + String entityJSON = InstanceSerialization.toJson(referenceable, true); + LOG.debug("Submitting new entity {} = {}", referenceable.getTypeName(), entityJSON); + JSONObject jsonObject = metadataServiceClient.createEntity(entityJSON); + String guid = jsonObject.getString(MetadataServiceClient.GUID); + LOG.debug("created instance for type " + typeName + ", guid: " + guid); + + return new Referenceable(guid, referenceable.getTypeName(), null); + } + + private void importTables(String databaseName, Referenceable databaseReferenceable) throws Exception { + List<String> hiveTables = hiveClient.getAllTables(databaseName); + + for (String tableName : hiveTables) { + Referenceable tableReferenceable = registerTable(databaseReferenceable, databaseName, tableName); + + // Import Partitions + Referenceable sdReferenceable = getSDForTable(databaseName, tableName); + registerPartitions(databaseName, tableName, tableReferenceable, sdReferenceable); + + // Import Indexes + importIndexes(databaseName, tableName, databaseReferenceable, tableReferenceable); + } + } + + /** + * Gets reference for the database + * + * + * @param databaseName database Name + * @param clusterName cluster name + * @return Reference for database if exists, else null + * @throws Exception + */ + private Referenceable getDatabaseReference(String databaseName, String clusterName) throws Exception { + LOG.debug("Getting reference for database {}", databaseName); + String typeName = HiveDataTypes.HIVE_DB.getName(); + + String dslQuery = String.format("%s where %s = '%s' and %s = '%s'", typeName, + 
HiveDataModelGenerator.NAME, databaseName.toLowerCase(), HiveDataModelGenerator.CLUSTER_NAME, + clusterName); + return getEntityReferenceFromDSL(typeName, dslQuery); + } + + public Referenceable getProcessReference(String queryStr) throws Exception { + LOG.debug("Getting reference for process with query {}", queryStr); + String typeName = HiveDataTypes.HIVE_PROCESS.getName(); + + //todo enable DSL +// String dslQuery = String.format("%s where queryText = \"%s\"", typeName, queryStr); +// return getEntityReferenceFromDSL(typeName, dslQuery); + + String gremlinQuery = String.format("g.V.has('__typeName', '%s').has('%s.queryText', \"%s\").toList()", + typeName, typeName, StringEscapeUtils.escapeJava(queryStr)); + return getEntityReferenceFromGremlin(typeName, gremlinQuery); + } + + private Referenceable getEntityReferenceFromDSL(String typeName, String dslQuery) throws Exception { + MetadataServiceClient dgiClient = getMetadataServiceClient(); + JSONArray results = dgiClient.searchByDSL(dslQuery); + if (results.length() == 0) { + return null; + } else { + String guid; + JSONObject row = results.getJSONObject(0); + if (row.has("$id$")) { + guid = row.getJSONObject("$id$").getString("id"); + } else { + guid = row.getJSONObject("_col_0").getString("id"); + } + return new Referenceable(guid, typeName, null); + } + } + + public static String getTableName(String clusterName, String dbName, String tableName) { + return String.format("%s.%s@%s", dbName.toLowerCase(), tableName.toLowerCase(), clusterName); + } + + /** + * Gets reference for the table + * + * @param dbName database name + * @param tableName table name + * @return table reference if exists, else null + * @throws Exception + */ + private Referenceable getTableReference(String dbName, String tableName) throws Exception { + LOG.debug("Getting reference for table {}.{}", dbName, tableName); + + String typeName = HiveDataTypes.HIVE_TABLE.getName(); + String entityName = getTableName(clusterName, dbName, tableName); + String dslQuery = String.format("%s as t where name = '%s'", typeName, entityName); + return getEntityReferenceFromDSL(typeName, dslQuery); + } + + private Referenceable getEntityReferenceFromGremlin(String typeName, String gremlinQuery) throws MetadataServiceException, + JSONException { + MetadataServiceClient client = getMetadataServiceClient(); + JSONObject response = client.searchByGremlin(gremlinQuery); + JSONArray results = response.getJSONArray(MetadataServiceClient.RESULTS); + if (results.length() == 0) { + return null; + } + String guid = results.getJSONObject(0).getString("__guid"); + return new Referenceable(guid, typeName, null); + } + + private Referenceable getPartitionReference(String dbName, String tableName, List<String> values) throws Exception { + String valuesStr = "['" + StringUtils.join(values, "', '") + "']"; + LOG.debug("Getting reference for partition for {}.{} with values {}", dbName, tableName, valuesStr); + String typeName = HiveDataTypes.HIVE_PARTITION.getName(); + + //todo replace gremlin with DSL + // String dslQuery = String.format("%s as p where values = %s, tableName where name = '%s', " + // + "dbName where name = '%s' and clusterName = '%s' select p", typeName, valuesStr, tableName, + // dbName, clusterName); + + String datasetType = MetadataServiceClient.DATA_SET_SUPER_TYPE; + String tableEntityName = getTableName(clusterName, dbName, tableName); + + String gremlinQuery = String.format("g.V.has('__typeName', '%s').has('%s.values', %s).as('p')." 
+ + "out('__%s.table').has('%s.name', '%s').back('p').toList()", typeName, typeName, valuesStr, + typeName, datasetType, tableEntityName); + + return getEntityReferenceFromGremlin(typeName, gremlinQuery); + } + + private Referenceable getSDForTable(String dbName, String tableName) throws Exception { + Referenceable tableRef = getTableReference(dbName, tableName); + if (tableRef == null) { + throw new IllegalArgumentException("Table " + dbName + "." + tableName + " doesn't exist"); + } + + MetadataServiceClient dgiClient = getMetadataServiceClient(); + Referenceable tableInstance = dgiClient.getEntity(tableRef.getId().id); + Id sdId = (Id) tableInstance.get("sd"); + return new Referenceable(sdId.id, sdId.getTypeName(), null); + } + + public Referenceable registerTable(String dbName, String tableName) throws Exception { + Referenceable dbReferenceable = registerDatabase(dbName); + return registerTable(dbReferenceable, dbName, tableName); + } + + public Referenceable registerTable(Referenceable dbReference, String dbName, String tableName) throws Exception { + LOG.info("Attempting to register table [" + tableName + "]"); + Referenceable tableRef = getTableReference(dbName, tableName); + if (tableRef == null) { + LOG.info("Importing objects from " + dbName + "." + tableName); + + Table hiveTable = hiveClient.getTable(dbName, tableName); + + tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName()); + tableRef.set(HiveDataModelGenerator.NAME, + getTableName(clusterName, hiveTable.getDbName(), hiveTable.getTableName())); + tableRef.set(HiveDataModelGenerator.TABLE_NAME, hiveTable.getTableName().toLowerCase()); + tableRef.set("owner", hiveTable.getOwner()); + + tableRef.set("createTime", hiveTable.getMetadata().getProperty(hive_metastoreConstants.DDL_TIME)); + tableRef.set("lastAccessTime", hiveTable.getLastAccessTime()); + tableRef.set("retention", hiveTable.getRetention()); + + tableRef.set(HiveDataModelGenerator.COMMENT, hiveTable.getParameters().get(HiveDataModelGenerator.COMMENT)); + + // add reference to the database + tableRef.set(HiveDataModelGenerator.DB, dbReference); + + List<Referenceable> colList = getColumns(hiveTable.getCols()); + tableRef.set("columns", colList); + + // add reference to the StorageDescriptor + StorageDescriptor storageDesc = hiveTable.getSd(); + Referenceable sdReferenceable = fillStorageDescStruct(storageDesc, colList); + tableRef.set("sd", sdReferenceable); + + // add reference to the Partition Keys + List<Referenceable> partKeys = getColumns(hiveTable.getPartitionKeys()); + tableRef.set("partitionKeys", partKeys); + + tableRef.set("parameters", hiveTable.getParameters()); + + if (hiveTable.getViewOriginalText() != null) { + tableRef.set("viewOriginalText", hiveTable.getViewOriginalText()); + } + + if (hiveTable.getViewExpandedText() != null) { + tableRef.set("viewExpandedText", hiveTable.getViewExpandedText()); + } + + tableRef.set("tableType", hiveTable.getTableType().name()); + tableRef.set("temporary", hiveTable.isTemporary()); + + + tableRef = createInstance(tableRef); + } else { + LOG.info("Table {}.{} is already registered with id {}", dbName, tableName, tableRef.getId().id); + } + return tableRef; + } + + private void registerPartitions(String db, String tableName, Referenceable tableReferenceable, + Referenceable sdReferenceable) throws Exception { + Set<Partition> tableParts = hiveClient.getAllPartitionsOf(new Table(Table.getEmptyTable(db, tableName))); + + if (tableParts.size() > 0) { + for (Partition hivePart : tableParts) { + 
registerPartition(hivePart, tableReferenceable, sdReferenceable); + } + } + } + + public Referenceable registerPartition(Partition partition) throws Exception { + String dbName = partition.getTable().getDbName(); + String tableName = partition.getTable().getTableName(); + Referenceable tableRef = registerTable(dbName, tableName); + Referenceable sdRef = getSDForTable(dbName, tableName); + return registerPartition(partition, tableRef, sdRef); + } + + private Referenceable registerPartition(Partition hivePart, Referenceable tableReferenceable, + Referenceable sdReferenceable) throws Exception { + LOG.info("Registering partition for {} with values {}", tableReferenceable, + StringUtils.join(hivePart.getValues(), ",")); + String dbName = hivePart.getTable().getDbName(); + String tableName = hivePart.getTable().getTableName(); + + Referenceable partRef = getPartitionReference(dbName, tableName, hivePart.getValues()); + if (partRef == null) { + partRef = new Referenceable(HiveDataTypes.HIVE_PARTITION.getName()); + partRef.set("values", hivePart.getValues()); + + partRef.set(HiveDataModelGenerator.TABLE, tableReferenceable); + + //todo fix + partRef.set("createTime", hivePart.getLastAccessTime()); + partRef.set("lastAccessTime", hivePart.getLastAccessTime()); + + // sdStruct = fillStorageDescStruct(hivePart.getSd()); + // Instead of creating copies of the sdstruct for partitions we are reusing existing + // ones will fix to identify partitions with differing schema. + partRef.set("sd", sdReferenceable); + + partRef.set("parameters", hivePart.getParameters()); + partRef = createInstance(partRef); + } else { + LOG.info("Partition {}.{} with values {} is already registered with id {}", dbName, tableName, + StringUtils.join(hivePart.getValues(), ","), partRef.getId().id); + } + return partRef; + } + + private void importIndexes(String db, String table, + Referenceable dbReferenceable, + Referenceable tableReferenceable) throws Exception { + List<Index> indexes = hiveClient.getIndexes(db, table, Short.MAX_VALUE); + if (indexes.size() > 0) { + for (Index index : indexes) { + importIndex(index, dbReferenceable, tableReferenceable); + } + } + } + + //todo should be idempotent + private void importIndex(Index index, + Referenceable dbReferenceable, + Referenceable tableReferenceable) throws Exception { + LOG.info("Importing index {} for {}.{}", index.getIndexName(), dbReferenceable, tableReferenceable); + Referenceable indexRef = new Referenceable(HiveDataTypes.HIVE_INDEX.getName()); + + indexRef.set(HiveDataModelGenerator.NAME, index.getIndexName()); + indexRef.set("indexHandlerClass", index.getIndexHandlerClass()); + + indexRef.set(HiveDataModelGenerator.DB, dbReferenceable); + + indexRef.set("createTime", index.getCreateTime()); + indexRef.set("lastAccessTime", index.getLastAccessTime()); + indexRef.set("origTable", index.getOrigTableName()); + indexRef.set("indexTable", index.getIndexTableName()); + + Referenceable sdReferenceable = fillStorageDescStruct(index.getSd(), null); + indexRef.set("sd", sdReferenceable); + + indexRef.set("parameters", index.getParameters()); + + tableReferenceable.set("deferredRebuild", index.isDeferredRebuild()); + + createInstance(indexRef); + } + + private Referenceable fillStorageDescStruct(StorageDescriptor storageDesc, List<Referenceable> colList) throws Exception { + LOG.debug("Filling storage descriptor information for " + storageDesc); + + Referenceable sdReferenceable = new Referenceable(HiveDataTypes.HIVE_STORAGEDESC.getName()); + + SerDeInfo serdeInfo = 
storageDesc.getSerdeInfo(); + LOG.debug("serdeInfo = " + serdeInfo); + // SkewedInfo skewedInfo = storageDesc.getSkewedInfo(); + + String serdeInfoName = HiveDataTypes.HIVE_SERDE.getName(); + Struct serdeInfoStruct = new Struct(serdeInfoName); + + serdeInfoStruct.set(HiveDataModelGenerator.NAME, serdeInfo.getName()); + serdeInfoStruct.set("serializationLib", serdeInfo.getSerializationLib()); + serdeInfoStruct.set("parameters", serdeInfo.getParameters()); + + sdReferenceable.set("serdeInfo", serdeInfoStruct); + sdReferenceable.set(HiveDataModelGenerator.STORAGE_NUM_BUCKETS, storageDesc.getNumBuckets()); + sdReferenceable.set(HiveDataModelGenerator.STORAGE_IS_STORED_AS_SUB_DIRS, storageDesc.isStoredAsSubDirectories()); + + //Use the passed column list if not null, ex: use same references for table and SD + List<FieldSchema> columns = storageDesc.getCols(); + if (columns != null && !columns.isEmpty()) { + if (colList != null) { + sdReferenceable.set("cols", colList); + } else { + sdReferenceable.set("cols", getColumns(columns)); + } + } + + List<Struct> sortColsStruct = new ArrayList<>(); + for (Order sortcol : storageDesc.getSortCols()) { + String hiveOrderName = HiveDataTypes.HIVE_ORDER.getName(); + Struct colStruct = new Struct(hiveOrderName); + colStruct.set("col", sortcol.getCol()); + colStruct.set("order", sortcol.getOrder()); + + sortColsStruct.add(colStruct); + } + if (sortColsStruct.size() > 0) { + sdReferenceable.set("sortCols", sortColsStruct); + } + + sdReferenceable.set("location", storageDesc.getLocation()); + sdReferenceable.set("inputFormat", storageDesc.getInputFormat()); + sdReferenceable.set("outputFormat", storageDesc.getOutputFormat()); + sdReferenceable.set("compressed", storageDesc.isCompressed()); + + if (storageDesc.getBucketCols().size() > 0) { + sdReferenceable.set("bucketCols", storageDesc.getBucketCols()); + } + + sdReferenceable.set("parameters", storageDesc.getParameters()); + sdReferenceable.set("storedAsSubDirectories", storageDesc.isStoredAsSubDirectories()); + + return createInstance(sdReferenceable); + } + + private List<Referenceable> getColumns(List<FieldSchema> schemaList) throws Exception + { + List<Referenceable> colList = new ArrayList<>(); + for (FieldSchema fs : schemaList) { + LOG.debug("Processing field " + fs); + Referenceable colReferenceable = new Referenceable(HiveDataTypes.HIVE_COLUMN.getName()); + colReferenceable.set(HiveDataModelGenerator.NAME, fs.getName()); + colReferenceable.set("type", fs.getType()); + colReferenceable.set(HiveDataModelGenerator.COMMENT, fs.getComment()); + + colList.add(createInstance(colReferenceable)); + } + return colList; + } + + public synchronized void registerHiveDataModel() throws Exception { + HiveDataModelGenerator dataModelGenerator = new HiveDataModelGenerator(); + MetadataServiceClient dgiClient = getMetadataServiceClient(); + + //Register hive data model if its not already registered + if (dgiClient.getType(HiveDataTypes.HIVE_PROCESS.getName()) == null ) { + LOG.info("Registering Hive data model"); + dgiClient.createType(dataModelGenerator.getModelAsJson()); + } else { + LOG.info("Hive data model is already registered!"); + } + } + + public static void main(String[] argv) throws Exception { + HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf()); + hiveMetaStoreBridge.registerHiveDataModel(); + hiveMetaStoreBridge.importHiveMetadata(); + } + + public void updateTable(Referenceable tableReferenceable, Table newTable) throws MetadataServiceException { + MetadataServiceClient 
client = getMetadataServiceClient(); + client.updateEntity(tableReferenceable.getId()._getId(), HiveDataModelGenerator.TABLE_NAME, + newTable.getTableName().toLowerCase()); + client.updateEntity(tableReferenceable.getId()._getId(), HiveDataModelGenerator.NAME, + getTableName(clusterName, newTable.getDbName(), newTable.getTableName())); + } +}
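
For context on how this bridge is exercised: the main() method above registers the Hive data model and then walks the metastore, and the constructor reads its cluster name and DGI endpoint from the HiveConf it is given. The short sketch below simply restates that flow as a caller would see it; the cluster name and URL values are illustrative placeholders, not something this patch prescribes.

    import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
    import org.apache.hadoop.hive.conf.HiveConf;

    public class ImportHiveMetadataSketch {
        public static void main(String[] args) throws Exception {
            HiveConf hiveConf = new HiveConf();
            // HIVE_CLUSTER_NAME -- defaults to "primary" when unset.
            hiveConf.set("hive.cluster.name", "primary");
            // DGI_URL_PROPERTY -- defaults to http://localhost:21000/ when unset.
            hiveConf.set("hive.hook.dgi.url", "http://localhost:21000/");

            HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(hiveConf);
            bridge.registerHiveDataModel();  // no-op if the hive_process type is already registered
            bridge.importHiveMetadata();     // imports databases, tables, partitions and indexes
        }
    }

Re-running the import is largely idempotent for databases and tables: registerDatabase() and registerTable() first look up an existing reference via a DSL query before creating a new entity, keyed on the qualified name produced by getTableName() (dbname.tablename@clustername).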
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java new file mode 100755 index 0000000..a70bfea --- /dev/null +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java @@ -0,0 +1,339 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.hive.hook; + + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; +import org.apache.atlas.hive.model.HiveDataTypes; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.QueryPlan; +import org.apache.hadoop.hive.ql.exec.ExplainTask; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.hooks.Entity; +import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext; +import org.apache.hadoop.hive.ql.hooks.HookContext; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.plan.HiveOperation; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * DgiHook sends lineage information to the DgiSever. + */ +public class HiveHook implements ExecuteWithHookContext { + + private static final Logger LOG = LoggerFactory.getLogger(HiveHook.class); + + // wait time determines how long we wait before we exit the jvm on + // shutdown. Pending requests after that will not be sent. 
+ private static final int WAIT_TIME = 3; + private static ExecutorService executor; + + private static final String MIN_THREADS = "hive.hook.dgi.minThreads"; + private static final String MAX_THREADS = "hive.hook.dgi.maxThreads"; + private static final String KEEP_ALIVE_TIME = "hive.hook.dgi.keepAliveTime"; + + private static final int minThreadsDefault = 5; + private static final int maxThreadsDefault = 5; + private static final long keepAliveTimeDefault = 10; + private static boolean typesRegistered = false; + + static { + // anything shared should be initialized here and destroyed in the + // shutdown hook The hook contract is weird in that it creates a + // boatload of hooks. + + // initialize the async facility to process hook calls. We don't + // want to do this inline since it adds plenty of overhead for the + // query. + HiveConf hiveConf = new HiveConf(); + int minThreads = hiveConf.getInt(MIN_THREADS, minThreadsDefault); + int maxThreads = hiveConf.getInt(MAX_THREADS, maxThreadsDefault); + long keepAliveTime = hiveConf.getLong(KEEP_ALIVE_TIME, keepAliveTimeDefault); + + executor = new ThreadPoolExecutor(minThreads, maxThreads, keepAliveTime, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DGI Logger %d").build()); + + try { + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + try { + executor.shutdown(); + executor.awaitTermination(WAIT_TIME, TimeUnit.SECONDS); + executor = null; + } catch (InterruptedException ie) { + LOG.info("Interrupt received in shutdown."); + } + // shutdown client + } + }); + } catch (IllegalStateException is) { + LOG.info("Attempting to send msg while shutdown in progress."); + } + + LOG.info("Created DGI Hook"); + } + + class HiveEvent { + public HiveConf conf; + + public Set<ReadEntity> inputs; + public Set<WriteEntity> outputs; + + public String user; + public HiveOperation operation; + public QueryPlan queryPlan; + public HookContext.HookType hookType; + public JSONObject jsonPlan; + } + + @Override + public void run(final HookContext hookContext) throws Exception { + if (executor == null) { + LOG.info("No executor running. Bail."); + return; + } + + // clone to avoid concurrent access + final HiveEvent event = new HiveEvent(); + final HiveConf conf = new HiveConf(hookContext.getConf()); + boolean debug = conf.get("hive.hook.dgi.synchronous", "false").equals("true"); + + event.conf = conf; + event.inputs = hookContext.getInputs(); + event.outputs = hookContext.getOutputs(); + + event.user = hookContext.getUserName() == null ? 
hookContext.getUgi().getUserName() : hookContext.getUserName(); + event.operation = HiveOperation.valueOf(hookContext.getOperationName()); + event.queryPlan = hookContext.getQueryPlan(); + event.hookType = hookContext.getHookType(); + + event.jsonPlan = getQueryPlan(event); + + if (debug) { + fireAndForget(event); + } else { + executor.submit(new Runnable() { + @Override + public void run() { + try { + fireAndForget(event); + } catch (Throwable e) { + LOG.info("DGI hook failed", e); + } + } + }); + } + } + + private void fireAndForget(HiveEvent event) throws Exception { + assert event.hookType == HookContext.HookType.POST_EXEC_HOOK : "Non-POST_EXEC_HOOK not supported!"; + + LOG.info("Entered DGI hook for hook type {} operation {}", event.hookType, event.operation); + HiveMetaStoreBridge dgiBridge = new HiveMetaStoreBridge(event.conf); + + if (!typesRegistered) { + dgiBridge.registerHiveDataModel(); + typesRegistered = true; + } + + switch (event.operation) { + case CREATEDATABASE: + handleCreateDB(dgiBridge, event); + break; + + case CREATETABLE: + handleCreateTable(dgiBridge, event); + break; + + case CREATETABLE_AS_SELECT: + case CREATEVIEW: + case LOAD: + case EXPORT: + case IMPORT: + case QUERY: + registerProcess(dgiBridge, event); + break; + + case ALTERTABLE_RENAME: + case ALTERVIEW_RENAME: + renameTable(dgiBridge, event); + break; + + case ALTERVIEW_AS: + //update inputs/outputs? + break; + + case ALTERTABLE_ADDCOLS: + case ALTERTABLE_REPLACECOLS: + case ALTERTABLE_RENAMECOL: + break; + + default: + } + } + + private void renameTable(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception { + //crappy, no easy of getting new name + assert event.inputs != null && event.inputs.size() == 1; + assert event.outputs != null && event.outputs.size() > 0; + + Table oldTable = event.inputs.iterator().next().getTable(); + Table newTable = null; + for (WriteEntity writeEntity : event.outputs) { + if (writeEntity.getType() == Entity.Type.TABLE) { + Table table = writeEntity.getTable(); + if (table.getDbName().equals(oldTable.getDbName()) && !table.getTableName() + .equals(oldTable.getTableName())) { + newTable = table; + break; + } + } + } + if (newTable == null) { + LOG.warn("Failed to deduct new name for " + event.queryPlan.getQueryStr()); + return; + } + + Referenceable dbReferenceable = dgiBridge.registerDatabase(oldTable.getDbName()); + Referenceable tableReferenceable = + dgiBridge.registerTable(dbReferenceable, oldTable.getDbName(), oldTable.getTableName()); + LOG.info("Updating entity name {}.{} to {}", oldTable.getDbName(), oldTable.getTableName(), + newTable.getTableName()); + dgiBridge.updateTable(tableReferenceable, newTable); + } + + private void handleCreateTable(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception { + for (WriteEntity entity : event.outputs) { + if (entity.getType() == Entity.Type.TABLE) { + + Table table = entity.getTable(); + Referenceable dbReferenceable = dgiBridge.registerDatabase(table.getDbName()); + dgiBridge.registerTable(dbReferenceable, table.getDbName(), table.getTableName()); + } + } + } + + private void handleCreateDB(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception { + for (WriteEntity entity : event.outputs) { + if (entity.getType() == Entity.Type.DATABASE) { + dgiBridge.registerDatabase(entity.getDatabase().getName()); + } + } + } + + private String normalize(String str) { + if (StringUtils.isEmpty(str)) { + return null; + } + return str.toLowerCase().trim(); + } + + private void 
registerProcess(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception { + Set<ReadEntity> inputs = event.inputs; + Set<WriteEntity> outputs = event.outputs; + + //Even explain CTAS has operation name as CREATETABLE_AS_SELECT + if (inputs.isEmpty() && outputs.isEmpty()) { + LOG.info("Explain statement. Skipping..."); + } + + if (event.queryPlan == null) { + LOG.info("Query plan is missing. Skipping..."); + } + + String queryId = event.queryPlan.getQueryId(); + String queryStr = normalize(event.queryPlan.getQueryStr()); + long queryStartTime = event.queryPlan.getQueryStartTime(); + + LOG.debug("Registering CTAS query: {}", queryStr); + Referenceable processReferenceable = dgiBridge.getProcessReference(queryStr); + if (processReferenceable == null) { + processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName()); + processReferenceable.set("name", event.operation.getOperationName()); + processReferenceable.set("startTime", queryStartTime); + processReferenceable.set("userName", event.user); + + List<Referenceable> source = new ArrayList<>(); + for (ReadEntity readEntity : inputs) { + if (readEntity.getType() == Entity.Type.TABLE) { + Table table = readEntity.getTable(); + String dbName = table.getDbName(); + source.add(dgiBridge.registerTable(dbName, table.getTableName())); + } + if (readEntity.getType() == Entity.Type.PARTITION) { + dgiBridge.registerPartition(readEntity.getPartition()); + } + } + processReferenceable.set("inputs", source); + + List<Referenceable> target = new ArrayList<>(); + for (WriteEntity writeEntity : outputs) { + if (writeEntity.getType() == Entity.Type.TABLE || writeEntity.getType() == Entity.Type.PARTITION) { + Table table = writeEntity.getTable(); + String dbName = table.getDbName(); + target.add(dgiBridge.registerTable(dbName, table.getTableName())); + } + if (writeEntity.getType() == Entity.Type.PARTITION) { + dgiBridge.registerPartition(writeEntity.getPartition()); + } + } + processReferenceable.set("outputs", target); + processReferenceable.set("queryText", queryStr); + processReferenceable.set("queryId", queryId); + processReferenceable.set("queryPlan", event.jsonPlan.toString()); + processReferenceable.set("endTime", System.currentTimeMillis()); + + //TODO set + processReferenceable.set("queryGraph", "queryGraph"); + dgiBridge.createInstance(processReferenceable); + } else { + LOG.debug("Query {} is already registered", queryStr); + } + } + + + private JSONObject getQueryPlan(HiveEvent event) throws Exception { + try { + ExplainTask explain = new ExplainTask(); + explain.initialize(event.conf, event.queryPlan, null); + List<Task<?>> rootTasks = event.queryPlan.getRootTasks(); + return explain.getJSONPlan(null, null, rootTasks, event.queryPlan.getFetchTask(), true, false, false); + } catch(Exception e) { + LOG.warn("Failed to get queryplan", e); + return new JSONObject(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java new file mode 100755 index 0000000..52e07fb --- /dev/null +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java @@ -0,0 +1,482 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.hive.model; + +import com.google.common.collect.ImmutableList; +import org.apache.atlas.MetadataException; +import org.apache.atlas.MetadataServiceClient; +import org.apache.atlas.typesystem.TypesDef; +import org.apache.atlas.typesystem.json.TypesSerialization; +import org.apache.atlas.typesystem.types.AttributeDefinition; +import org.apache.atlas.typesystem.types.ClassType; +import org.apache.atlas.typesystem.types.DataTypes; +import org.apache.atlas.typesystem.types.EnumType; +import org.apache.atlas.typesystem.types.EnumTypeDefinition; +import org.apache.atlas.typesystem.types.EnumValue; +import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition; +import org.apache.atlas.typesystem.types.Multiplicity; +import org.apache.atlas.typesystem.types.StructType; +import org.apache.atlas.typesystem.types.StructTypeDefinition; +import org.apache.atlas.typesystem.types.TraitType; +import org.apache.atlas.typesystem.types.TypeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +/** + * Utility that generates hive data model for both metastore entities and DDL/DML queries. 
+ */ +public class HiveDataModelGenerator { + + private static final Logger LOG = LoggerFactory.getLogger(HiveDataModelGenerator.class); + + private static final DataTypes.MapType STRING_MAP_TYPE = + new DataTypes.MapType(DataTypes.STRING_TYPE, DataTypes.STRING_TYPE); + + private final Map<String, HierarchicalTypeDefinition<ClassType>> classTypeDefinitions; + private final Map<String, EnumTypeDefinition> enumTypeDefinitionMap; + private final Map<String, StructTypeDefinition> structTypeDefinitionMap; + + public static final String COMMENT = "comment"; + + public static final String STORAGE_NUM_BUCKETS = "numBuckets"; + public static final String STORAGE_IS_STORED_AS_SUB_DIRS = "storedAsSubDirectories"; + + public static final String NAME = "name"; + public static final String TABLE_NAME = "tableName"; + public static final String CLUSTER_NAME = "clusterName"; + public static final String TABLE = "table"; + public static final String DB = "db"; + + public HiveDataModelGenerator() { + classTypeDefinitions = new HashMap<>(); + enumTypeDefinitionMap = new HashMap<>(); + structTypeDefinitionMap = new HashMap<>(); + } + + public void createDataModel() throws MetadataException { + LOG.info("Generating the Hive Data Model...."); + + // enums + createHiveObjectTypeEnum(); + createHivePrincipalTypeEnum(); + createResourceTypeEnum(); + + // structs + createSerDeStruct(); + //createSkewedInfoStruct(); + createOrderStruct(); + createResourceUriStruct(); + createStorageDescClass(); + + // classes + createDBClass(); + createTypeClass(); + createColumnClass(); + createPartitionClass(); + createTableClass(); + createIndexClass(); + createRoleClass(); + + // DDL/DML Process + createProcessClass(); + } + + public TypesDef getTypesDef() { + return TypeUtils.getTypesDef( + getEnumTypeDefinitions(), + getStructTypeDefinitions(), + getTraitTypeDefinitions(), + getClassTypeDefinitions() + ); + } + + public String getDataModelAsJSON() { + return TypesSerialization.toJson(getTypesDef()); + } + + public ImmutableList<EnumTypeDefinition> getEnumTypeDefinitions() { + return ImmutableList.copyOf(enumTypeDefinitionMap.values()); + } + + public ImmutableList<StructTypeDefinition> getStructTypeDefinitions() { + return ImmutableList.copyOf(structTypeDefinitionMap.values()); + } + + public ImmutableList<HierarchicalTypeDefinition<ClassType>> getClassTypeDefinitions() { + return ImmutableList.copyOf(classTypeDefinitions.values()); + } + + public ImmutableList<HierarchicalTypeDefinition<TraitType>> getTraitTypeDefinitions() { + return ImmutableList.of(); + } + + private void createHiveObjectTypeEnum() throws MetadataException { + EnumValue values[] = { + new EnumValue("GLOBAL", 1), + new EnumValue("DATABASE", 2), + new EnumValue("TABLE", 3), + new EnumValue("PARTITION", 4), + new EnumValue("COLUMN", 5), + }; + + EnumTypeDefinition definition = new EnumTypeDefinition( + HiveDataTypes.HIVE_OBJECT_TYPE.getName(), values); + enumTypeDefinitionMap.put(HiveDataTypes.HIVE_OBJECT_TYPE.getName(), definition); + LOG.debug("Created definition for " + HiveDataTypes.HIVE_OBJECT_TYPE.getName()); + } + + private void createHivePrincipalTypeEnum() throws MetadataException { + EnumValue values[] = { + new EnumValue("USER", 1), + new EnumValue("ROLE", 2), + new EnumValue("GROUP", 3), + }; + + EnumTypeDefinition definition = new EnumTypeDefinition( + HiveDataTypes.HIVE_PRINCIPAL_TYPE.getName(), values); + + enumTypeDefinitionMap.put(HiveDataTypes.HIVE_PRINCIPAL_TYPE.getName(), definition); + LOG.debug("Created definition for " + 
HiveDataTypes.HIVE_PRINCIPAL_TYPE.getName()); + } + + private void createResourceTypeEnum() throws MetadataException { + EnumValue values[] = { + new EnumValue("JAR", 1), + new EnumValue("FILE", 2), + new EnumValue("ARCHIVE", 3), + }; + EnumTypeDefinition definition = new EnumTypeDefinition( + HiveDataTypes.HIVE_RESOURCE_TYPE.getName(), values); + enumTypeDefinitionMap.put(HiveDataTypes.HIVE_RESOURCE_TYPE.getName(), definition); + LOG.debug("Created definition for " + HiveDataTypes.HIVE_RESOURCE_TYPE.getName()); + } + + private void createSerDeStruct() throws MetadataException { + AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ + new AttributeDefinition(NAME, DataTypes.STRING_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("serializationLib", DataTypes.STRING_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("parameters", STRING_MAP_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + }; + StructTypeDefinition definition = new StructTypeDefinition( + HiveDataTypes.HIVE_SERDE.getName(), attributeDefinitions); + structTypeDefinitionMap.put(HiveDataTypes.HIVE_SERDE.getName(), definition); + LOG.debug("Created definition for " + HiveDataTypes.HIVE_SERDE.getName()); + } + + private void createOrderStruct() throws MetadataException { + AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ + new AttributeDefinition("col", DataTypes.STRING_TYPE.getName(), + Multiplicity.REQUIRED, false, null), + new AttributeDefinition("order", DataTypes.INT_TYPE.getName(), + Multiplicity.REQUIRED, false, null), + }; + + StructTypeDefinition definition = new StructTypeDefinition( + HiveDataTypes.HIVE_ORDER.getName(), attributeDefinitions); + structTypeDefinitionMap.put(HiveDataTypes.HIVE_ORDER.getName(), definition); + LOG.debug("Created definition for " + HiveDataTypes.HIVE_ORDER.getName()); + } + + private void createStorageDescClass() throws MetadataException { + AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ + new AttributeDefinition("cols", + String.format("array<%s>", HiveDataTypes.HIVE_COLUMN.getName()), + Multiplicity.COLLECTION, false, null), + new AttributeDefinition("location", DataTypes.STRING_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("inputFormat", DataTypes.STRING_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("outputFormat", DataTypes.STRING_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("compressed", DataTypes.BOOLEAN_TYPE.getName(), + Multiplicity.REQUIRED, false, null), + new AttributeDefinition(STORAGE_NUM_BUCKETS, DataTypes.INT_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("serdeInfo", HiveDataTypes.HIVE_SERDE.getName(), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("bucketCols", + String.format("array<%s>", DataTypes.STRING_TYPE.getName()), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("sortCols", + String.format("array<%s>", HiveDataTypes.HIVE_ORDER.getName()), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("parameters", STRING_MAP_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + //new AttributeDefinition("skewedInfo", DefinedTypes.HIVE_SKEWEDINFO.getName(), + // Multiplicity.OPTIONAL, false, null), + new AttributeDefinition(STORAGE_IS_STORED_AS_SUB_DIRS, DataTypes.BOOLEAN_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + }; + + 
HierarchicalTypeDefinition<ClassType> definition = new HierarchicalTypeDefinition<>( + ClassType.class, HiveDataTypes.HIVE_STORAGEDESC.getName(), null, attributeDefinitions); + classTypeDefinitions.put(HiveDataTypes.HIVE_STORAGEDESC.getName(), definition); + LOG.debug("Created definition for " + HiveDataTypes.HIVE_STORAGEDESC.getName()); + } + + /** Revisit later after nested array types are handled by the typesystem **/ + + private void createResourceUriStruct() throws MetadataException { + AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ + new AttributeDefinition("resourceType", HiveDataTypes.HIVE_RESOURCE_TYPE.getName(), + Multiplicity.REQUIRED, false, null), + new AttributeDefinition("uri", DataTypes.STRING_TYPE.getName(), + Multiplicity.REQUIRED, false, null), + }; + StructTypeDefinition definition = new StructTypeDefinition( + HiveDataTypes.HIVE_RESOURCEURI.getName(), attributeDefinitions); + structTypeDefinitionMap.put(HiveDataTypes.HIVE_RESOURCEURI.getName(), definition); + LOG.debug("Created definition for " + HiveDataTypes.HIVE_RESOURCEURI.getName()); + } + + private void createDBClass() throws MetadataException { + AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ + new AttributeDefinition(NAME, DataTypes.STRING_TYPE.getName(), + Multiplicity.REQUIRED, false, null), + new AttributeDefinition(CLUSTER_NAME, DataTypes.STRING_TYPE.getName(), + Multiplicity.REQUIRED, false, null), + new AttributeDefinition("description", DataTypes.STRING_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("locationUri", DataTypes.STRING_TYPE.getName(), + Multiplicity.REQUIRED, false, null), + new AttributeDefinition("parameters", STRING_MAP_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("ownerName", DataTypes.STRING_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("ownerType", HiveDataTypes.HIVE_PRINCIPAL_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + }; + + HierarchicalTypeDefinition<ClassType> definition = + new HierarchicalTypeDefinition<>(ClassType.class, HiveDataTypes.HIVE_DB.getName(), + null, attributeDefinitions); + classTypeDefinitions.put(HiveDataTypes.HIVE_DB.getName(), definition); + LOG.debug("Created definition for " + HiveDataTypes.HIVE_DB.getName()); + } + + private void createTypeClass() throws MetadataException { + AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ + new AttributeDefinition(NAME, DataTypes.STRING_TYPE.getName(), + Multiplicity.REQUIRED, false, null), + new AttributeDefinition("type1", DataTypes.STRING_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("type2", DataTypes.STRING_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("fields", String.format("array<%s>", + HiveDataTypes.HIVE_COLUMN.getName()), Multiplicity.OPTIONAL, false, null), + }; + HierarchicalTypeDefinition<ClassType> definition = + new HierarchicalTypeDefinition<>(ClassType.class, HiveDataTypes.HIVE_TYPE.getName(), + null, attributeDefinitions); + + classTypeDefinitions.put(HiveDataTypes.HIVE_TYPE.getName(), definition); + LOG.debug("Created definition for " + HiveDataTypes.HIVE_TYPE.getName()); + } + + private void createColumnClass() throws MetadataException { + AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ + new AttributeDefinition(NAME, DataTypes.STRING_TYPE.getName(), + Multiplicity.REQUIRED, false, null), + new AttributeDefinition("type", 
DataTypes.STRING_TYPE.getName(), + Multiplicity.REQUIRED, false, null), + new AttributeDefinition(COMMENT, DataTypes.STRING_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + }; + HierarchicalTypeDefinition<ClassType> definition = + new HierarchicalTypeDefinition<>( + ClassType.class, HiveDataTypes.HIVE_COLUMN.getName(), + null, attributeDefinitions); + classTypeDefinitions.put(HiveDataTypes.HIVE_COLUMN.getName(), definition); + LOG.debug("Created definition for " + HiveDataTypes.HIVE_COLUMN.getName()); + } + + private void createPartitionClass() throws MetadataException { + AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ + new AttributeDefinition("values", DataTypes.arrayTypeName(DataTypes.STRING_TYPE.getName()), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition(TABLE, HiveDataTypes.HIVE_TABLE.getName(), + Multiplicity.REQUIRED, false, null), + new AttributeDefinition("createTime", DataTypes.LONG_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("lastAccessTime", DataTypes.LONG_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("sd", HiveDataTypes.HIVE_STORAGEDESC.getName(), + Multiplicity.REQUIRED, false, null), + new AttributeDefinition("columns", + DataTypes.arrayTypeName(HiveDataTypes.HIVE_COLUMN.getName()), + Multiplicity.OPTIONAL, true, null), + new AttributeDefinition("parameters", STRING_MAP_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + }; + HierarchicalTypeDefinition<ClassType> definition = + new HierarchicalTypeDefinition<>(ClassType.class, + HiveDataTypes.HIVE_PARTITION.getName(), null, attributeDefinitions); + classTypeDefinitions.put(HiveDataTypes.HIVE_PARTITION.getName(), definition); + LOG.debug("Created definition for " + HiveDataTypes.HIVE_PARTITION.getName()); + } + + private void createTableClass() throws MetadataException { + AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ + new AttributeDefinition(TABLE_NAME, DataTypes.STRING_TYPE.getName(), + Multiplicity.REQUIRED, false, null), + new AttributeDefinition(DB, HiveDataTypes.HIVE_DB.getName(), + Multiplicity.REQUIRED, false, null), + new AttributeDefinition("owner", DataTypes.STRING_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("createTime", DataTypes.LONG_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("lastAccessTime", DataTypes.LONG_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition(COMMENT, DataTypes.STRING_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("retention", DataTypes.INT_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("sd", HiveDataTypes.HIVE_STORAGEDESC.getName(), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("partitionKeys", + DataTypes.arrayTypeName(HiveDataTypes.HIVE_COLUMN.getName()), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("columns", + DataTypes.arrayTypeName(HiveDataTypes.HIVE_COLUMN.getName()), + Multiplicity.OPTIONAL, true, null), + new AttributeDefinition("parameters", STRING_MAP_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("viewOriginalText", DataTypes.STRING_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("viewExpandedText", DataTypes.STRING_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("tableType", DataTypes.STRING_TYPE.getName(), + 
Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("temporary", DataTypes.BOOLEAN_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + }; + HierarchicalTypeDefinition<ClassType> definition = + new HierarchicalTypeDefinition<>(ClassType.class, HiveDataTypes.HIVE_TABLE.getName(), + ImmutableList.of("DataSet"), attributeDefinitions); + classTypeDefinitions.put(HiveDataTypes.HIVE_TABLE.getName(), definition); + LOG.debug("Created definition for " + HiveDataTypes.HIVE_TABLE.getName()); + } + + private void createIndexClass() throws MetadataException { + AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ + new AttributeDefinition(NAME, DataTypes.STRING_TYPE.getName(), + Multiplicity.REQUIRED, false, null), + new AttributeDefinition("indexHandlerClass", DataTypes.STRING_TYPE.getName(), + Multiplicity.REQUIRED, false, null), + new AttributeDefinition(DB, HiveDataTypes.HIVE_DB.getName(), + Multiplicity.REQUIRED, false, null), + new AttributeDefinition("createTime", DataTypes.LONG_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("lastAccessTime", DataTypes.LONG_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("origTable", HiveDataTypes.HIVE_TABLE.getName(), + Multiplicity.REQUIRED, false, null), + new AttributeDefinition("indexTable", HiveDataTypes.HIVE_TABLE.getName(), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("sd", HiveDataTypes.HIVE_STORAGEDESC.getName(), + Multiplicity.REQUIRED, false, null), + new AttributeDefinition("parameters", STRING_MAP_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition("deferredRebuild", DataTypes.BOOLEAN_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + }; + + HierarchicalTypeDefinition<ClassType> definition = + new HierarchicalTypeDefinition<>(ClassType.class, HiveDataTypes.HIVE_INDEX.getName(), + ImmutableList.of(MetadataServiceClient.DATA_SET_SUPER_TYPE), attributeDefinitions); + classTypeDefinitions.put(HiveDataTypes.HIVE_INDEX.getName(), definition); + LOG.debug("Created definition for " + HiveDataTypes.HIVE_INDEX.getName()); + } + + private void createRoleClass() throws MetadataException { + AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ + new AttributeDefinition("roleName", DataTypes.STRING_TYPE.getName(), + Multiplicity.REQUIRED, false, null), + new AttributeDefinition("createTime", DataTypes.LONG_TYPE.getName(), + Multiplicity.REQUIRED, false, null), + new AttributeDefinition("ownerName", DataTypes.STRING_TYPE.getName(), + Multiplicity.REQUIRED, false, null), + }; + HierarchicalTypeDefinition<ClassType> definition = new HierarchicalTypeDefinition<>( + ClassType.class, HiveDataTypes.HIVE_ROLE.getName(), null, attributeDefinitions); + + classTypeDefinitions.put(HiveDataTypes.HIVE_ROLE.getName(), definition); + LOG.debug("Created definition for " + HiveDataTypes.HIVE_ROLE.getName()); + } + + private void createProcessClass() throws MetadataException { + AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ + new AttributeDefinition("startTime", DataTypes.LONG_TYPE.getName(), + Multiplicity.REQUIRED, false, null), + new AttributeDefinition("endTime", DataTypes.LONG_TYPE.getName(), + Multiplicity.REQUIRED, false, null), + new AttributeDefinition("userName", DataTypes.STRING_TYPE.getName(), + Multiplicity.REQUIRED, false, null), + new AttributeDefinition("queryText", DataTypes.STRING_TYPE.getName(), + Multiplicity.REQUIRED, false, null), + new 
AttributeDefinition("queryPlan", DataTypes.STRING_TYPE.getName(), + Multiplicity.REQUIRED, false, null), + new AttributeDefinition("queryId", DataTypes.STRING_TYPE.getName(), + Multiplicity.REQUIRED, false, null), + new AttributeDefinition("queryGraph", DataTypes.STRING_TYPE.getName(), + Multiplicity.OPTIONAL, false, null), + }; + + HierarchicalTypeDefinition<ClassType> definition = new HierarchicalTypeDefinition<>( + ClassType.class, HiveDataTypes.HIVE_PROCESS.getName(), + ImmutableList.of(MetadataServiceClient.PROCESS_SUPER_TYPE), attributeDefinitions); + classTypeDefinitions.put(HiveDataTypes.HIVE_PROCESS.getName(), definition); + LOG.debug("Created definition for " + HiveDataTypes.HIVE_PROCESS.getName()); + } + + public String getModelAsJson() throws MetadataException { + createDataModel(); + return getDataModelAsJSON(); + } + + public static void main(String[] args) throws Exception { + HiveDataModelGenerator hiveDataModelGenerator = new HiveDataModelGenerator(); + System.out.println("hiveDataModelAsJSON = " + hiveDataModelGenerator.getModelAsJson()); + + TypesDef typesDef = hiveDataModelGenerator.getTypesDef(); + for (EnumTypeDefinition enumType : typesDef.enumTypesAsJavaList()) { + System.out.println(String.format("%s(%s) - %s", enumType.name, EnumType.class.getSimpleName(), + Arrays.toString(enumType.enumValues))); + } + for (StructTypeDefinition structType : typesDef.structTypesAsJavaList()) { + System.out.println(String.format("%s(%s) - %s", structType.typeName, StructType.class.getSimpleName(), + Arrays.toString(structType.attributeDefinitions))); + } + for (HierarchicalTypeDefinition<ClassType> classType : typesDef.classTypesAsJavaList()) { + System.out.println(String.format("%s(%s) - %s", classType.typeName, ClassType.class.getSimpleName(), + Arrays.toString(classType.attributeDefinitions))); + } + for (HierarchicalTypeDefinition<TraitType> traitType : typesDef.traitTypesAsJavaList()) { + System.out.println(String.format("%s(%s) - %s", traitType.typeName, TraitType.class.getSimpleName(), + Arrays.toString(traitType.attributeDefinitions))); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataTypes.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataTypes.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataTypes.java new file mode 100755 index 0000000..f376e6e --- /dev/null +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataTypes.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.atlas.hive.model; + +/** + * Hive Data Types for model and bridge. + */ +public enum HiveDataTypes { + + // Enums + HIVE_OBJECT_TYPE, + HIVE_PRINCIPAL_TYPE, + HIVE_RESOURCE_TYPE, + + // Structs + HIVE_SERDE, + HIVE_ORDER, + HIVE_RESOURCEURI, + + // Classes + HIVE_DB, + HIVE_STORAGEDESC, + HIVE_TABLE, + HIVE_COLUMN, + HIVE_PARTITION, + HIVE_INDEX, + HIVE_ROLE, + HIVE_TYPE, + HIVE_PROCESS, + // HIVE_VIEW, + ; + + public String getName() { + return name().toLowerCase(); + } +}
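
Taken together, HiveHook is intended to run as a post-execution hook (fireAndForget() asserts the POST_EXEC_HOOK type), so deploying it is a matter of Hive configuration. Below is a minimal, illustrative sketch of that wiring expressed against a HiveConf; in practice the same keys would live in hive-site.xml. hive.exec.post.hooks is the standard Hive property for post-exec hooks, the hive.hook.dgi.* keys are the ones this patch defines, and the values shown are placeholders.

    import org.apache.hadoop.hive.conf.HiveConf;

    /** Illustrative only: shows the properties the hook and bridge read. */
    public class AtlasHookConfSketch {
        public static HiveConf hookConf() {
            HiveConf conf = new HiveConf();
            // Standard Hive property for post-execution hooks.
            conf.set("hive.exec.post.hooks", "org.apache.atlas.hive.hook.HiveHook");
            // DGI endpoint the hook and bridge report to; localhost:21000 is the
            // default baked into HiveMetaStoreBridge.
            conf.set("hive.hook.dgi.url", "http://localhost:21000/");
            // Optional: run the hook inline rather than on the daemon thread pool.
            conf.set("hive.hook.dgi.synchronous", "false");
            // Optional thread-pool tuning, read once in HiveHook's static initializer.
            conf.set("hive.hook.dgi.minThreads", "5");
            conf.set("hive.hook.dgi.maxThreads", "5");
            conf.set("hive.hook.dgi.keepAliveTime", "10");
            return conf;
        }
    }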
