Repository: incubator-atlas Updated Branches: refs/heads/master 70d549882 -> 086b4a3ee
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/sqoop-bridge/pom.xml ---------------------------------------------------------------------- diff --git a/addons/sqoop-bridge/pom.xml b/addons/sqoop-bridge/pom.xml new file mode 100644 index 0000000..53949ef --- /dev/null +++ b/addons/sqoop-bridge/pom.xml @@ -0,0 +1,357 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="http://maven.apache.org/POM/4.0.0" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>apache-atlas</artifactId> + <groupId>org.apache.atlas</groupId> + <version>0.7-incubating-SNAPSHOT</version> + <relativePath>../../</relativePath> + </parent> + <artifactId>sqoop-bridge</artifactId> + <description>Apache Atlas Sqoop Bridge Module</description> + <name>Apache Atlas Sqoop Bridge</name> + <packaging>jar</packaging> + + <properties> + <!-- maps to 1.4.7-SNAPSHOT version of apache sqoop --> + <sqoop.version>1.4.6.2.3.99.0-195</sqoop.version> + <hive.version>1.2.1</hive.version> + </properties> + + <dependencies> + <!-- Logging --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-metastore</artifactId> + <version>${hive.version}</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + + </dependency> + + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + <version>${hive.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-cli</artifactId> + <version>${hive.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty.aggregate</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + + </dependency> + + <dependency> + <groupId>org.apache.sqoop</groupId> + <artifactId>sqoop</artifactId> + <version>${sqoop.version}</version> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-typesystem</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-client</artifactId> + </dependency> + + <dependency> + 
<groupId>org.apache.atlas</groupId> + <artifactId>atlas-notification</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>hive-bridge</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-common</artifactId> + <version>${hive.version}</version> + </dependency> + + <!-- to bring up atlas server for integration tests --> + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-webapp</artifactId> + <type>war</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-annotations</artifactId> + </dependency> + + <dependency> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + </dependency> + + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <id>copy-hook</id> + <phase>package</phase> + <goals> + <goal>copy</goal> + </goals> + <configuration> + <artifactItems> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>${project.artifactId}</artifactId> + <version>${project.version}</version> + <overWrite>true</overWrite> + <outputDirectory>${project.build.directory}/dependency/hook/sqoop</outputDirectory> + </artifactItem> + </artifactItems> + </configuration> + </execution> + <execution> + <id>copy-hook-dependencies</id> + <phase>package</phase> + <goals> + <goal>copy</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/dependency/hook/sqoop</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>false</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + <artifactItems> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>${project.artifactId}</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>hive-bridge</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.json4s</groupId> + <artifactId>json4s-native_2.10</artifactId> + <version>${json.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.json4s</groupId> + <artifactId>json4s-core_2.10</artifactId> + <version>${json.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.json4s</groupId> + <artifactId>json4s-ast_2.10</artifactId> + <version>${json.version}</version> + </artifactItem> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>atlas-client</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>atlas-typesystem</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>atlas-notification</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>atlas-common</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.scala-lang</groupId> + <artifactId>scala-compiler</artifactId> + 
<version>${scala.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.scala-lang</groupId> + <artifactId>scala-reflect</artifactId> + <version>${scala.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>${scala.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.scala-lang</groupId> + <artifactId>scalap</artifactId> + <version>${scala.version}</version> + </artifactItem> + <artifactItem> + <groupId>com.google.inject.extensions</groupId> + <artifactId>guice-multibindings</artifactId> + <version>${guice.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + <version>${kafka.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${kafka.version}</version> + </artifactItem> + </artifactItems> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-maven-plugin</artifactId> + <configuration> + <skip>${skipTests}</skip> + <!--only skip int tests --> + <httpConnector> + <port>31000</port> + <idleTimeout>60000</idleTimeout> + </httpConnector> + <war>../../webapp/target/atlas-webapp-${project.version}.war</war> + <daemon>true</daemon> + <webApp> + <contextPath>/</contextPath> + <descriptor>../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor> + </webApp> + <useTestScope>true</useTestScope> + <systemProperties> + <systemProperty> + <name>log4j.configuration</name> + <value>atlas-log4j.xml</value> + </systemProperty> + <systemProperty> + <name>atlas.log.dir</name> + <value>${project.build.directory}/logs</value> + </systemProperty> + <systemProperty> + <name>atlas.data</name> + <value>${project.build.directory}/data</value> + </systemProperty> + </systemProperties> + <stopKey>atlas-stop</stopKey> + <stopPort>31001</stopPort> + </configuration> + <executions> + <execution> + <id>start-jetty</id> + <phase>pre-integration-test</phase> + <goals> + <goal>deploy-war</goal> + </goals> + <configuration> + <daemon>true</daemon> + </configuration> + </execution> + <execution> + <id>stop-jetty</id> + <phase>post-integration-test</phase> + <goals> + <goal>stop</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-site-plugin</artifactId> + <dependencies> + <dependency> + <groupId>org.apache.maven.doxia</groupId> + <artifactId>doxia-module-twiki</artifactId> + <version>1.3</version> + </dependency> + </dependencies> + <executions> + <execution> + <goals> + <goal>site</goal> + </goals> + <phase>prepare-package</phase> + </execution> + </executions> + <configuration> + <generateProjectInfo>false</generateProjectInfo> + <generateReports>false</generateReports> + </configuration> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java ---------------------------------------------------------------------- diff --git a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java new file mode 100644 index 0000000..af68fcc --- /dev/null +++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java @@ 
-0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.sqoop.hook; + + +import com.google.inject.Guice; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.sun.jersey.api.client.ClientResponse; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasClient; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; +import org.apache.atlas.hive.model.HiveDataModelGenerator; +import org.apache.atlas.hive.model.HiveDataTypes; +import org.apache.atlas.notification.NotificationInterface; +import org.apache.atlas.notification.NotificationModule; +import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.sqoop.model.SqoopDataModelGenerator; +import org.apache.atlas.sqoop.model.SqoopDataTypes; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.sqoop.SqoopJobDataPublisher; +import org.apache.sqoop.util.ImportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +/** + * AtlasHook sends lineage information to the AtlasServer.
+ */ +public class SqoopHook extends SqoopJobDataPublisher { + + private static final Logger LOG = LoggerFactory.getLogger(SqoopHook.class); + private static final String DEFAULT_DGI_URL = "http://localhost:21000/"; + public static final String CONF_PREFIX = "atlas.hook.sqoop."; + public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries"; + + public static final String ATLAS_CLUSTER_NAME = "atlas.cluster.name"; + public static final String DEFAULT_CLUSTER_NAME = "primary"; + public static final String ATLAS_REST_ADDRESS = "atlas.rest.address"; + + @Inject + private static NotificationInterface notifInterface; + + static { + org.apache.hadoop.conf.Configuration.addDefaultResource("sqoop-site.xml"); + } + + private synchronized void registerDataModels(AtlasClient client, Configuration atlasConf) throws Exception { + // Make sure hive model exists + HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasConf, + UserGroupInformation.getCurrentUser().getShortUserName(), UserGroupInformation.getCurrentUser()); + hiveMetaStoreBridge.registerHiveDataModel(); + SqoopDataModelGenerator dataModelGenerator = new SqoopDataModelGenerator(); + + //Register sqoop data model if it's not already registered + try { + client.getType(SqoopDataTypes.SQOOP_PROCESS.getName()); + LOG.info("Sqoop data model is already registered!"); + } catch(AtlasServiceException ase) { + if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) { + //Expected in case types do not exist + LOG.info("Registering Sqoop data model"); + client.createType(dataModelGenerator.getModelAsJson()); + } else { + throw ase; + } + } + } + + public Referenceable createHiveDatabaseInstance(String clusterName, String dbName) + throws Exception { + Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName()); + dbRef.set(HiveDataModelGenerator.CLUSTER_NAME, clusterName); + dbRef.set(HiveDataModelGenerator.NAME, dbName); + dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, + HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName)); + return dbRef; + } + + public Referenceable createHiveTableInstance(String clusterName, Referenceable dbRef, + String tableName, String dbName) throws Exception { + Referenceable tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName()); + tableRef.set(HiveDataModelGenerator.NAME, + HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName)); + tableRef.set(HiveDataModelGenerator.TABLE_NAME, tableName.toLowerCase()); + tableRef.set(HiveDataModelGenerator.DB, dbRef); + return tableRef; + } + + private Referenceable createDBStoreInstance(SqoopJobDataPublisher.Data data) + throws ImportException { + + Referenceable storeRef = new Referenceable(SqoopDataTypes.SQOOP_DBDATASTORE.getName()); + String table = data.getStoreTable(); + String query = data.getStoreQuery(); + if (StringUtils.isBlank(table) && StringUtils.isBlank(query)) { + throw new ImportException("Both table and query cannot be empty for DBStoreInstance"); + } + + String usage = table != null ? "TABLE" : "QUERY"; + String source = table != null ?
table : query; + String name = getSqoopDBStoreName(data); + storeRef.set(SqoopDataModelGenerator.NAME, name); + storeRef.set(SqoopDataModelGenerator.DB_STORE_TYPE, data.getStoreType()); + storeRef.set(SqoopDataModelGenerator.DB_STORE_USAGE, usage); + storeRef.set(SqoopDataModelGenerator.STORE_URI, data.getUrl()); + storeRef.set(SqoopDataModelGenerator.SOURCE, source); + storeRef.set(SqoopDataModelGenerator.DESCRIPTION, ""); + storeRef.set(SqoopDataModelGenerator.OWNER, data.getUser()); + return storeRef; + } + + private Referenceable createSqoopProcessInstance(Referenceable dbStoreRef, Referenceable hiveTableRef, + SqoopJobDataPublisher.Data data, String clusterName) { + Referenceable procRef = new Referenceable(SqoopDataTypes.SQOOP_PROCESS.getName()); + procRef.set(SqoopDataModelGenerator.NAME, getSqoopProcessName(data, clusterName)); + procRef.set(SqoopDataModelGenerator.OPERATION, data.getOperation()); + procRef.set(SqoopDataModelGenerator.INPUTS, dbStoreRef); + procRef.set(SqoopDataModelGenerator.OUTPUTS, hiveTableRef); + procRef.set(SqoopDataModelGenerator.USER, data.getUser()); + procRef.set(SqoopDataModelGenerator.START_TIME, new Date(data.getStartTime())); + procRef.set(SqoopDataModelGenerator.END_TIME, new Date(data.getEndTime())); + + Map<String, String> sqoopOptionsMap = new HashMap<>(); + Properties options = data.getOptions(); + for (Object k : options.keySet()) { + sqoopOptionsMap.put((String)k, (String) options.get(k)); + } + procRef.set(SqoopDataModelGenerator.CMD_LINE_OPTS, sqoopOptionsMap); + + return procRef; + } + + static String getSqoopProcessName(Data data, String clusterName) { + StringBuilder name = new StringBuilder(String.format("sqoop import --connect %s", data.getUrl())); + if (StringUtils.isNotEmpty(data.getStoreTable())) { + name.append(" --table ").append(data.getStoreTable()); + } + if (StringUtils.isNotEmpty(data.getStoreQuery())) { + name.append(" --query ").append(data.getStoreQuery()); + } + name.append(String.format(" --hive-import --hive-database %s --hive-table %s --hive-cluster %s", + data.getHiveDB().toLowerCase(), data.getHiveTable().toLowerCase(), clusterName)); + return name.toString(); + } + + static String getSqoopDBStoreName(SqoopJobDataPublisher.Data data) { + StringBuilder name = new StringBuilder(String.format("%s --url %s", data.getStoreType(), data.getUrl())); + if (StringUtils.isNotEmpty(data.getStoreTable())) { + name.append(" --table ").append(data.getStoreTable()); + } + if (StringUtils.isNotEmpty(data.getStoreQuery())) { + name.append(" --query ").append(data.getStoreQuery()); + } + return name.toString(); + } + + @Override + public void publish(SqoopJobDataPublisher.Data data) throws Exception { + Injector injector = Guice.createInjector(new NotificationModule()); + notifInterface = injector.getInstance(NotificationInterface.class); + + Configuration atlasProperties = ApplicationProperties.get(); + AtlasClient atlasClient = new AtlasClient(atlasProperties.getString(ATLAS_REST_ADDRESS, DEFAULT_DGI_URL), + UserGroupInformation.getCurrentUser(), UserGroupInformation.getCurrentUser().getShortUserName()); + org.apache.hadoop.conf.Configuration sqoopConf = new org.apache.hadoop.conf.Configuration(); + String clusterName = sqoopConf.get(ATLAS_CLUSTER_NAME, DEFAULT_CLUSTER_NAME); + registerDataModels(atlasClient, atlasProperties); + + Referenceable dbStoreRef = createDBStoreInstance(data); + Referenceable dbRef = createHiveDatabaseInstance(clusterName, data.getHiveDB()); + Referenceable hiveTableRef = createHiveTableInstance(clusterName, 
dbRef, + data.getHiveTable(), data.getHiveDB()); + Referenceable procRef = createSqoopProcessInstance(dbStoreRef, hiveTableRef, data, clusterName); + + notifyEntity(atlasProperties, dbStoreRef, dbRef, hiveTableRef, procRef); + } + + /** + * Notify Atlas of the entity through a message. The entity can be a complex entity with references to other entities. + * De-duping of entities is done on the server side, based on the unique attribute of the entity. + * @param atlasProperties - Atlas configuration; used to read the retry count on notification failure. + * @param entities - Entity references to publish. + */ + private void notifyEntity(Configuration atlasProperties, Referenceable... entities) { + int maxRetries = atlasProperties.getInt(HOOK_NUM_RETRIES, 3); + + int numRetries = 0; + while (true) { + try { + notifInterface.send(NotificationInterface.NotificationType.HOOK, + new HookNotification.EntityCreateRequest(entities)); + return; + } catch(Exception e) { + numRetries++; + if(numRetries < maxRetries) { + LOG.debug("Failed to notify atlas for entity {}. Retrying", entities, e); + } else { + LOG.error("Failed to notify atlas for entity {} after {} retries. Quitting", entities, + maxRetries, e); + break; + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/model/SqoopDataModelGenerator.java ---------------------------------------------------------------------- diff --git a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/model/SqoopDataModelGenerator.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/model/SqoopDataModelGenerator.java new file mode 100644 index 0000000..342c07f --- /dev/null +++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/model/SqoopDataModelGenerator.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.atlas.sqoop.model; + +import com.google.common.collect.ImmutableList; +import org.apache.atlas.AtlasClient; +import org.apache.atlas.AtlasException; +import org.apache.atlas.typesystem.TypesDef; +import org.apache.atlas.typesystem.json.TypesSerialization; +import org.apache.atlas.typesystem.types.AttributeDefinition; +import org.apache.atlas.typesystem.types.ClassType; +import org.apache.atlas.typesystem.types.DataTypes; +import org.apache.atlas.typesystem.types.EnumType; +import org.apache.atlas.typesystem.types.EnumTypeDefinition; +import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition; +import org.apache.atlas.typesystem.types.Multiplicity; +import org.apache.atlas.typesystem.types.StructTypeDefinition; +import org.apache.atlas.typesystem.types.TraitType; +import org.apache.atlas.typesystem.types.utils.TypesUtil; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +/** + * Utility that generates Sqoop data model for both metastore entities and DDL/DML queries. + */ +public class SqoopDataModelGenerator { + + private static final Logger LOG = LoggerFactory.getLogger(SqoopDataModelGenerator.class); + + private final Map<String, HierarchicalTypeDefinition<ClassType>> classTypeDefinitions; + private final Map<String, EnumTypeDefinition> enumTypeDefinitionMap; + private final Map<String, StructTypeDefinition> structTypeDefinitionMap; + private static final DataTypes.MapType STRING_MAP_TYPE = + new DataTypes.MapType(DataTypes.STRING_TYPE, DataTypes.STRING_TYPE); + + public static final String NAME = "name"; + public static final String OWNER = "ownerName"; + public static final String USER = "userName"; + public static final String DB_STORE_TYPE = "dbStoreType"; + public static final String DB_STORE_USAGE = "storeUse"; + public static final String SOURCE = "source"; + public static final String DESCRIPTION = "description"; + public static final String STORE_URI = "storeUri"; + public static final String OPERATION = "operation"; + public static final String START_TIME = "startTime"; + public static final String END_TIME = "endTime"; + public static final String CMD_LINE_OPTS = "commandlineOpts"; + // multiple inputs and outputs for process + public static final String INPUTS = "inputs"; + public static final String OUTPUTS = "outputs"; + + public SqoopDataModelGenerator() { + classTypeDefinitions = new HashMap<>(); + enumTypeDefinitionMap = new HashMap<>(); + structTypeDefinitionMap = new HashMap<>(); + } + + public void createDataModel() throws AtlasException { + LOG.info("Generating the Sqoop Data Model...."); + + // enums + + // structs + + // classes + createSqoopDbStoreClass(); + + // DDL/DML Process + createSqoopProcessClass(); + } + + public TypesDef getTypesDef() { + return TypesUtil.getTypesDef(getEnumTypeDefinitions(), getStructTypeDefinitions(), getTraitTypeDefinitions(), + getClassTypeDefinitions()); + } + + public String getDataModelAsJSON() { + return TypesSerialization.toJson(getTypesDef()); + } + + public ImmutableList<EnumTypeDefinition> getEnumTypeDefinitions() { + return ImmutableList.copyOf(enumTypeDefinitionMap.values()); + } + + public ImmutableList<StructTypeDefinition> getStructTypeDefinitions() { + return ImmutableList.copyOf(structTypeDefinitionMap.values()); + } + + public ImmutableList<HierarchicalTypeDefinition<ClassType>> getClassTypeDefinitions() { + return 
ImmutableList.copyOf(classTypeDefinitions.values()); + } + + public ImmutableList<HierarchicalTypeDefinition<TraitType>> getTraitTypeDefinitions() { + return ImmutableList.of(); + } + + private void createSqoopDbStoreClass() throws AtlasException { + AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ + new AttributeDefinition(DB_STORE_TYPE, + DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null), + new AttributeDefinition(DB_STORE_USAGE, + DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null), + new AttributeDefinition(STORE_URI, + DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null), + new AttributeDefinition(SOURCE, + DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null), + new AttributeDefinition(OWNER, + DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),}; + + HierarchicalTypeDefinition<ClassType> definition = + new HierarchicalTypeDefinition<>(ClassType.class, SqoopDataTypes.SQOOP_DBDATASTORE.getName(), + ImmutableList.of(AtlasClient.DATA_SET_SUPER_TYPE), attributeDefinitions); + classTypeDefinitions.put(SqoopDataTypes.SQOOP_DBDATASTORE.getName(), definition); + LOG.debug("Created definition for " + SqoopDataTypes.SQOOP_DBDATASTORE.getName()); + } + + + private void createSqoopProcessClass() throws AtlasException { + AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ + new AttributeDefinition(OPERATION, + DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null), + new AttributeDefinition(CMD_LINE_OPTS, STRING_MAP_TYPE.getName(), Multiplicity.REQUIRED, false, null), + new AttributeDefinition(START_TIME, DataTypes.DATE_TYPE.getName(), Multiplicity.REQUIRED, false, null), + new AttributeDefinition(END_TIME, DataTypes.DATE_TYPE.getName(), Multiplicity.REQUIRED, false, null), + new AttributeDefinition(USER, + DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null), + }; + + HierarchicalTypeDefinition<ClassType> definition = + new HierarchicalTypeDefinition<>(ClassType.class, SqoopDataTypes.SQOOP_PROCESS.getName(), + ImmutableList.of(AtlasClient.PROCESS_SUPER_TYPE), attributeDefinitions); + classTypeDefinitions.put(SqoopDataTypes.SQOOP_PROCESS.getName(), definition); + LOG.debug("Created definition for " + SqoopDataTypes.SQOOP_PROCESS.getName()); + } + + public String getModelAsJson() throws AtlasException { + createDataModel(); + return getDataModelAsJSON(); + } + + public static void main(String[] args) throws Exception { + SqoopDataModelGenerator dataModelGenerator = new SqoopDataModelGenerator(); + System.out.println("sqoopDataModelAsJSON = " + dataModelGenerator.getModelAsJson()); + + TypesDef typesDef = dataModelGenerator.getTypesDef(); + for (EnumTypeDefinition enumType : typesDef.enumTypesAsJavaList()) { + System.out.println(String.format("%s(%s) - values %s", enumType.name, EnumType.class.getSimpleName(), + Arrays.toString(enumType.enumValues))); + } + + for (HierarchicalTypeDefinition<ClassType> classType : typesDef.classTypesAsJavaList()) { + System.out.println( + String.format("%s(%s) - super types [%s] - attributes %s", classType.typeName, + ClassType.class.getSimpleName(), StringUtils.join(classType.superTypes, ","), + Arrays.toString(classType.attributeDefinitions))); + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/model/SqoopDataTypes.java ---------------------------------------------------------------------- diff --git 
a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/model/SqoopDataTypes.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/model/SqoopDataTypes.java new file mode 100644 index 0000000..e71220a --- /dev/null +++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/model/SqoopDataTypes.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.sqoop.model; + +/** + * Sqoop Data Types for model and bridge. + */ +public enum SqoopDataTypes { + + // Classes + SQOOP_DBDATASTORE, + SQOOP_PROCESS, + ; + + public String getName() { + return name().toLowerCase(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java ---------------------------------------------------------------------- diff --git a/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java b/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java new file mode 100644 index 0000000..5214223 --- /dev/null +++ b/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.atlas.sqoop.hook; + +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasClient; +import org.apache.atlas.hive.model.HiveDataTypes; +import org.apache.atlas.sqoop.model.SqoopDataTypes; +import org.apache.sqoop.SqoopJobDataPublisher; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.Properties; + +public class SqoopHookIT { + public static final Logger LOG = org.slf4j.LoggerFactory.getLogger(SqoopHookIT.class); + private static final String CLUSTER_NAME = "primary"; + public static final String DEFAULT_DB = "default"; + private static final int MAX_WAIT_TIME = 2000; + private AtlasClient dgiClient; + + @BeforeClass + public void setUp() throws Exception { + //Set-up sqoop session + dgiClient = new AtlasClient(ApplicationProperties.get().getString("atlas.rest.address")); + } + + @Test + public void testSqoopImport() throws Exception { + SqoopJobDataPublisher.Data d = new SqoopJobDataPublisher.Data("import", "jdbc:mysql:///localhost/db", + "mysqluser", "mysql", "myTable", null, "default", "hiveTable", new Properties(), + System.currentTimeMillis() - 100, System.currentTimeMillis()); + SqoopHook hook = new SqoopHook(); + hook.publish(d); + Thread.sleep(1000); + String storeName = SqoopHook.getSqoopDBStoreName(d); + assertDBStoreIsRegistered(storeName); + String name = SqoopHook.getSqoopProcessName(d, CLUSTER_NAME); + assertSqoopProcessIsRegistered(name); + assertHiveTableIsRegistered(DEFAULT_DB, "hiveTable"); + } + + private String assertDBStoreIsRegistered(String storeName) throws Exception { + LOG.debug("Searching for db store {}", storeName); + String query = String.format( + "%s as t where name = '%s'" + " select t", + SqoopDataTypes.SQOOP_DBDATASTORE.getName(), storeName); + return assertEntityIsRegistered(query); + } + + private String assertHiveTableIsRegistered(String dbName, String tableName) throws Exception { + LOG.debug("Searching for table {}.{}", dbName, tableName); + String query = String.format( + "%s as t where tableName = '%s', db where name = '%s' and clusterName = '%s'" + " select t", + HiveDataTypes.HIVE_TABLE.getName(), tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME); + return assertEntityIsRegistered(query); + } + + private String assertSqoopProcessIsRegistered(String processName) throws Exception { + LOG.debug("Searching for sqoop process {}", processName); + String query = String.format( + "%s as t where name = '%s' select t", + SqoopDataTypes.SQOOP_PROCESS.getName(), processName); + return assertEntityIsRegistered(query); + } + + private String assertEntityIsRegistered(final String query) throws Exception { + waitFor(MAX_WAIT_TIME, new Predicate() { + @Override + public boolean evaluate() throws Exception { + JSONArray results = dgiClient.search(query); + return results.length() > 0; + } + }); + + JSONArray results = dgiClient.search(query); + JSONObject row = results.getJSONObject(0).getJSONObject("t"); + + return row.getString("id"); + } + + protected void waitFor(int timeout, Predicate predicate) throws Exception { + long mustEnd = System.currentTimeMillis() + timeout; + + boolean eval; + while (!(eval = predicate.evaluate()) && System.currentTimeMillis() < mustEnd) { + LOG.info("Waiting up to {} msec", mustEnd - System.currentTimeMillis()); + Thread.sleep(1000); + } + if (!eval) { + throw new Exception("Waiting timed out after " +
timeout + " msec"); + } + } + + public interface Predicate { + /** + * Perform a predicate evaluation. + * + * @return the boolean result of the evaluation. + * @throws Exception thrown if the predicate evaluation could not evaluate. + */ + boolean evaluate() throws Exception; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/sqoop-bridge/src/test/resources/hive-site.xml ---------------------------------------------------------------------- diff --git a/addons/sqoop-bridge/src/test/resources/hive-site.xml b/addons/sqoop-bridge/src/test/resources/hive-site.xml new file mode 100644 index 0000000..b106903 --- /dev/null +++ b/addons/sqoop-bridge/src/test/resources/hive-site.xml @@ -0,0 +1,53 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<configuration> + <property> + <name>hive.exec.post.hooks</name> + <value>org.apache.atlas.hive.hook.HiveHook</value> + </property> + + <property> + <name>hive.support.concurrency</name> + <value>false</value> + </property> + + <property> + <name>hive.metastore.warehouse.dir</name> + <value>${user.dir}/target/metastore</value> + </property> + + <property> + <name>javax.jdo.option.ConnectionURL</name> + <value>jdbc:derby:${user.dir}/target/metastore_db;create=true</value> + </property> + + <property> + <name>atlas.hook.hive.synchronous</name> + <value>true</value> + </property> + + <property> + <name>atlas.cluster.name</name> + <value>test</value> + </property> + + <property> + <name>fs.pfile.impl</name> + <value>org.apache.hadoop.fs.ProxyLocalFileSystem</value> + </property> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/sqoop-bridge/src/test/resources/sqoop-site.xml ---------------------------------------------------------------------- diff --git a/addons/sqoop-bridge/src/test/resources/sqoop-site.xml b/addons/sqoop-bridge/src/test/resources/sqoop-site.xml new file mode 100644 index 0000000..a63e7e4 --- /dev/null +++ b/addons/sqoop-bridge/src/test/resources/sqoop-site.xml @@ -0,0 +1,190 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<!-- Put Sqoop-specific properties in this file. --> + +<configuration> + + <!-- + Set the value of this property to explicitly enable third-party + ManagerFactory plugins. + + If this is not used, you can alternately specify a set of ManagerFactories + in the $SQOOP_CONF_DIR/managers.d/ subdirectory. Each file should contain + one or more lines like: + manager.class.name[=/path/to/containing.jar] + + Files will be consulted in lexicographical order only if this property + is unset. + --> + <!-- + <property> + <name>sqoop.connection.factories</name> + <value>com.cloudera.sqoop.manager.DefaultManagerFactory</value> + <description>A comma-delimited list of ManagerFactory implementations + which are consulted, in order, to instantiate ConnManager instances + used to drive connections to databases. + </description> + </property> + --> + + <!-- + Set the value of this property to enable third-party tools. + + If this is not used, you can alternately specify a set of ToolPlugins + in the $SQOOP_CONF_DIR/tools.d/ subdirectory. Each file should contain + one or more lines like: + plugin.class.name[=/path/to/containing.jar] + + Files will be consulted in lexicographical order only if this property + is unset. + --> + <!-- + <property> + <name>sqoop.tool.plugins</name> + <value></value> + <description>A comma-delimited list of ToolPlugin implementations + which are consulted, in order, to register SqoopTool instances which + allow third-party tools to be used. + </description> + </property> + --> + + <!-- + By default, the Sqoop metastore will auto-connect to a local embedded + database stored in ~/.sqoop/. To disable metastore auto-connect, uncomment + this next property. + --> + <!-- + <property> + <name>sqoop.metastore.client.enable.autoconnect</name> + <value>false</value> + <description>If true, Sqoop will connect to a local metastore + for job management when no other metastore arguments are + provided. + </description> + </property> + --> + + <!-- + The auto-connect metastore is stored in ~/.sqoop/. Uncomment + these next arguments to control the auto-connect process with + greater precision. + --> + <!-- + <property> + <name>sqoop.metastore.client.autoconnect.url</name> + <value>jdbc:hsqldb:file:/tmp/sqoop-meta/meta.db;shutdown=true</value> + <description>The connect string to use when connecting to a + job-management metastore. If unspecified, uses ~/.sqoop/. + You can specify a different path here. + </description> + </property> + <property> + <name>sqoop.metastore.client.autoconnect.username</name> + <value>SA</value> + <description>The username to bind to the metastore. + </description> + </property> + <property> + <name>sqoop.metastore.client.autoconnect.password</name> + <value></value> + <description>The password to bind to the metastore. + </description> + </property> + --> + + <!-- + For security reasons, by default your database password will not be stored in + the Sqoop metastore. When executing a saved job, you will need to + reenter the database password. Uncomment this setting to enable saved + password storage. (INSECURE!) 
+ --> + <!-- + <property> + <name>sqoop.metastore.client.record.password</name> + <value>true</value> + <description>If true, allow saved passwords in the metastore. + </description> + </property> + --> + + <!-- + Enabling this option will instruct Sqoop to put all options that + were used in the invocation into created mapreduce job(s). This + becomes handy when one needs to investigate what exact options were + used in the Sqoop invocation. + --> + <!-- + <property> + <name>sqoop.jobbase.serialize.sqoopoptions</name> + <value>true</value> + <description>If true, then all options will be serialized into job.xml + </description> + </property> + --> + + <!-- + SERVER CONFIGURATION: If you plan to run a Sqoop metastore on this machine, + you should uncomment and set these parameters appropriately. + + You should then configure clients with: + sqoop.metastore.client.autoconnect.url = + jdbc:hsqldb:hsql://<server-name>:<port>/sqoop + --> + <!-- + <property> + <name>sqoop.metastore.server.location</name> + <value>/tmp/sqoop-metastore/shared.db</value> + <description>Path to the shared metastore database files. + If this is not set, it will be placed in ~/.sqoop/. + </description> + </property> + + <property> + <name>sqoop.metastore.server.port</name> + <value>16000</value> + <description>Port that this metastore should listen on. + </description> + </property> + --> + <!-- + ATLAS SERVER ADDRESS +--> + <property> + <name>atlas.rest.address</name> + <value>http://localhost:21000/</value> + </property> + <!-- + SQOOP JOB DATA PUBLISHING CLASS. Currently only one publishing class is supported +--> + <property> + <name>sqoop.job.data.publish.class</name> + <value>org.apache.atlas.sqoop.hook.SqoopHook</value> + </property> + <!-- +ATLAS CLUSTER NAME +--> + <property> + <name>atlas.cluster.name</name> + <value>primary</value> + </property> +</configuration> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/distro/src/bin/atlas_start.py ---------------------------------------------------------------------- diff --git a/distro/src/bin/atlas_start.py b/distro/src/bin/atlas_start.py index cd9669b..b992b16 100755 --- a/distro/src/bin/atlas_start.py +++ b/distro/src/bin/atlas_start.py @@ -24,7 +24,7 @@ import atlas_config as mc ATLAS_LOG_OPTS="-Datlas.log.dir=%s -Datlas.log.file=application.log" ATLAS_COMMAND_OPTS="-Datlas.home=%s" ATLAS_CONFIG_OPTS="-Datlas.conf=%s" -DEFAULT_JVM_OPTS="-Xmx1024m -XX:MaxPermSize=512m -Dlog4j.configuration=atlas-log4j.xml" +DEFAULT_JVM_OPTS="-Xmx1024m -XX:MaxPermSize=512m -Dlog4j.configuration=atlas-log4j.xml -Djava.net.preferIPv4Stack=true" CONF_FILE="atlas-application.properties" HBASE_STORAGE_CONF_ENTRY="atlas.graph.storage.backend\s*=\s*hbase" http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/distro/src/main/assemblies/standalone-package.xml ---------------------------------------------------------------------- diff --git a/distro/src/main/assemblies/standalone-package.xml b/distro/src/main/assemblies/standalone-package.xml index 56fe736..b80a0ad 100755 --- a/distro/src/main/assemblies/standalone-package.xml +++ b/distro/src/main/assemblies/standalone-package.xml @@ -98,6 +98,18 @@ <directory>../addons/hive-bridge/target/dependency/hook</directory> <outputDirectory>hook</outputDirectory> </fileSet> + + <!-- addons/falcon --> + <fileSet> + <directory>../addons/falcon-bridge/target/dependency/hook</directory> + <outputDirectory>hook</outputDirectory> + </fileSet> + + <!-- addons/sqoop --> + <fileSet> +
<directory>../addons/sqoop-bridge/target/dependency/hook</directory> + <outputDirectory>hook</outputDirectory> + </fileSet> </fileSets> <files> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/distro/src/test/python/scripts/TestMetadata.py ---------------------------------------------------------------------- diff --git a/distro/src/test/python/scripts/TestMetadata.py b/distro/src/test/python/scripts/TestMetadata.py index 441126e..dfbf59d 100644 --- a/distro/src/test/python/scripts/TestMetadata.py +++ b/distro/src/test/python/scripts/TestMetadata.py @@ -57,14 +57,14 @@ class TestMetadata(unittest.TestCase): 'org.apache.atlas.Atlas', ['-app', 'atlas_home\\server\\webapp\\atlas'], 'atlas_home\\conf;atlas_home\\server\\webapp\\atlas\\WEB-INF\\classes;atlas_home\\server\\webapp\\atlas\\WEB-INF\\lib\\atlas-titan-${project.version}.jar;atlas_home\\server\\webapp\\atlas\\WEB-INF\\lib\\*;atlas_home\\libext\\*;atlas_home\\hbase\\conf', - ['-Datlas.log.dir=atlas_home\\logs', '-Datlas.log.file=application.log', '-Datlas.home=atlas_home', '-Datlas.conf=atlas_home\\conf', '-Xmx1024m', '-XX:MaxPermSize=512m', '-Dlog4j.configuration=atlas-log4j.xml'], 'atlas_home\\logs') + ['-Datlas.log.dir=atlas_home\\logs', '-Datlas.log.file=application.log', '-Datlas.home=atlas_home', '-Datlas.conf=atlas_home\\conf', '-Xmx1024m', '-XX:MaxPermSize=512m', '-Dlog4j.configuration=atlas-log4j.xml', '-Djava.net.preferIPv4Stack=true'], 'atlas_home\\logs') else: java_mock.assert_called_with( 'org.apache.atlas.Atlas', ['-app', 'atlas_home/server/webapp/atlas'], 'atlas_home/conf:atlas_home/server/webapp/atlas/WEB-INF/classes:atlas_home/server/webapp/atlas/WEB-INF/lib/atlas-titan-${project.version}.jar:atlas_home/server/webapp/atlas/WEB-INF/lib/*:atlas_home/libext/*:atlas_home/hbase/conf', - ['-Datlas.log.dir=atlas_home/logs', '-Datlas.log.file=application.log', '-Datlas.home=atlas_home', '-Datlas.conf=atlas_home/conf', '-Xmx1024m', '-XX:MaxPermSize=512m', '-Dlog4j.configuration=atlas-log4j.xml'], 'atlas_home/logs') + ['-Datlas.log.dir=atlas_home/logs', '-Datlas.log.file=application.log', '-Datlas.home=atlas_home', '-Datlas.conf=atlas_home/conf', '-Xmx1024m', '-XX:MaxPermSize=512m', '-Dlog4j.configuration=atlas-log4j.xml', '-Djava.net.preferIPv4Stack=true'], 'atlas_home/logs') pass http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/docs/src/site/twiki/Architecture.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/Architecture.twiki b/docs/src/site/twiki/Architecture.twiki index 6896e85..8966cbe 100755 --- a/docs/src/site/twiki/Architecture.twiki +++ b/docs/src/site/twiki/Architecture.twiki @@ -23,7 +23,8 @@ Atlas exposes notification interface and can be used for reliable entity registr Available bridges are: * [[Bridge-Hive][Hive Bridge]] - + * [[Bridge-Sqoop][Sqoop Bridge]] + * [[Bridge-Falcon][Falcon Bridge]] ---++ Notification Notification is used for reliable entity registration from hooks and for entity/type change notifications. Atlas, by default, provides Kafka integration, but its possible to provide other implementations as well. Atlas service starts embedded Kafka server by default. 
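For context while reading the bridge docs below: the hook-side publish path that both new bridges rely on reduces to a few lines. The sketch here condenses SqoopHook.publish and notifyEntity from this commit; the wrapper class name is hypothetical, and retry handling and the data-model registration step are omitted.

import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.NotificationModule;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;

// Hypothetical wrapper illustrating how a hook posts entities to Atlas.
public class HookPublishSketch {
    public static void publish(Referenceable... entities) {
        // Wire up the Kafka-backed notification channel via Guice,
        // exactly as SqoopHook.publish does in this commit.
        Injector injector = Guice.createInjector(new NotificationModule());
        NotificationInterface notifier = injector.getInstance(NotificationInterface.class);

        // Post an ENTITY_CREATE message; the Atlas server consumes it
        // asynchronously and de-dupes on the type's unique attribute.
        notifier.send(NotificationInterface.NotificationType.HOOK,
                new HookNotification.EntityCreateRequest(entities));
    }
}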
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/docs/src/site/twiki/Bridge-Falcon.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/Bridge-Falcon.twiki b/docs/src/site/twiki/Bridge-Falcon.twiki new file mode 100644 index 0000000..d9a452a --- /dev/null +++ b/docs/src/site/twiki/Bridge-Falcon.twiki @@ -0,0 +1,34 @@ +---+ Falcon Atlas Bridge + +---++ Falcon Model +The default Falcon modelling is available in org.apache.atlas.falcon.model.FalconDataModelGenerator. It defines the following types: +<verbatim> +falcon_process(ClassType) - super types [Process] - attributes [timestamp, owned-by, tags] +</verbatim> + +One falcon_process entity is created for every cluster that the Falcon process is defined for. + +The entities are created and de-duped using a unique qualified name. They provide a namespace and can be used for querying/lineage as well. The unique attributes are: + * falcon_process - attribute name - <process name>@<cluster name> + +---++ Falcon Hook +Falcon supports listeners on Falcon entity submission. This is used to add entities in Atlas using the model defined in org.apache.atlas.falcon.model.FalconDataModelGenerator. +The hook submits the request to a thread pool executor to avoid blocking the command execution. The thread submits the entities as a message to the notification server, and the Atlas server reads these messages and registers the entities. + * Add 'org.apache.falcon.atlas.service.AtlasService' to application.services in <falcon-conf>/startup.properties + * Link the Falcon hook jars into the Falcon classpath - 'ln -s <atlas-home>/hook/falcon/* <falcon-home>/server/webapp/falcon/WEB-INF/lib/' + * Copy <atlas-conf>/client.properties and <atlas-conf>/atlas-application.properties to the Falcon conf directory. + +The following properties in <atlas-conf>/client.properties control the thread pool and notification details: + * atlas.hook.falcon.synchronous - boolean, true to run the hook synchronously. default false + * atlas.hook.falcon.numRetries - number of retries for notification failure. default 3 + * atlas.hook.falcon.minThreads - core number of threads. default 5 + * atlas.hook.falcon.maxThreads - maximum number of threads. default 5 + * atlas.hook.falcon.keepAliveTime - keep alive time in msecs. default 10 + * atlas.hook.falcon.queueSize - queue size for the threadpool. default 10000 + +Refer to [[Configuration][Configuration]] for notification-related configuration + + +---++ Limitations + * Only the process entity creation is currently handled. This model will be expanded to include all Falcon metadata + * In the Falcon cluster entity, the cluster name used should be uniform across components like Hive, Falcon, Sqoop etc. If used with Ambari, the Ambari cluster name should be used for the cluster entity http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/docs/src/site/twiki/Bridge-Sqoop.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/Bridge-Sqoop.twiki b/docs/src/site/twiki/Bridge-Sqoop.twiki new file mode 100644 index 0000000..4a828f2 --- /dev/null +++ b/docs/src/site/twiki/Bridge-Sqoop.twiki @@ -0,0 +1,37 @@ +---+ Sqoop Atlas Bridge + +---++ Sqoop Model +The default Sqoop modelling is available in org.apache.atlas.sqoop.model.SqoopDataModelGenerator.
It defines the following types: +<verbatim> +sqoop_operation_type(EnumType) - values [IMPORT, EXPORT, EVAL] +sqoop_dbstore_usage(EnumType) - values [TABLE, QUERY, PROCEDURE, OTHER] +sqoop_process(ClassType) - super types [Process] - attributes [name, operation, dbStore, hiveTable, commandlineOpts, startTime, endTime, userName] +sqoop_dbdatastore(ClassType) - super types [DataSet] - attributes [name, dbStoreType, storeUse, storeUri, source, description, ownerName] +</verbatim> + +The entities are created and de-duped using a unique qualified name. They provide a namespace and can be used for querying as well: +sqoop_process - attribute name - sqoop-dbStoreType-storeUri-endTime +sqoop_dbdatastore - attribute name - dbStoreType-connectorUrl-source + +---++ Sqoop Hook +Sqoop added a SqoopJobDataPublisher that publishes data to Atlas after completion of an import job. Today, only hiveImport is supported in the Sqoop hook. +This is used to add entities in Atlas using the model defined in org.apache.atlas.sqoop.model.SqoopDataModelGenerator. +Follow these instructions in your Sqoop set-up to add the Sqoop hook for Atlas in <sqoop-conf>/sqoop-site.xml: + + * Sqoop Job publisher class. Currently only one publishing class is supported + <property> + <name>sqoop.job.data.publish.class</name> + <value>org.apache.atlas.sqoop.hook.SqoopHook</value> + </property> + * Atlas cluster name + <property> + <name>atlas.cluster.name</name> + <value><clustername></value> + </property> + * Copy <atlas-conf>/atlas-application.properties and <atlas-conf>/client.properties to the Sqoop conf directory <sqoop-conf>/ + * Link <atlas-home>/hook/sqoop/*.jar in the Sqoop lib directory + +Refer to [[Configuration][Configuration]] for notification-related configuration + +---++ Limitations + * Only the following Sqoop operations are currently captured by the Sqoop hook - hiveImport http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/docs/src/site/twiki/index.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/index.twiki b/docs/src/site/twiki/index.twiki index a921b11..6f7333c 100755 --- a/docs/src/site/twiki/index.twiki +++ b/docs/src/site/twiki/index.twiki @@ -47,6 +47,8 @@ allows integration with the whole enterprise data ecosystem. * [[Notification-Entity][Entity Notification]] * Bridges * [[Bridge-Hive][Hive Bridge]] + * [[Bridge-Sqoop][Sqoop Bridge]] + * [[Bridge-Falcon][Falcon Bridge]] * [[HighAvailability][Fault Tolerance And High Availability Options]] http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java b/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java index 568f58b..33e0fe5 100644 --- a/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java +++ b/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java @@ -102,8 +102,11 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN private EntityCreateRequest() { } public EntityCreateRequest(Referenceable...
entities) { - super(HookNotificationType.ENTITY_CREATE); - this.entities = Arrays.asList(entities); + this(HookNotificationType.ENTITY_CREATE, Arrays.asList(entities)); + } + + public EntityCreateRequest(List<Referenceable> entities) { + this(HookNotificationType.ENTITY_CREATE, entities); } protected EntityCreateRequest(HookNotificationType type, List<Referenceable> entities) { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 59d59c9..bd1f918 100755 --- a/pom.xml +++ b/pom.xml @@ -429,6 +429,8 @@ <module>webapp</module> <module>docs</module> <module>addons/hive-bridge</module> + <module>addons/falcon-bridge</module> + <module>addons/sqoop-bridge</module> <module>distro</module> </modules> @@ -976,6 +978,24 @@ <dependency> <groupId>org.apache.atlas</groupId> + <artifactId>hive-bridge</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>falcon-bridge</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>sqoop-bridge</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.atlas</groupId> <artifactId>atlas-dashboard</artifactId> <version>${project.version}</version> <type>war</type> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index afa428b..f3ec576 100644 --- a/release-log.txt +++ b/release-log.txt @@ -3,6 +3,7 @@ Apache Atlas Release Notes --trunk - unreleased INCOMPATIBLE CHANGES: +ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags) ALL CHANGES: ATLAS-392 Rename application.properties to atlas-application.properties (rishabhbhardwaj via shwethags) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/typesystem/src/main/resources/atlas-application.properties ---------------------------------------------------------------------- diff --git a/typesystem/src/main/resources/atlas-application.properties b/typesystem/src/main/resources/atlas-application.properties index 702c6f2..bb6e171 100644 --- a/typesystem/src/main/resources/atlas-application.properties +++ b/typesystem/src/main/resources/atlas-application.properties @@ -64,7 +64,7 @@ atlas.notification.embedded=true atlas.kafka.zookeeper.connect=localhost:19026 atlas.kafka.bootstrap.servers=localhost:19027 atlas.kafka.data=${sys:atlas.data}/kafka -atlas.kafka.zookeeper.session.timeout.ms=400 +atlas.kafka.zookeeper.session.timeout.ms=4000 atlas.kafka.zookeeper.sync.time.ms=20 atlas.kafka.consumer.timeout.ms=100 atlas.kafka.auto.commit.interval.ms=100
