http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/addons/hive-bridge/src/main/java/org/apache/hadoop/metadata/hive/bridge/HiveMetaStoreBridge.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/hadoop/metadata/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/hadoop/metadata/hive/bridge/HiveMetaStoreBridge.java deleted file mode 100755 index 2a341f4..0000000 --- a/addons/hive-bridge/src/main/java/org/apache/hadoop/metadata/hive/bridge/HiveMetaStoreBridge.java +++ /dev/null @@ -1,512 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.metadata.hive.bridge; - -import org.apache.commons.lang.StringEscapeUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.Index; -import org.apache.hadoop.hive.metastore.api.Order; -import org.apache.hadoop.hive.metastore.api.SerDeInfo; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; -import org.apache.hadoop.hive.ql.metadata.Hive; -import org.apache.hadoop.hive.ql.metadata.Partition; -import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.metadata.MetadataServiceClient; -import org.apache.hadoop.metadata.MetadataServiceException; -import org.apache.hadoop.metadata.hive.model.HiveDataModelGenerator; -import org.apache.hadoop.metadata.hive.model.HiveDataTypes; -import org.apache.hadoop.metadata.typesystem.Referenceable; -import org.apache.hadoop.metadata.typesystem.Struct; -import org.apache.hadoop.metadata.typesystem.json.InstanceSerialization; -import org.apache.hadoop.metadata.typesystem.persistence.Id; -import org.codehaus.jettison.json.JSONArray; -import org.codehaus.jettison.json.JSONException; -import org.codehaus.jettison.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.Set; - -/** - * A Bridge Utility that imports metadata from the Hive Meta Store - * and registers then in DGI. 
- */ -public class HiveMetaStoreBridge { - private static final String DEFAULT_DGI_URL = "http://localhost:21000/"; - public static final String HIVE_CLUSTER_NAME = "hive.cluster.name"; - public static final String DEFAULT_CLUSTER_NAME = "primary"; - private final String clusterName; - - public static final String DGI_URL_PROPERTY = "hive.hook.dgi.url"; - - private static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreBridge.class); - - private final Hive hiveClient; - private final MetadataServiceClient metadataServiceClient; - - /** - * Construct a HiveMetaStoreBridge. - * @param hiveConf hive conf - */ - public HiveMetaStoreBridge(HiveConf hiveConf) throws Exception { - clusterName = hiveConf.get(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME); - hiveClient = Hive.get(hiveConf); - metadataServiceClient = new MetadataServiceClient(hiveConf.get(DGI_URL_PROPERTY, DEFAULT_DGI_URL)); - } - - public MetadataServiceClient getMetadataServiceClient() { - return metadataServiceClient; - } - - public void importHiveMetadata() throws Exception { - LOG.info("Importing hive metadata"); - importDatabases(); - } - - private void importDatabases() throws Exception { - List<String> databases = hiveClient.getAllDatabases(); - for (String databaseName : databases) { - Referenceable dbReference = registerDatabase(databaseName); - - importTables(databaseName, dbReference); - } - } - - public Referenceable registerDatabase(String databaseName) throws Exception { - Referenceable dbRef = getDatabaseReference(databaseName, clusterName); - if (dbRef == null) { - LOG.info("Importing objects from databaseName : " + databaseName); - Database hiveDB = hiveClient.getDatabase(databaseName); - - dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName()); - dbRef.set(HiveDataModelGenerator.NAME, hiveDB.getName().toLowerCase()); - dbRef.set(HiveDataModelGenerator.CLUSTER_NAME, clusterName); - dbRef.set("description", hiveDB.getDescription()); - dbRef.set("locationUri", hiveDB.getLocationUri()); - dbRef.set("parameters", hiveDB.getParameters()); - dbRef.set("ownerName", hiveDB.getOwnerName()); - if (hiveDB.getOwnerType() != null) { - dbRef.set("ownerType", hiveDB.getOwnerType().getValue()); - } - - dbRef = createInstance(dbRef); - } else { - LOG.info("Database {} is already registered with id {}", databaseName, dbRef.getId().id); - } - return dbRef; - } - - public Referenceable createInstance(Referenceable referenceable) throws Exception { - String typeName = referenceable.getTypeName(); - LOG.debug("creating instance of type " + typeName); - - String entityJSON = InstanceSerialization.toJson(referenceable, true); - LOG.debug("Submitting new entity {} = {}", referenceable.getTypeName(), entityJSON); - JSONObject jsonObject = metadataServiceClient.createEntity(entityJSON); - String guid = jsonObject.getString(MetadataServiceClient.GUID); - LOG.debug("created instance for type " + typeName + ", guid: " + guid); - - return new Referenceable(guid, referenceable.getTypeName(), null); - } - - private void importTables(String databaseName, Referenceable databaseReferenceable) throws Exception { - List<String> hiveTables = hiveClient.getAllTables(databaseName); - - for (String tableName : hiveTables) { - Referenceable tableReferenceable = registerTable(databaseReferenceable, databaseName, tableName); - - // Import Partitions - Referenceable sdReferenceable = getSDForTable(databaseName, tableName); - registerPartitions(databaseName, tableName, tableReferenceable, sdReferenceable); - - // Import Indexes - 
importIndexes(databaseName, tableName, databaseReferenceable, tableReferenceable); - } - } - - /** - * Gets reference for the database - * - * - * @param databaseName database Name - * @param clusterName cluster name - * @return Reference for database if exists, else null - * @throws Exception - */ - private Referenceable getDatabaseReference(String databaseName, String clusterName) throws Exception { - LOG.debug("Getting reference for database {}", databaseName); - String typeName = HiveDataTypes.HIVE_DB.getName(); - - String dslQuery = String.format("%s where %s = '%s' and %s = '%s'", typeName, - HiveDataModelGenerator.NAME, databaseName.toLowerCase(), HiveDataModelGenerator.CLUSTER_NAME, - clusterName); - return getEntityReferenceFromDSL(typeName, dslQuery); - } - - public Referenceable getProcessReference(String queryStr) throws Exception { - LOG.debug("Getting reference for process with query {}", queryStr); - String typeName = HiveDataTypes.HIVE_PROCESS.getName(); - - //todo enable DSL -// String dslQuery = String.format("%s where queryText = \"%s\"", typeName, queryStr); -// return getEntityReferenceFromDSL(typeName, dslQuery); - - String gremlinQuery = String.format("g.V.has('__typeName', '%s').has('%s.queryText', \"%s\").toList()", - typeName, typeName, StringEscapeUtils.escapeJava(queryStr)); - return getEntityReferenceFromGremlin(typeName, gremlinQuery); - } - - private Referenceable getEntityReferenceFromDSL(String typeName, String dslQuery) throws Exception { - MetadataServiceClient dgiClient = getMetadataServiceClient(); - JSONArray results = dgiClient.searchByDSL(dslQuery); - if (results.length() == 0) { - return null; - } else { - String guid; - JSONObject row = results.getJSONObject(0); - if (row.has("$id$")) { - guid = row.getJSONObject("$id$").getString("id"); - } else { - guid = row.getJSONObject("_col_0").getString("id"); - } - return new Referenceable(guid, typeName, null); - } - } - - public static String getTableName(String clusterName, String dbName, String tableName) { - return String.format("%s.%s@%s", dbName.toLowerCase(), tableName.toLowerCase(), clusterName); - } - - /** - * Gets reference for the table - * - * @param dbName database name - * @param tableName table name - * @return table reference if exists, else null - * @throws Exception - */ - private Referenceable getTableReference(String dbName, String tableName) throws Exception { - LOG.debug("Getting reference for table {}.{}", dbName, tableName); - - String typeName = HiveDataTypes.HIVE_TABLE.getName(); - String entityName = getTableName(clusterName, dbName, tableName); - String dslQuery = String.format("%s as t where name = '%s'", typeName, entityName); - return getEntityReferenceFromDSL(typeName, dslQuery); - } - - private Referenceable getEntityReferenceFromGremlin(String typeName, String gremlinQuery) throws MetadataServiceException, - JSONException { - MetadataServiceClient client = getMetadataServiceClient(); - JSONObject response = client.searchByGremlin(gremlinQuery); - JSONArray results = response.getJSONArray(MetadataServiceClient.RESULTS); - if (results.length() == 0) { - return null; - } - String guid = results.getJSONObject(0).getString("__guid"); - return new Referenceable(guid, typeName, null); - } - - private Referenceable getPartitionReference(String dbName, String tableName, List<String> values) throws Exception { - String valuesStr = "['" + StringUtils.join(values, "', '") + "']"; - LOG.debug("Getting reference for partition for {}.{} with values {}", dbName, tableName, valuesStr); - 
String typeName = HiveDataTypes.HIVE_PARTITION.getName(); - - //todo replace gremlin with DSL - // String dslQuery = String.format("%s as p where values = %s, tableName where name = '%s', " - // + "dbName where name = '%s' and clusterName = '%s' select p", typeName, valuesStr, tableName, - // dbName, clusterName); - - String datasetType = MetadataServiceClient.DATA_SET_SUPER_TYPE; - String tableEntityName = getTableName(clusterName, dbName, tableName); - - String gremlinQuery = String.format("g.V.has('__typeName', '%s').has('%s.values', %s).as('p')." - + "out('__%s.table').has('%s.name', '%s').back('p').toList()", typeName, typeName, valuesStr, - typeName, datasetType, tableEntityName); - - return getEntityReferenceFromGremlin(typeName, gremlinQuery); - } - - private Referenceable getSDForTable(String dbName, String tableName) throws Exception { - Referenceable tableRef = getTableReference(dbName, tableName); - if (tableRef == null) { - throw new IllegalArgumentException("Table " + dbName + "." + tableName + " doesn't exist"); - } - - MetadataServiceClient dgiClient = getMetadataServiceClient(); - Referenceable tableInstance = dgiClient.getEntity(tableRef.getId().id); - Id sdId = (Id) tableInstance.get("sd"); - return new Referenceable(sdId.id, sdId.getTypeName(), null); - } - - public Referenceable registerTable(String dbName, String tableName) throws Exception { - Referenceable dbReferenceable = registerDatabase(dbName); - return registerTable(dbReferenceable, dbName, tableName); - } - - public Referenceable registerTable(Referenceable dbReference, String dbName, String tableName) throws Exception { - LOG.info("Attempting to register table [" + tableName + "]"); - Referenceable tableRef = getTableReference(dbName, tableName); - if (tableRef == null) { - LOG.info("Importing objects from " + dbName + "." 
+ tableName); - - Table hiveTable = hiveClient.getTable(dbName, tableName); - - tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName()); - tableRef.set(HiveDataModelGenerator.NAME, - getTableName(clusterName, hiveTable.getDbName(), hiveTable.getTableName())); - tableRef.set(HiveDataModelGenerator.TABLE_NAME, hiveTable.getTableName().toLowerCase()); - tableRef.set("owner", hiveTable.getOwner()); - - tableRef.set("createTime", hiveTable.getMetadata().getProperty(hive_metastoreConstants.DDL_TIME)); - tableRef.set("lastAccessTime", hiveTable.getLastAccessTime()); - tableRef.set("retention", hiveTable.getRetention()); - - tableRef.set(HiveDataModelGenerator.COMMENT, hiveTable.getParameters().get(HiveDataModelGenerator.COMMENT)); - - // add reference to the database - tableRef.set(HiveDataModelGenerator.DB, dbReference); - - List<Referenceable> colList = getColumns(hiveTable.getCols()); - tableRef.set("columns", colList); - - // add reference to the StorageDescriptor - StorageDescriptor storageDesc = hiveTable.getSd(); - Referenceable sdReferenceable = fillStorageDescStruct(storageDesc, colList); - tableRef.set("sd", sdReferenceable); - - // add reference to the Partition Keys - List<Referenceable> partKeys = getColumns(hiveTable.getPartitionKeys()); - tableRef.set("partitionKeys", partKeys); - - tableRef.set("parameters", hiveTable.getParameters()); - - if (hiveTable.getViewOriginalText() != null) { - tableRef.set("viewOriginalText", hiveTable.getViewOriginalText()); - } - - if (hiveTable.getViewExpandedText() != null) { - tableRef.set("viewExpandedText", hiveTable.getViewExpandedText()); - } - - tableRef.set("tableType", hiveTable.getTableType().name()); - tableRef.set("temporary", hiveTable.isTemporary()); - - - tableRef = createInstance(tableRef); - } else { - LOG.info("Table {}.{} is already registered with id {}", dbName, tableName, tableRef.getId().id); - } - return tableRef; - } - - private void registerPartitions(String db, String tableName, Referenceable tableReferenceable, - Referenceable sdReferenceable) throws Exception { - Set<Partition> tableParts = hiveClient.getAllPartitionsOf(new Table(Table.getEmptyTable(db, tableName))); - - if (tableParts.size() > 0) { - for (Partition hivePart : tableParts) { - registerPartition(hivePart, tableReferenceable, sdReferenceable); - } - } - } - - public Referenceable registerPartition(Partition partition) throws Exception { - String dbName = partition.getTable().getDbName(); - String tableName = partition.getTable().getTableName(); - Referenceable tableRef = registerTable(dbName, tableName); - Referenceable sdRef = getSDForTable(dbName, tableName); - return registerPartition(partition, tableRef, sdRef); - } - - private Referenceable registerPartition(Partition hivePart, Referenceable tableReferenceable, - Referenceable sdReferenceable) throws Exception { - LOG.info("Registering partition for {} with values {}", tableReferenceable, - StringUtils.join(hivePart.getValues(), ",")); - String dbName = hivePart.getTable().getDbName(); - String tableName = hivePart.getTable().getTableName(); - - Referenceable partRef = getPartitionReference(dbName, tableName, hivePart.getValues()); - if (partRef == null) { - partRef = new Referenceable(HiveDataTypes.HIVE_PARTITION.getName()); - partRef.set("values", hivePart.getValues()); - - partRef.set(HiveDataModelGenerator.TABLE, tableReferenceable); - - //todo fix - partRef.set("createTime", hivePart.getLastAccessTime()); - partRef.set("lastAccessTime", hivePart.getLastAccessTime()); - - // sdStruct = 
fillStorageDescStruct(hivePart.getSd()); - // Instead of creating copies of the sdstruct for partitions we are reusing existing - // ones will fix to identify partitions with differing schema. - partRef.set("sd", sdReferenceable); - - partRef.set("parameters", hivePart.getParameters()); - partRef = createInstance(partRef); - } else { - LOG.info("Partition {}.{} with values {} is already registered with id {}", dbName, tableName, - StringUtils.join(hivePart.getValues(), ","), partRef.getId().id); - } - return partRef; - } - - private void importIndexes(String db, String table, - Referenceable dbReferenceable, - Referenceable tableReferenceable) throws Exception { - List<Index> indexes = hiveClient.getIndexes(db, table, Short.MAX_VALUE); - if (indexes.size() > 0) { - for (Index index : indexes) { - importIndex(index, dbReferenceable, tableReferenceable); - } - } - } - - //todo should be idempotent - private void importIndex(Index index, - Referenceable dbReferenceable, - Referenceable tableReferenceable) throws Exception { - LOG.info("Importing index {} for {}.{}", index.getIndexName(), dbReferenceable, tableReferenceable); - Referenceable indexRef = new Referenceable(HiveDataTypes.HIVE_INDEX.getName()); - - indexRef.set(HiveDataModelGenerator.NAME, index.getIndexName()); - indexRef.set("indexHandlerClass", index.getIndexHandlerClass()); - - indexRef.set(HiveDataModelGenerator.DB, dbReferenceable); - - indexRef.set("createTime", index.getCreateTime()); - indexRef.set("lastAccessTime", index.getLastAccessTime()); - indexRef.set("origTable", index.getOrigTableName()); - indexRef.set("indexTable", index.getIndexTableName()); - - Referenceable sdReferenceable = fillStorageDescStruct(index.getSd(), null); - indexRef.set("sd", sdReferenceable); - - indexRef.set("parameters", index.getParameters()); - - tableReferenceable.set("deferredRebuild", index.isDeferredRebuild()); - - createInstance(indexRef); - } - - private Referenceable fillStorageDescStruct(StorageDescriptor storageDesc, List<Referenceable> colList) throws Exception { - LOG.debug("Filling storage descriptor information for " + storageDesc); - - Referenceable sdReferenceable = new Referenceable(HiveDataTypes.HIVE_STORAGEDESC.getName()); - - SerDeInfo serdeInfo = storageDesc.getSerdeInfo(); - LOG.debug("serdeInfo = " + serdeInfo); - // SkewedInfo skewedInfo = storageDesc.getSkewedInfo(); - - String serdeInfoName = HiveDataTypes.HIVE_SERDE.getName(); - Struct serdeInfoStruct = new Struct(serdeInfoName); - - serdeInfoStruct.set(HiveDataModelGenerator.NAME, serdeInfo.getName()); - serdeInfoStruct.set("serializationLib", serdeInfo.getSerializationLib()); - serdeInfoStruct.set("parameters", serdeInfo.getParameters()); - - sdReferenceable.set("serdeInfo", serdeInfoStruct); - sdReferenceable.set(HiveDataModelGenerator.STORAGE_NUM_BUCKETS, storageDesc.getNumBuckets()); - sdReferenceable.set(HiveDataModelGenerator.STORAGE_IS_STORED_AS_SUB_DIRS, storageDesc.isStoredAsSubDirectories()); - - //Use the passed column list if not null, ex: use same references for table and SD - List<FieldSchema> columns = storageDesc.getCols(); - if (columns != null && !columns.isEmpty()) { - if (colList != null) { - sdReferenceable.set("cols", colList); - } else { - sdReferenceable.set("cols", getColumns(columns)); - } - } - - List<Struct> sortColsStruct = new ArrayList<>(); - for (Order sortcol : storageDesc.getSortCols()) { - String hiveOrderName = HiveDataTypes.HIVE_ORDER.getName(); - Struct colStruct = new Struct(hiveOrderName); - colStruct.set("col", 
sortcol.getCol()); - colStruct.set("order", sortcol.getOrder()); - - sortColsStruct.add(colStruct); - } - if (sortColsStruct.size() > 0) { - sdReferenceable.set("sortCols", sortColsStruct); - } - - sdReferenceable.set("location", storageDesc.getLocation()); - sdReferenceable.set("inputFormat", storageDesc.getInputFormat()); - sdReferenceable.set("outputFormat", storageDesc.getOutputFormat()); - sdReferenceable.set("compressed", storageDesc.isCompressed()); - - if (storageDesc.getBucketCols().size() > 0) { - sdReferenceable.set("bucketCols", storageDesc.getBucketCols()); - } - - sdReferenceable.set("parameters", storageDesc.getParameters()); - sdReferenceable.set("storedAsSubDirectories", storageDesc.isStoredAsSubDirectories()); - - return createInstance(sdReferenceable); - } - - private List<Referenceable> getColumns(List<FieldSchema> schemaList) throws Exception - { - List<Referenceable> colList = new ArrayList<>(); - for (FieldSchema fs : schemaList) { - LOG.debug("Processing field " + fs); - Referenceable colReferenceable = new Referenceable(HiveDataTypes.HIVE_COLUMN.getName()); - colReferenceable.set(HiveDataModelGenerator.NAME, fs.getName()); - colReferenceable.set("type", fs.getType()); - colReferenceable.set(HiveDataModelGenerator.COMMENT, fs.getComment()); - - colList.add(createInstance(colReferenceable)); - } - return colList; - } - - public synchronized void registerHiveDataModel() throws Exception { - HiveDataModelGenerator dataModelGenerator = new HiveDataModelGenerator(); - MetadataServiceClient dgiClient = getMetadataServiceClient(); - - //Register hive data model if its not already registered - if (dgiClient.getType(HiveDataTypes.HIVE_PROCESS.getName()) == null ) { - LOG.info("Registering Hive data model"); - dgiClient.createType(dataModelGenerator.getModelAsJson()); - } else { - LOG.info("Hive data model is already registered!"); - } - } - - public static void main(String[] argv) throws Exception { - HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf()); - hiveMetaStoreBridge.registerHiveDataModel(); - hiveMetaStoreBridge.importHiveMetadata(); - } - - public void updateTable(Referenceable tableReferenceable, Table newTable) throws MetadataServiceException { - MetadataServiceClient client = getMetadataServiceClient(); - client.updateEntity(tableReferenceable.getId()._getId(), HiveDataModelGenerator.TABLE_NAME, - newTable.getTableName().toLowerCase()); - client.updateEntity(tableReferenceable.getId()._getId(), HiveDataModelGenerator.NAME, - getTableName(clusterName, newTable.getDbName(), newTable.getTableName())); - } -}
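
For reference, a minimal sketch of driving the bridge above as a standalone import, mirroring its main() method and the constants it defines: it assumes a hive-site.xml on the classpath that supplies hive.hook.dgi.url and hive.cluster.name (otherwise the defaults http://localhost:21000/ and "primary" apply); the wrapper class name is illustrative.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.metadata.hive.bridge.HiveMetaStoreBridge;

    public class HiveMetadataImport {                              // illustrative wrapper class
        public static void main(String[] args) throws Exception {
            HiveConf conf = new HiveConf();                        // picks up hive-site.xml from the classpath
            HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(conf);
            bridge.registerHiveDataModel();                        // registers the Hive type definitions in DGI if not already present
            bridge.importHiveMetadata();                           // imports databases, then tables, partitions and indexes
        }
    }
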
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/addons/hive-bridge/src/main/java/org/apache/hadoop/metadata/hive/hook/HiveHook.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/hadoop/metadata/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/hadoop/metadata/hive/hook/HiveHook.java deleted file mode 100755 index abc8ab7..0000000 --- a/addons/hive-bridge/src/main/java/org/apache/hadoop/metadata/hive/hook/HiveHook.java +++ /dev/null @@ -1,339 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.metadata.hive.hook; - - -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.QueryPlan; -import org.apache.hadoop.hive.ql.exec.ExplainTask; -import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.hooks.Entity; -import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext; -import org.apache.hadoop.hive.ql.hooks.HookContext; -import org.apache.hadoop.hive.ql.hooks.ReadEntity; -import org.apache.hadoop.hive.ql.hooks.WriteEntity; -import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.plan.HiveOperation; -import org.apache.hadoop.metadata.hive.bridge.HiveMetaStoreBridge; -import org.apache.hadoop.metadata.hive.model.HiveDataTypes; -import org.apache.hadoop.metadata.typesystem.Referenceable; -import org.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -/** - * DgiHook sends lineage information to the DgiSever. - */ -public class HiveHook implements ExecuteWithHookContext { - - private static final Logger LOG = LoggerFactory.getLogger(HiveHook.class); - - // wait time determines how long we wait before we exit the jvm on - // shutdown. Pending requests after that will not be sent. 
- private static final int WAIT_TIME = 3; - private static ExecutorService executor; - - private static final String MIN_THREADS = "hive.hook.dgi.minThreads"; - private static final String MAX_THREADS = "hive.hook.dgi.maxThreads"; - private static final String KEEP_ALIVE_TIME = "hive.hook.dgi.keepAliveTime"; - - private static final int minThreadsDefault = 5; - private static final int maxThreadsDefault = 5; - private static final long keepAliveTimeDefault = 10; - private static boolean typesRegistered = false; - - static { - // anything shared should be initialized here and destroyed in the - // shutdown hook The hook contract is weird in that it creates a - // boatload of hooks. - - // initialize the async facility to process hook calls. We don't - // want to do this inline since it adds plenty of overhead for the - // query. - HiveConf hiveConf = new HiveConf(); - int minThreads = hiveConf.getInt(MIN_THREADS, minThreadsDefault); - int maxThreads = hiveConf.getInt(MAX_THREADS, maxThreadsDefault); - long keepAliveTime = hiveConf.getLong(KEEP_ALIVE_TIME, keepAliveTimeDefault); - - executor = new ThreadPoolExecutor(minThreads, maxThreads, keepAliveTime, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<Runnable>(), - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DGI Logger %d").build()); - - try { - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - try { - executor.shutdown(); - executor.awaitTermination(WAIT_TIME, TimeUnit.SECONDS); - executor = null; - } catch (InterruptedException ie) { - LOG.info("Interrupt received in shutdown."); - } - // shutdown client - } - }); - } catch (IllegalStateException is) { - LOG.info("Attempting to send msg while shutdown in progress."); - } - - LOG.info("Created DGI Hook"); - } - - class HiveEvent { - public HiveConf conf; - - public Set<ReadEntity> inputs; - public Set<WriteEntity> outputs; - - public String user; - public HiveOperation operation; - public QueryPlan queryPlan; - public HookContext.HookType hookType; - public JSONObject jsonPlan; - } - - @Override - public void run(final HookContext hookContext) throws Exception { - if (executor == null) { - LOG.info("No executor running. Bail."); - return; - } - - // clone to avoid concurrent access - final HiveEvent event = new HiveEvent(); - final HiveConf conf = new HiveConf(hookContext.getConf()); - boolean debug = conf.get("hive.hook.dgi.synchronous", "false").equals("true"); - - event.conf = conf; - event.inputs = hookContext.getInputs(); - event.outputs = hookContext.getOutputs(); - - event.user = hookContext.getUserName() == null ? 
hookContext.getUgi().getUserName() : hookContext.getUserName(); - event.operation = HiveOperation.valueOf(hookContext.getOperationName()); - event.queryPlan = hookContext.getQueryPlan(); - event.hookType = hookContext.getHookType(); - - event.jsonPlan = getQueryPlan(event); - - if (debug) { - fireAndForget(event); - } else { - executor.submit(new Runnable() { - @Override - public void run() { - try { - fireAndForget(event); - } catch (Throwable e) { - LOG.info("DGI hook failed", e); - } - } - }); - } - } - - private void fireAndForget(HiveEvent event) throws Exception { - assert event.hookType == HookContext.HookType.POST_EXEC_HOOK : "Non-POST_EXEC_HOOK not supported!"; - - LOG.info("Entered DGI hook for hook type {} operation {}", event.hookType, event.operation); - HiveMetaStoreBridge dgiBridge = new HiveMetaStoreBridge(event.conf); - - if (!typesRegistered) { - dgiBridge.registerHiveDataModel(); - typesRegistered = true; - } - - switch (event.operation) { - case CREATEDATABASE: - handleCreateDB(dgiBridge, event); - break; - - case CREATETABLE: - handleCreateTable(dgiBridge, event); - break; - - case CREATETABLE_AS_SELECT: - case CREATEVIEW: - case LOAD: - case EXPORT: - case IMPORT: - case QUERY: - registerProcess(dgiBridge, event); - break; - - case ALTERTABLE_RENAME: - case ALTERVIEW_RENAME: - renameTable(dgiBridge, event); - break; - - case ALTERVIEW_AS: - //update inputs/outputs? - break; - - case ALTERTABLE_ADDCOLS: - case ALTERTABLE_REPLACECOLS: - case ALTERTABLE_RENAMECOL: - break; - - default: - } - } - - private void renameTable(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception { - //crappy, no easy of getting new name - assert event.inputs != null && event.inputs.size() == 1; - assert event.outputs != null && event.outputs.size() > 0; - - Table oldTable = event.inputs.iterator().next().getTable(); - Table newTable = null; - for (WriteEntity writeEntity : event.outputs) { - if (writeEntity.getType() == Entity.Type.TABLE) { - Table table = writeEntity.getTable(); - if (table.getDbName().equals(oldTable.getDbName()) && !table.getTableName() - .equals(oldTable.getTableName())) { - newTable = table; - break; - } - } - } - if (newTable == null) { - LOG.warn("Failed to deduct new name for " + event.queryPlan.getQueryStr()); - return; - } - - Referenceable dbReferenceable = dgiBridge.registerDatabase(oldTable.getDbName()); - Referenceable tableReferenceable = - dgiBridge.registerTable(dbReferenceable, oldTable.getDbName(), oldTable.getTableName()); - LOG.info("Updating entity name {}.{} to {}", oldTable.getDbName(), oldTable.getTableName(), - newTable.getTableName()); - dgiBridge.updateTable(tableReferenceable, newTable); - } - - private void handleCreateTable(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception { - for (WriteEntity entity : event.outputs) { - if (entity.getType() == Entity.Type.TABLE) { - - Table table = entity.getTable(); - Referenceable dbReferenceable = dgiBridge.registerDatabase(table.getDbName()); - dgiBridge.registerTable(dbReferenceable, table.getDbName(), table.getTableName()); - } - } - } - - private void handleCreateDB(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception { - for (WriteEntity entity : event.outputs) { - if (entity.getType() == Entity.Type.DATABASE) { - dgiBridge.registerDatabase(entity.getDatabase().getName()); - } - } - } - - private String normalize(String str) { - if (StringUtils.isEmpty(str)) { - return null; - } - return str.toLowerCase().trim(); - } - - private void 
registerProcess(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception { - Set<ReadEntity> inputs = event.inputs; - Set<WriteEntity> outputs = event.outputs; - - //Even explain CTAS has operation name as CREATETABLE_AS_SELECT - if (inputs.isEmpty() && outputs.isEmpty()) { - LOG.info("Explain statement. Skipping..."); - } - - if (event.queryPlan == null) { - LOG.info("Query plan is missing. Skipping..."); - } - - String queryId = event.queryPlan.getQueryId(); - String queryStr = normalize(event.queryPlan.getQueryStr()); - long queryStartTime = event.queryPlan.getQueryStartTime(); - - LOG.debug("Registering CTAS query: {}", queryStr); - Referenceable processReferenceable = dgiBridge.getProcessReference(queryStr); - if (processReferenceable == null) { - processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName()); - processReferenceable.set("name", event.operation.getOperationName()); - processReferenceable.set("startTime", queryStartTime); - processReferenceable.set("userName", event.user); - - List<Referenceable> source = new ArrayList<>(); - for (ReadEntity readEntity : inputs) { - if (readEntity.getType() == Entity.Type.TABLE) { - Table table = readEntity.getTable(); - String dbName = table.getDbName(); - source.add(dgiBridge.registerTable(dbName, table.getTableName())); - } - if (readEntity.getType() == Entity.Type.PARTITION) { - dgiBridge.registerPartition(readEntity.getPartition()); - } - } - processReferenceable.set("inputs", source); - - List<Referenceable> target = new ArrayList<>(); - for (WriteEntity writeEntity : outputs) { - if (writeEntity.getType() == Entity.Type.TABLE || writeEntity.getType() == Entity.Type.PARTITION) { - Table table = writeEntity.getTable(); - String dbName = table.getDbName(); - target.add(dgiBridge.registerTable(dbName, table.getTableName())); - } - if (writeEntity.getType() == Entity.Type.PARTITION) { - dgiBridge.registerPartition(writeEntity.getPartition()); - } - } - processReferenceable.set("outputs", target); - processReferenceable.set("queryText", queryStr); - processReferenceable.set("queryId", queryId); - processReferenceable.set("queryPlan", event.jsonPlan.toString()); - processReferenceable.set("endTime", System.currentTimeMillis()); - - //TODO set - processReferenceable.set("queryGraph", "queryGraph"); - dgiBridge.createInstance(processReferenceable); - } else { - LOG.debug("Query {} is already registered", queryStr); - } - } - - - private JSONObject getQueryPlan(HiveEvent event) throws Exception { - try { - ExplainTask explain = new ExplainTask(); - explain.initialize(event.conf, event.queryPlan, null); - List<Task<?>> rootTasks = event.queryPlan.getRootTasks(); - return explain.getJSONPlan(null, null, rootTasks, event.queryPlan.getFetchTask(), true, false, false); - } catch(Exception e) { - LOG.warn("Failed to get queryplan", e); - return new JSONObject(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/addons/hive-bridge/src/main/java/org/apache/hadoop/metadata/hive/model/HiveDataModelGenerator.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/hadoop/metadata/hive/model/HiveDataModelGenerator.java b/addons/hive-bridge/src/main/java/org/apache/hadoop/metadata/hive/model/HiveDataModelGenerator.java deleted file mode 100755 index 96c7d55..0000000 --- a/addons/hive-bridge/src/main/java/org/apache/hadoop/metadata/hive/model/HiveDataModelGenerator.java +++ /dev/null @@ -1,482 +0,0 @@ -/** - * Licensed to the 
Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.metadata.hive.model; - -import com.google.common.collect.ImmutableList; -import org.apache.hadoop.metadata.MetadataException; -import org.apache.hadoop.metadata.MetadataServiceClient; -import org.apache.hadoop.metadata.typesystem.TypesDef; -import org.apache.hadoop.metadata.typesystem.json.TypesSerialization; -import org.apache.hadoop.metadata.typesystem.types.AttributeDefinition; -import org.apache.hadoop.metadata.typesystem.types.ClassType; -import org.apache.hadoop.metadata.typesystem.types.DataTypes; -import org.apache.hadoop.metadata.typesystem.types.EnumType; -import org.apache.hadoop.metadata.typesystem.types.EnumTypeDefinition; -import org.apache.hadoop.metadata.typesystem.types.EnumValue; -import org.apache.hadoop.metadata.typesystem.types.HierarchicalTypeDefinition; -import org.apache.hadoop.metadata.typesystem.types.Multiplicity; -import org.apache.hadoop.metadata.typesystem.types.StructType; -import org.apache.hadoop.metadata.typesystem.types.StructTypeDefinition; -import org.apache.hadoop.metadata.typesystem.types.TraitType; -import org.apache.hadoop.metadata.typesystem.types.TypeUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -/** - * Utility that generates hive data model for both metastore entities and DDL/DML queries. 
- */ -public class HiveDataModelGenerator { - - private static final Logger LOG = LoggerFactory.getLogger(HiveDataModelGenerator.class); - - private static final DataTypes.MapType STRING_MAP_TYPE = - new DataTypes.MapType(DataTypes.STRING_TYPE, DataTypes.STRING_TYPE); - - private final Map<String, HierarchicalTypeDefinition<ClassType>> classTypeDefinitions; - private final Map<String, EnumTypeDefinition> enumTypeDefinitionMap; - private final Map<String, StructTypeDefinition> structTypeDefinitionMap; - - public static final String COMMENT = "comment"; - - public static final String STORAGE_NUM_BUCKETS = "numBuckets"; - public static final String STORAGE_IS_STORED_AS_SUB_DIRS = "storedAsSubDirectories"; - - public static final String NAME = "name"; - public static final String TABLE_NAME = "tableName"; - public static final String CLUSTER_NAME = "clusterName"; - public static final String TABLE = "table"; - public static final String DB = "db"; - - public HiveDataModelGenerator() { - classTypeDefinitions = new HashMap<>(); - enumTypeDefinitionMap = new HashMap<>(); - structTypeDefinitionMap = new HashMap<>(); - } - - public void createDataModel() throws MetadataException { - LOG.info("Generating the Hive Data Model...."); - - // enums - createHiveObjectTypeEnum(); - createHivePrincipalTypeEnum(); - createResourceTypeEnum(); - - // structs - createSerDeStruct(); - //createSkewedInfoStruct(); - createOrderStruct(); - createResourceUriStruct(); - createStorageDescClass(); - - // classes - createDBClass(); - createTypeClass(); - createColumnClass(); - createPartitionClass(); - createTableClass(); - createIndexClass(); - createRoleClass(); - - // DDL/DML Process - createProcessClass(); - } - - public TypesDef getTypesDef() { - return TypeUtils.getTypesDef( - getEnumTypeDefinitions(), - getStructTypeDefinitions(), - getTraitTypeDefinitions(), - getClassTypeDefinitions() - ); - } - - public String getDataModelAsJSON() { - return TypesSerialization.toJson(getTypesDef()); - } - - public ImmutableList<EnumTypeDefinition> getEnumTypeDefinitions() { - return ImmutableList.copyOf(enumTypeDefinitionMap.values()); - } - - public ImmutableList<StructTypeDefinition> getStructTypeDefinitions() { - return ImmutableList.copyOf(structTypeDefinitionMap.values()); - } - - public ImmutableList<HierarchicalTypeDefinition<ClassType>> getClassTypeDefinitions() { - return ImmutableList.copyOf(classTypeDefinitions.values()); - } - - public ImmutableList<HierarchicalTypeDefinition<TraitType>> getTraitTypeDefinitions() { - return ImmutableList.of(); - } - - private void createHiveObjectTypeEnum() throws MetadataException { - EnumValue values[] = { - new EnumValue("GLOBAL", 1), - new EnumValue("DATABASE", 2), - new EnumValue("TABLE", 3), - new EnumValue("PARTITION", 4), - new EnumValue("COLUMN", 5), - }; - - EnumTypeDefinition definition = new EnumTypeDefinition( - HiveDataTypes.HIVE_OBJECT_TYPE.getName(), values); - enumTypeDefinitionMap.put(HiveDataTypes.HIVE_OBJECT_TYPE.getName(), definition); - LOG.debug("Created definition for " + HiveDataTypes.HIVE_OBJECT_TYPE.getName()); - } - - private void createHivePrincipalTypeEnum() throws MetadataException { - EnumValue values[] = { - new EnumValue("USER", 1), - new EnumValue("ROLE", 2), - new EnumValue("GROUP", 3), - }; - - EnumTypeDefinition definition = new EnumTypeDefinition( - HiveDataTypes.HIVE_PRINCIPAL_TYPE.getName(), values); - - enumTypeDefinitionMap.put(HiveDataTypes.HIVE_PRINCIPAL_TYPE.getName(), definition); - LOG.debug("Created definition for " + 
HiveDataTypes.HIVE_PRINCIPAL_TYPE.getName()); - } - - private void createResourceTypeEnum() throws MetadataException { - EnumValue values[] = { - new EnumValue("JAR", 1), - new EnumValue("FILE", 2), - new EnumValue("ARCHIVE", 3), - }; - EnumTypeDefinition definition = new EnumTypeDefinition( - HiveDataTypes.HIVE_RESOURCE_TYPE.getName(), values); - enumTypeDefinitionMap.put(HiveDataTypes.HIVE_RESOURCE_TYPE.getName(), definition); - LOG.debug("Created definition for " + HiveDataTypes.HIVE_RESOURCE_TYPE.getName()); - } - - private void createSerDeStruct() throws MetadataException { - AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ - new AttributeDefinition(NAME, DataTypes.STRING_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("serializationLib", DataTypes.STRING_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("parameters", STRING_MAP_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - }; - StructTypeDefinition definition = new StructTypeDefinition( - HiveDataTypes.HIVE_SERDE.getName(), attributeDefinitions); - structTypeDefinitionMap.put(HiveDataTypes.HIVE_SERDE.getName(), definition); - LOG.debug("Created definition for " + HiveDataTypes.HIVE_SERDE.getName()); - } - - private void createOrderStruct() throws MetadataException { - AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ - new AttributeDefinition("col", DataTypes.STRING_TYPE.getName(), - Multiplicity.REQUIRED, false, null), - new AttributeDefinition("order", DataTypes.INT_TYPE.getName(), - Multiplicity.REQUIRED, false, null), - }; - - StructTypeDefinition definition = new StructTypeDefinition( - HiveDataTypes.HIVE_ORDER.getName(), attributeDefinitions); - structTypeDefinitionMap.put(HiveDataTypes.HIVE_ORDER.getName(), definition); - LOG.debug("Created definition for " + HiveDataTypes.HIVE_ORDER.getName()); - } - - private void createStorageDescClass() throws MetadataException { - AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ - new AttributeDefinition("cols", - String.format("array<%s>", HiveDataTypes.HIVE_COLUMN.getName()), - Multiplicity.COLLECTION, false, null), - new AttributeDefinition("location", DataTypes.STRING_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("inputFormat", DataTypes.STRING_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("outputFormat", DataTypes.STRING_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("compressed", DataTypes.BOOLEAN_TYPE.getName(), - Multiplicity.REQUIRED, false, null), - new AttributeDefinition(STORAGE_NUM_BUCKETS, DataTypes.INT_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("serdeInfo", HiveDataTypes.HIVE_SERDE.getName(), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("bucketCols", - String.format("array<%s>", DataTypes.STRING_TYPE.getName()), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("sortCols", - String.format("array<%s>", HiveDataTypes.HIVE_ORDER.getName()), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("parameters", STRING_MAP_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - //new AttributeDefinition("skewedInfo", DefinedTypes.HIVE_SKEWEDINFO.getName(), - // Multiplicity.OPTIONAL, false, null), - new AttributeDefinition(STORAGE_IS_STORED_AS_SUB_DIRS, DataTypes.BOOLEAN_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - }; - - 
HierarchicalTypeDefinition<ClassType> definition = new HierarchicalTypeDefinition<>( - ClassType.class, HiveDataTypes.HIVE_STORAGEDESC.getName(), null, attributeDefinitions); - classTypeDefinitions.put(HiveDataTypes.HIVE_STORAGEDESC.getName(), definition); - LOG.debug("Created definition for " + HiveDataTypes.HIVE_STORAGEDESC.getName()); - } - - /** Revisit later after nested array types are handled by the typesystem **/ - - private void createResourceUriStruct() throws MetadataException { - AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ - new AttributeDefinition("resourceType", HiveDataTypes.HIVE_RESOURCE_TYPE.getName(), - Multiplicity.REQUIRED, false, null), - new AttributeDefinition("uri", DataTypes.STRING_TYPE.getName(), - Multiplicity.REQUIRED, false, null), - }; - StructTypeDefinition definition = new StructTypeDefinition( - HiveDataTypes.HIVE_RESOURCEURI.getName(), attributeDefinitions); - structTypeDefinitionMap.put(HiveDataTypes.HIVE_RESOURCEURI.getName(), definition); - LOG.debug("Created definition for " + HiveDataTypes.HIVE_RESOURCEURI.getName()); - } - - private void createDBClass() throws MetadataException { - AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ - new AttributeDefinition(NAME, DataTypes.STRING_TYPE.getName(), - Multiplicity.REQUIRED, false, null), - new AttributeDefinition(CLUSTER_NAME, DataTypes.STRING_TYPE.getName(), - Multiplicity.REQUIRED, false, null), - new AttributeDefinition("description", DataTypes.STRING_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("locationUri", DataTypes.STRING_TYPE.getName(), - Multiplicity.REQUIRED, false, null), - new AttributeDefinition("parameters", STRING_MAP_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("ownerName", DataTypes.STRING_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("ownerType", HiveDataTypes.HIVE_PRINCIPAL_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - }; - - HierarchicalTypeDefinition<ClassType> definition = - new HierarchicalTypeDefinition<>(ClassType.class, HiveDataTypes.HIVE_DB.getName(), - null, attributeDefinitions); - classTypeDefinitions.put(HiveDataTypes.HIVE_DB.getName(), definition); - LOG.debug("Created definition for " + HiveDataTypes.HIVE_DB.getName()); - } - - private void createTypeClass() throws MetadataException { - AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ - new AttributeDefinition(NAME, DataTypes.STRING_TYPE.getName(), - Multiplicity.REQUIRED, false, null), - new AttributeDefinition("type1", DataTypes.STRING_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("type2", DataTypes.STRING_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("fields", String.format("array<%s>", - HiveDataTypes.HIVE_COLUMN.getName()), Multiplicity.OPTIONAL, false, null), - }; - HierarchicalTypeDefinition<ClassType> definition = - new HierarchicalTypeDefinition<>(ClassType.class, HiveDataTypes.HIVE_TYPE.getName(), - null, attributeDefinitions); - - classTypeDefinitions.put(HiveDataTypes.HIVE_TYPE.getName(), definition); - LOG.debug("Created definition for " + HiveDataTypes.HIVE_TYPE.getName()); - } - - private void createColumnClass() throws MetadataException { - AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ - new AttributeDefinition(NAME, DataTypes.STRING_TYPE.getName(), - Multiplicity.REQUIRED, false, null), - new AttributeDefinition("type", 
DataTypes.STRING_TYPE.getName(), - Multiplicity.REQUIRED, false, null), - new AttributeDefinition(COMMENT, DataTypes.STRING_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - }; - HierarchicalTypeDefinition<ClassType> definition = - new HierarchicalTypeDefinition<>( - ClassType.class, HiveDataTypes.HIVE_COLUMN.getName(), - null, attributeDefinitions); - classTypeDefinitions.put(HiveDataTypes.HIVE_COLUMN.getName(), definition); - LOG.debug("Created definition for " + HiveDataTypes.HIVE_COLUMN.getName()); - } - - private void createPartitionClass() throws MetadataException { - AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ - new AttributeDefinition("values", DataTypes.arrayTypeName(DataTypes.STRING_TYPE.getName()), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition(TABLE, HiveDataTypes.HIVE_TABLE.getName(), - Multiplicity.REQUIRED, false, null), - new AttributeDefinition("createTime", DataTypes.LONG_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("lastAccessTime", DataTypes.LONG_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("sd", HiveDataTypes.HIVE_STORAGEDESC.getName(), - Multiplicity.REQUIRED, false, null), - new AttributeDefinition("columns", - DataTypes.arrayTypeName(HiveDataTypes.HIVE_COLUMN.getName()), - Multiplicity.OPTIONAL, true, null), - new AttributeDefinition("parameters", STRING_MAP_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - }; - HierarchicalTypeDefinition<ClassType> definition = - new HierarchicalTypeDefinition<>(ClassType.class, - HiveDataTypes.HIVE_PARTITION.getName(), null, attributeDefinitions); - classTypeDefinitions.put(HiveDataTypes.HIVE_PARTITION.getName(), definition); - LOG.debug("Created definition for " + HiveDataTypes.HIVE_PARTITION.getName()); - } - - private void createTableClass() throws MetadataException { - AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ - new AttributeDefinition(TABLE_NAME, DataTypes.STRING_TYPE.getName(), - Multiplicity.REQUIRED, false, null), - new AttributeDefinition(DB, HiveDataTypes.HIVE_DB.getName(), - Multiplicity.REQUIRED, false, null), - new AttributeDefinition("owner", DataTypes.STRING_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("createTime", DataTypes.LONG_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("lastAccessTime", DataTypes.LONG_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition(COMMENT, DataTypes.STRING_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("retention", DataTypes.INT_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("sd", HiveDataTypes.HIVE_STORAGEDESC.getName(), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("partitionKeys", - DataTypes.arrayTypeName(HiveDataTypes.HIVE_COLUMN.getName()), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("columns", - DataTypes.arrayTypeName(HiveDataTypes.HIVE_COLUMN.getName()), - Multiplicity.OPTIONAL, true, null), - new AttributeDefinition("parameters", STRING_MAP_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("viewOriginalText", DataTypes.STRING_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("viewExpandedText", DataTypes.STRING_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("tableType", DataTypes.STRING_TYPE.getName(), - 
Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("temporary", DataTypes.BOOLEAN_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - }; - HierarchicalTypeDefinition<ClassType> definition = - new HierarchicalTypeDefinition<>(ClassType.class, HiveDataTypes.HIVE_TABLE.getName(), - ImmutableList.of("DataSet"), attributeDefinitions); - classTypeDefinitions.put(HiveDataTypes.HIVE_TABLE.getName(), definition); - LOG.debug("Created definition for " + HiveDataTypes.HIVE_TABLE.getName()); - } - - private void createIndexClass() throws MetadataException { - AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ - new AttributeDefinition(NAME, DataTypes.STRING_TYPE.getName(), - Multiplicity.REQUIRED, false, null), - new AttributeDefinition("indexHandlerClass", DataTypes.STRING_TYPE.getName(), - Multiplicity.REQUIRED, false, null), - new AttributeDefinition(DB, HiveDataTypes.HIVE_DB.getName(), - Multiplicity.REQUIRED, false, null), - new AttributeDefinition("createTime", DataTypes.LONG_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("lastAccessTime", DataTypes.LONG_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("origTable", HiveDataTypes.HIVE_TABLE.getName(), - Multiplicity.REQUIRED, false, null), - new AttributeDefinition("indexTable", HiveDataTypes.HIVE_TABLE.getName(), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("sd", HiveDataTypes.HIVE_STORAGEDESC.getName(), - Multiplicity.REQUIRED, false, null), - new AttributeDefinition("parameters", STRING_MAP_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("deferredRebuild", DataTypes.BOOLEAN_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - }; - - HierarchicalTypeDefinition<ClassType> definition = - new HierarchicalTypeDefinition<>(ClassType.class, HiveDataTypes.HIVE_INDEX.getName(), - ImmutableList.of(MetadataServiceClient.DATA_SET_SUPER_TYPE), attributeDefinitions); - classTypeDefinitions.put(HiveDataTypes.HIVE_INDEX.getName(), definition); - LOG.debug("Created definition for " + HiveDataTypes.HIVE_INDEX.getName()); - } - - private void createRoleClass() throws MetadataException { - AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ - new AttributeDefinition("roleName", DataTypes.STRING_TYPE.getName(), - Multiplicity.REQUIRED, false, null), - new AttributeDefinition("createTime", DataTypes.LONG_TYPE.getName(), - Multiplicity.REQUIRED, false, null), - new AttributeDefinition("ownerName", DataTypes.STRING_TYPE.getName(), - Multiplicity.REQUIRED, false, null), - }; - HierarchicalTypeDefinition<ClassType> definition = new HierarchicalTypeDefinition<>( - ClassType.class, HiveDataTypes.HIVE_ROLE.getName(), null, attributeDefinitions); - - classTypeDefinitions.put(HiveDataTypes.HIVE_ROLE.getName(), definition); - LOG.debug("Created definition for " + HiveDataTypes.HIVE_ROLE.getName()); - } - - private void createProcessClass() throws MetadataException { - AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ - new AttributeDefinition("startTime", DataTypes.LONG_TYPE.getName(), - Multiplicity.REQUIRED, false, null), - new AttributeDefinition("endTime", DataTypes.LONG_TYPE.getName(), - Multiplicity.REQUIRED, false, null), - new AttributeDefinition("userName", DataTypes.STRING_TYPE.getName(), - Multiplicity.REQUIRED, false, null), - new AttributeDefinition("queryText", DataTypes.STRING_TYPE.getName(), - Multiplicity.REQUIRED, false, null), - new 
AttributeDefinition("queryPlan", DataTypes.STRING_TYPE.getName(), - Multiplicity.REQUIRED, false, null), - new AttributeDefinition("queryId", DataTypes.STRING_TYPE.getName(), - Multiplicity.REQUIRED, false, null), - new AttributeDefinition("queryGraph", DataTypes.STRING_TYPE.getName(), - Multiplicity.OPTIONAL, false, null), - }; - - HierarchicalTypeDefinition<ClassType> definition = new HierarchicalTypeDefinition<>( - ClassType.class, HiveDataTypes.HIVE_PROCESS.getName(), - ImmutableList.of(MetadataServiceClient.PROCESS_SUPER_TYPE), attributeDefinitions); - classTypeDefinitions.put(HiveDataTypes.HIVE_PROCESS.getName(), definition); - LOG.debug("Created definition for " + HiveDataTypes.HIVE_PROCESS.getName()); - } - - public String getModelAsJson() throws MetadataException { - createDataModel(); - return getDataModelAsJSON(); - } - - public static void main(String[] args) throws Exception { - HiveDataModelGenerator hiveDataModelGenerator = new HiveDataModelGenerator(); - System.out.println("hiveDataModelAsJSON = " + hiveDataModelGenerator.getModelAsJson()); - - TypesDef typesDef = hiveDataModelGenerator.getTypesDef(); - for (EnumTypeDefinition enumType : typesDef.enumTypesAsJavaList()) { - System.out.println(String.format("%s(%s) - %s", enumType.name, EnumType.class.getSimpleName(), - Arrays.toString(enumType.enumValues))); - } - for (StructTypeDefinition structType : typesDef.structTypesAsJavaList()) { - System.out.println(String.format("%s(%s) - %s", structType.typeName, StructType.class.getSimpleName(), - Arrays.toString(structType.attributeDefinitions))); - } - for (HierarchicalTypeDefinition<ClassType> classType : typesDef.classTypesAsJavaList()) { - System.out.println(String.format("%s(%s) - %s", classType.typeName, ClassType.class.getSimpleName(), - Arrays.toString(classType.attributeDefinitions))); - } - for (HierarchicalTypeDefinition<TraitType> traitType : typesDef.traitTypesAsJavaList()) { - System.out.println(String.format("%s(%s) - %s", traitType.typeName, TraitType.class.getSimpleName(), - Arrays.toString(traitType.attributeDefinitions))); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/addons/hive-bridge/src/main/java/org/apache/hadoop/metadata/hive/model/HiveDataTypes.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/hadoop/metadata/hive/model/HiveDataTypes.java b/addons/hive-bridge/src/main/java/org/apache/hadoop/metadata/hive/model/HiveDataTypes.java deleted file mode 100755 index 39c7d7d..0000000 --- a/addons/hive-bridge/src/main/java/org/apache/hadoop/metadata/hive/model/HiveDataTypes.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.metadata.hive.model; - -/** - * Hive Data Types for model and bridge. - */ -public enum HiveDataTypes { - - // Enums - HIVE_OBJECT_TYPE, - HIVE_PRINCIPAL_TYPE, - HIVE_RESOURCE_TYPE, - - // Structs - HIVE_SERDE, - HIVE_ORDER, - HIVE_RESOURCEURI, - - // Classes - HIVE_DB, - HIVE_STORAGEDESC, - HIVE_TABLE, - HIVE_COLUMN, - HIVE_PARTITION, - HIVE_INDEX, - HIVE_ROLE, - HIVE_TYPE, - HIVE_PROCESS, - // HIVE_VIEW, - ; - - public String getName() { - return name().toLowerCase(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/BaseSSLAndKerberosTest.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/BaseSSLAndKerberosTest.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/BaseSSLAndKerberosTest.java new file mode 100644 index 0000000..084e831 --- /dev/null +++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/BaseSSLAndKerberosTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.hive.hook; + +import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; +import org.apache.atlas.security.BaseSecurityTest; +import org.apache.atlas.web.service.SecureEmbeddedServer; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.security.alias.CredentialProvider; +import org.apache.hadoop.security.alias.CredentialProviderFactory; +import org.mortbay.jetty.Server; + +import java.io.File; +import java.io.IOException; + +import static org.apache.atlas.security.SecurityProperties.KEYSTORE_PASSWORD_KEY; +import static org.apache.atlas.security.SecurityProperties.SERVER_CERT_PASSWORD_KEY; +import static org.apache.atlas.security.SecurityProperties.TRUSTSTORE_PASSWORD_KEY; + +/** + * + */ +public class BaseSSLAndKerberosTest extends BaseSecurityTest { + public static final String TESTUSER = "testuser"; + public static final String TESTPASS = "testpass"; + protected static final String DGI_URL = "https://localhost:21443/"; + protected Path jksPath; + protected String providerUrl; + protected File httpKeytabFile; + private File userKeytabFile; + + class TestSecureEmbeddedServer extends SecureEmbeddedServer { + + public TestSecureEmbeddedServer(int port, String path) throws IOException { + super(port, path); + } + + public Server getServer() { + return server; + } + + @Override + public PropertiesConfiguration getConfiguration() { + return super.getConfiguration(); + } + } + + protected void setupCredentials() throws Exception { + Configuration conf = new Configuration(false); + + File file = new File(jksPath.toUri().getPath()); + file.delete(); + conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, providerUrl); + + CredentialProvider provider = + CredentialProviderFactory.getProviders(conf).get(0); + + // create new aliases + try { + + char[] storepass = {'k', 'e', 'y', 'p', 'a', 's', 's'}; + provider.createCredentialEntry( + KEYSTORE_PASSWORD_KEY, storepass); + + char[] trustpass = {'k', 'e', 'y', 'p', 'a', 's', 's'}; + provider.createCredentialEntry( + TRUSTSTORE_PASSWORD_KEY, trustpass); + + char[] trustpass2 = {'k', 'e', 'y', 'p', 'a', 's', 's'}; + provider.createCredentialEntry( + "ssl.client.truststore.password", trustpass2); + + char[] certpass = {'k', 'e', 'y', 'p', 'a', 's', 's'}; + provider.createCredentialEntry( + SERVER_CERT_PASSWORD_KEY, certpass); + + // write out so that it can be found in checks + provider.flush(); + } catch (Exception e) { + e.printStackTrace(); + throw e; + } + } + + public void setupKDCAndPrincipals() throws Exception { + // set up the KDC + File kdcWorkDir = startKDC(); + + userKeytabFile = createKeytab(kdc, kdcWorkDir, "dgi", "dgi.keytab"); + httpKeytabFile = createKeytab(kdc, kdcWorkDir, "HTTP", "spnego.service.keytab"); + + // create a test user principal + kdc.createPrincipal(TESTUSER, TESTPASS); + + StringBuilder jaas = new StringBuilder(1024); + jaas.append("TestUser {\n" + + " com.sun.security.auth.module.Krb5LoginModule required\nuseTicketCache=true;\n" + + "};\n"); + jaas.append(createJAASEntry("Client", "dgi", userKeytabFile)); + jaas.append(createJAASEntry("Server", "HTTP", httpKeytabFile)); + + File jaasFile = new File(kdcWorkDir, "jaas.txt"); + FileUtils.write(jaasFile, jaas.toString()); + bindJVMtoJAASFile(jaasFile); + } + + protected String getWarPath() { + return 
String.format("/../../webapp/target/atlas-webapp-%s", + System.getProperty("project.version", "0.1-incubating-SNAPSHOT")); + } + + protected HiveConf getHiveConf() { + HiveConf hiveConf = new HiveConf(this.getClass()); + hiveConf.setVar(HiveConf.ConfVars.PREEXECHOOKS, ""); + hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, HiveHook.class.getName()); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, + System.getProperty("user.dir") + "/target/atlas"); + hiveConf.set(HiveMetaStoreBridge.DGI_URL_PROPERTY, DGI_URL); + hiveConf.set("javax.jdo.option.ConnectionURL", "jdbc:derby:./target/metastore_db;create=true"); + hiveConf.set("hive.hook.dgi.synchronous", "true"); + return hiveConf; + } +}
