Repository: incubator-atlas
Updated Branches:
  refs/heads/master f47cea3f8 -> 0a44790e7


ATLAS-515 Ability to initialize Kafka topics with more than 1 replica (yhemanth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/0a44790e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/0a44790e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/0a44790e

Branch: refs/heads/master
Commit: 0a44790e7866e92b6591d442f140705a079fef30
Parents: f47cea3
Author: Hemanth Yamijala <[email protected]>
Authored: Tue Jun 14 14:42:58 2016 +0530
Committer: Hemanth Yamijala <[email protected]>
Committed: Tue Jun 14 14:42:58 2016 +0530

----------------------------------------------------------------------
 .../apache/atlas/utils/AuthenticationUtil.java  |  19 +-
 distro/src/bin/atlas_client_cmdline.py          |  10 +
 distro/src/bin/atlas_config.py                  |  14 +
 distro/src/bin/atlas_kafka_setup.py             |  37 +++
 distro/src/bin/atlas_kafka_setup_hook.py        |  37 +++
 distro/src/conf/atlas-application.properties    |   8 +-
 .../src/main/assemblies/standalone-package.xml  |  18 ++
 docs/src/site/twiki/Configuration.twiki         |  17 ++
 docs/src/site/twiki/InstallationSteps.twiki     |  10 +
 notification/pom.xml                            | 134 +++++++++
 .../apache/atlas/hook/AtlasTopicCreator.java    | 136 +++++++++
 .../atlas/hook/AtlasTopicCreatorTest.java       | 281 +++++++++++++++++++
 pom.xml                                         |   8 +
 release-log.txt                                 |   1 +
 14 files changed, 721 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/common/src/main/java/org/apache/atlas/utils/AuthenticationUtil.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/atlas/utils/AuthenticationUtil.java 
b/common/src/main/java/org/apache/atlas/utils/AuthenticationUtil.java
index 3dbab17..bf1175f 100644
--- a/common/src/main/java/org/apache/atlas/utils/AuthenticationUtil.java
+++ b/common/src/main/java/org/apache/atlas/utils/AuthenticationUtil.java
@@ -38,20 +38,23 @@ public final class AuthenticationUtil {
     public static boolean isKerberosAuthenticationEnabled() {
         boolean isKerberosAuthenticationEnabled = false;
         try {
-            Configuration atlasConf = ApplicationProperties.get();
-
-            if 
("true".equalsIgnoreCase(atlasConf.getString("atlas.authentication.method.kerberos")))
 {
-                isKerberosAuthenticationEnabled = true;
-            } else {
-                isKerberosAuthenticationEnabled = false;
-            }
-
+            isKerberosAuthenticationEnabled = 
isKerberosAuthenticationEnabled(ApplicationProperties.get());
         } catch (AtlasException e) {
             LOG.error("Error while isKerberosAuthenticationEnabled ", e);
         }
         return isKerberosAuthenticationEnabled;
     }
 
+    public static boolean isKerberosAuthenticationEnabled(Configuration 
atlasConf) {
+        boolean isKerberosAuthenticationEnabled;
+        if 
("true".equalsIgnoreCase(atlasConf.getString("atlas.authentication.method.kerberos")))
 {
+            isKerberosAuthenticationEnabled = true;
+        } else {
+            isKerberosAuthenticationEnabled = false;
+        }
+        return isKerberosAuthenticationEnabled;
+    }
+
     public static String[] getBasicAuthenticationInput() {
         String username = null;
         String password = null;

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/distro/src/bin/atlas_client_cmdline.py
----------------------------------------------------------------------
diff --git a/distro/src/bin/atlas_client_cmdline.py 
b/distro/src/bin/atlas_client_cmdline.py
index f109ad3..f05a3c8 100644
--- a/distro/src/bin/atlas_client_cmdline.py
+++ b/distro/src/bin/atlas_client_cmdline.py
@@ -40,6 +40,16 @@ def get_atlas_classpath(confdir):
         atlas_classpath = mc.convertCygwinPath(atlas_classpath, True)
     return atlas_classpath
 
+def get_atlas_hook_classpath(confdir):
+    atlas_home = mc.atlasDir()
+    kafka_topic_setup_dir = mc.kafkaTopicSetupDir(atlas_home)
+    p = os.pathsep
+    atlas_hook_classpath = confdir + p \
+                            + os.path.join(kafka_topic_setup_dir, "*")
+    if mc.isCygwin():
+        atlas_hook_classpath = mc.convertCygwinPath(atlas_hook_classpath, True)
+    return atlas_hook_classpath
+
 def setup_jvm_opts_list(confdir, log_name):
     atlas_home = mc.atlasDir()
     mc.executeEnvSh(confdir)

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/distro/src/bin/atlas_config.py
----------------------------------------------------------------------
diff --git a/distro/src/bin/atlas_config.py b/distro/src/bin/atlas_config.py
index fab4046..c211823 100755
--- a/distro/src/bin/atlas_config.py
+++ b/distro/src/bin/atlas_config.py
@@ -64,6 +64,7 @@ 
HBASE_STORAGE_LOCAL_CONF_ENTRY="atlas.graph.storage.hostname\s*=\s*localhost"
 SOLR_INDEX_CONF_ENTRY="atlas.graph.index.search.backend\s*=\s*solr5"
 
SOLR_INDEX_LOCAL_CONF_ENTRY="atlas.graph.index.search.solr.zookeeper-url\s*=\s*localhost"
 SOLR_INDEX_ZK_URL="atlas.graph.index.search.solr.zookeeper-url"
+TOPICS_TO_CREATE="atlas.notification.topics"
 
 DEBUG = False
 
@@ -121,6 +122,9 @@ def webAppDir(dir):
     webapp = os.path.join(dir, WEBAPP)
     return os.environ.get(ATLAS_WEBAPP, webapp)
 
+def kafkaTopicSetupDir(homeDir):
+    return os.path.join(homeDir, "hook", "kafka-topic-setup")
+
 def expandWebApp(dir):
     webappDir = webAppDir(dir)
     webAppMetadataDir = os.path.join(webappDir, "atlas")
@@ -429,6 +433,16 @@ def get_solr_zk_url(confdir):
     confdir = os.path.join(confdir, CONF_FILE)
     return getConfig(confdir, SOLR_INDEX_ZK_URL)
 
+def get_topics_to_create(confdir):
+    confdir = os.path.join(confdir, CONF_FILE)
+    topic_list = getConfig(confdir, TOPICS_TO_CREATE)
+    if topic_list is not None:
+        topics = topic_list.split(",")
+    else:
+        topics = ["ATLAS_HOOK", "ATLAS_ENTITIES"]
+    return topics
+
+
 def run_solr(dir, action, zk_url = None, port = None, logdir = None, 
wait=True):
 
     solrScript = "solr"

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/distro/src/bin/atlas_kafka_setup.py
----------------------------------------------------------------------
diff --git a/distro/src/bin/atlas_kafka_setup.py 
b/distro/src/bin/atlas_kafka_setup.py
new file mode 100644
index 0000000..146a7e5
--- /dev/null
+++ b/distro/src/bin/atlas_kafka_setup.py
@@ -0,0 +1,37 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import sys
+import atlas_client_cmdline as cmdline
+import atlas_config as mc
+
+def main():
+    conf_dir = cmdline.setup_conf_dir()
+    jvm_opts_list = cmdline.setup_jvm_opts_list(conf_dir, 
'atlas_kafka_setup.log')
+    atlas_classpath = cmdline.get_atlas_classpath(conf_dir)
+    topics_array = mc.get_topics_to_create(conf_dir)
+    process = mc.java("org.apache.atlas.hook.AtlasTopicCreator", topics_array, 
atlas_classpath, jvm_opts_list)
+    return process.wait()
+
+if __name__ == '__main__':
+    try:
+        returncode = main()
+    except Exception as e:
+        print "Exception in setting up Kafka topics for Atlas: %s" % str(e)
+        returncode = -1
+
+    sys.exit(returncode)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/distro/src/bin/atlas_kafka_setup_hook.py
----------------------------------------------------------------------
diff --git a/distro/src/bin/atlas_kafka_setup_hook.py 
b/distro/src/bin/atlas_kafka_setup_hook.py
new file mode 100644
index 0000000..6fdff74
--- /dev/null
+++ b/distro/src/bin/atlas_kafka_setup_hook.py
@@ -0,0 +1,37 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import sys
+import atlas_client_cmdline as cmdline
+import atlas_config as mc
+
+def main():
+    conf_dir = cmdline.setup_conf_dir()
+    jvm_opts_list = cmdline.setup_jvm_opts_list(conf_dir, 
'atlas_kafka_setup_hook.log')
+    atlas_classpath = cmdline.get_atlas_hook_classpath(conf_dir)
+    topics_array = mc.get_topics_to_create(conf_dir)
+    process = mc.java("org.apache.atlas.hook.AtlasTopicCreator", topics_array, 
atlas_classpath, jvm_opts_list)
+    return process.wait()
+
+if __name__ == '__main__':
+    try:
+        returncode = main()
+    except Exception as e:
+        print "Exception in setting up Kafka topics for Atlas: %s" % str(e)
+        returncode = -1
+
+    sys.exit(returncode)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/distro/src/conf/atlas-application.properties
----------------------------------------------------------------------
diff --git a/distro/src/conf/atlas-application.properties 
b/distro/src/conf/atlas-application.properties
index 5bd0f74..cc0e4c1 100755
--- a/distro/src/conf/atlas-application.properties
+++ b/distro/src/conf/atlas-application.properties
@@ -56,12 +56,18 @@ atlas.kafka.data=${sys:atlas.home}/data/kafka
 atlas.kafka.zookeeper.connect=localhost:9026
 atlas.kafka.bootstrap.servers=localhost:9027
 atlas.kafka.zookeeper.session.timeout.ms=400
+atlas.kafka.zookeeper.connection.timeout.ms=200
 atlas.kafka.zookeeper.sync.time.ms=20
 atlas.kafka.auto.commit.interval.ms=1000
 atlas.kafka.auto.offset.reset=smallest
 atlas.kafka.hook.group.id=atlas
 atlas.kafka.auto.commit.enable=false
-
+atlas.notification.create.topics=true
+atlas.notification.replicas=1
+atlas.notification.topics=ATLAS_HOOK,ATLAS_ENTITIES
+# Enable for Kerberized Kafka clusters
+#atlas.notification.kafka.service.principal=kafka/_HOST@EXAMPLE.COM
+#atlas.notification.kafka.keytab.location=/etc/security/keytabs/kafka.service.keytab
 
 #########  Hive Lineage Configs  #########
 ## Schema

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/distro/src/main/assemblies/standalone-package.xml
----------------------------------------------------------------------
diff --git a/distro/src/main/assemblies/standalone-package.xml 
b/distro/src/main/assemblies/standalone-package.xml
index 1c7b2c5..26c24ea 100755
--- a/distro/src/main/assemblies/standalone-package.xml
+++ b/distro/src/main/assemblies/standalone-package.xml
@@ -55,6 +55,18 @@
         </fileSet>
 
         <fileSet>
+            <directory>target/bin</directory>
+            <outputDirectory>hook-bin</outputDirectory>
+            <includes>
+                <include>atlas_client_cmdline.py</include>
+                <include>atlas_config.py</include>
+                <include>atlas_kafka_setup_hook.py</include>
+            </includes>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
+
+        <fileSet>
             <directory>target/hbase</directory>
             <outputDirectory>hbase</outputDirectory>
             <fileMode>0755</fileMode>
@@ -156,6 +168,12 @@
             <directory>../addons/storm-bridge/target/models</directory>
             <outputDirectory>models</outputDirectory>
         </fileSet>
+
+        <!-- for kafka topic setup -->
+        <fileSet>
+            <directory>../notification/target/dependency/hook</directory>
+            <outputDirectory>hook</outputDirectory>
+        </fileSet>
     </fileSets>
 
     <files>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/docs/src/site/twiki/Configuration.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Configuration.twiki 
b/docs/src/site/twiki/Configuration.twiki
index 7150483..0e122fe 100644
--- a/docs/src/site/twiki/Configuration.twiki
+++ b/docs/src/site/twiki/Configuration.twiki
@@ -154,6 +154,23 @@ Note that Kafka group ids are specified for a specific 
topic.  The Kafka group i
 atlas.kafka.entities.group.id=<consumer id>
 </verbatim>
 
+These configuration parameters are useful for setting up Kafka topics via 
Atlas provided scripts, described in the
+[[InstallationSteps][Installation Steps]] page.
+
+<verbatim>
+# Whether to create the topics automatically, default is true.
+# Comma separated list of topics to be created, default is 
"ATLAS_HOOK,ATLAS_ENTITIES"
+atlas.notification.topics=ATLAS_HOOK,ATLAS_ENTITIES
+# Number of replicas for the Atlas topics, default is 1. Increase for higher 
resilience to Kafka failures.
+atlas.notification.replicas=1
+# Enable the below two properties if Kafka is running in Kerberized mode.
+# Set this to the service principal representing the Kafka service
+atlas.notification.kafka.service.principal=kafka/_HOST@EXAMPLE.COM
+# Set this to the location of the keytab file for Kafka
+#atlas.notification.kafka.keytab.location=/etc/security/keytabs/kafka.service.keytab
+
+</verbatim>
+
 
 ---++ Client Configs
 <verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/docs/src/site/twiki/InstallationSteps.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/InstallationSteps.twiki 
b/docs/src/site/twiki/InstallationSteps.twiki
index 518c380..1eac288 100644
--- a/docs/src/site/twiki/InstallationSteps.twiki
+++ b/docs/src/site/twiki/InstallationSteps.twiki
@@ -262,6 +262,16 @@ Pre-requisites for running Solr in cloud mode
   * !SolrCloud has support for replication and sharding. It is highly 
recommended to use !SolrCloud with at least two Solr nodes running on different 
servers with replication enabled.
     If using !SolrCloud, then you also need !ZooKeeper installed and 
configured with 3 or 5 !ZooKeeper nodes
 
+*Configuring Kafka Topics*
+
+Atlas uses Kafka to ingest metadata from other components at runtime. This is 
described in the [[Architecture][Architecture page]]
+in more detail. Depending on the configuration of Kafka, sometimes you might 
need to set up the topics explicitly before
+using Atlas. To do so, Atlas provides a script =bin/atlas_kafka_setup.py= 
which can be run from the Atlas server. In some
+environments, the hooks might start getting used first before Atlas server 
itself is set up. In such cases, the topics
+can be created on the hosts where hooks are installed using a similar script 
=hook-bin/atlas_kafka_setup_hook.py=. Both these
+use configuration in =atlas-application.properties= for setting up the topics. 
Please refer to the [[Configuration][Configuration page]]
+for these details.
+
 ---++++ Setting up Atlas
 
 There are a few steps that setup dependencies of Atlas. One such example is 
setting up the Titan schema

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/notification/pom.xml
----------------------------------------------------------------------
diff --git a/notification/pom.xml b/notification/pom.xml
index b3738db..fc08115 100644
--- a/notification/pom.xml
+++ b/notification/pom.xml
@@ -90,5 +90,139 @@
             <groupId>org.mockito</groupId>
             <artifactId>mockito-all</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>com.101tec</groupId>
+            <artifactId>zkclient</artifactId>
+            <version>${zkclient.version}</version>
+        </dependency>
+
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>copy-hook-dependencies</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>copy</goal>
+                        </goals>
+                        <configuration>
+                            
<outputDirectory>${project.build.directory}/dependency/hook/kafka-topic-setup</outputDirectory>
+                            <overWriteReleases>false</overWriteReleases>
+                            <overWriteSnapshots>false</overWriteSnapshots>
+                            <overWriteIfNewer>true</overWriteIfNewer>
+                            <artifactItems>
+                                <artifactItem>
+                                    <groupId>${project.groupId}</groupId>
+                                    
<artifactId>${project.artifactId}</artifactId>
+                                    <version>${project.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>${project.groupId}</groupId>
+                                    <artifactId>atlas-common</artifactId>
+                                    <version>${project.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>commons-logging</groupId>
+                                    <artifactId>commons-logging</artifactId>
+                                    
<version>${commons-logging.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>commons-configuration</groupId>
+                                    
<artifactId>commons-configuration</artifactId>
+                                    <version>${commons-conf.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>commons-collections</groupId>
+                                    
<artifactId>commons-collections</artifactId>
+                                    
<version>${commons-collections.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>commons-lang</groupId>
+                                    <artifactId>commons-lang</artifactId>
+                                    <version>${commons-lang.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>com.google.guava</groupId>
+                                    <artifactId>guava</artifactId>
+                                    <version>${guava.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.apache.hadoop</groupId>
+                                    <artifactId>hadoop-common</artifactId>
+                                    <version>${hadoop.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.apache.hadoop</groupId>
+                                    <artifactId>hadoop-auth</artifactId>
+                                    <version>${hadoop.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.slf4j</groupId>
+                                    <artifactId>slf4j-api</artifactId>
+                                    <version>${slf4j.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.slf4j</groupId>
+                                    <artifactId>slf4j-log4j12</artifactId>
+                                    <version>${slf4j.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>log4j</groupId>
+                                    <artifactId>log4j</artifactId>
+                                    <version>1.2.17</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.apache.kafka</groupId>
+                                    
<artifactId>kafka_${scala.binary.version}</artifactId>
+                                    <version>${kafka.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.apache.kafka</groupId>
+                                    <artifactId>kafka-clients</artifactId>
+                                    <version>${kafka.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.scala-lang</groupId>
+                                    <artifactId>scala-compiler</artifactId>
+                                    <version>${scala.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.scala-lang</groupId>
+                                    <artifactId>scala-reflect</artifactId>
+                                    <version>${scala.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.scala-lang</groupId>
+                                    <artifactId>scala-library</artifactId>
+                                    <version>${scala.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.scala-lang</groupId>
+                                    <artifactId>scalap</artifactId>
+                                    <version>${scala.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>com.101tec</groupId>
+                                    <artifactId>zkclient</artifactId>
+                                    <version>${zkclient.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.apache.zookeeper</groupId>
+                                    <artifactId>zookeeper</artifactId>
+                                    <version>3.4.6</version>
+                                </artifactItem>
+                            </artifactItems>
+                        </configuration>
+                    </execution>
+
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/notification/src/main/java/org/apache/atlas/hook/AtlasTopicCreator.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/hook/AtlasTopicCreator.java 
b/notification/src/main/java/org/apache/atlas/hook/AtlasTopicCreator.java
new file mode 100644
index 0000000..7a1e07a
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasTopicCreator.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.hook;
+
+import com.google.common.annotations.VisibleForTesting;
+import kafka.admin.AdminUtils;
+import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.utils.AuthenticationUtil;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * A class to create Kafka topics used by Atlas components.
+ *
+ * Use this class to create a Kafka topic with specific configuration like 
number of partitions, replicas, etc.
+ */
+public class AtlasTopicCreator {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AtlasTopicCreator.class);
+
+    public static final String ATLAS_NOTIFICATION_CREATE_TOPICS_KEY = 
"atlas.notification.create.topics";
+
+    /**
+     * Create an Atlas topic.
+     *
+     * The topic will get created based on following conditions:
+     * {@link #ATLAS_NOTIFICATION_CREATE_TOPICS_KEY} is set to true.
+     * The topic does not already exist.
+     * Note that despite this, there could be multiple topic creation calls 
that happen in parallel because hooks
+     * run in a distributed fashion. Exceptions are caught and logged by this 
method to prevent the startup of
+     * the hooks from failing.
+     * @param atlasProperties {@link Configuration} containing properties to 
be used for creating topics.
+     * @param topicNames list of topics to create
+     */
+    public void createAtlasTopic(Configuration atlasProperties, String... 
topicNames) {
+        if (atlasProperties.getBoolean(ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, 
true)) {
+            if (!handleSecurity(atlasProperties)) {
+                return;
+            }
+            ZkUtils zkUtils = createZkUtils(atlasProperties);
+            for (String topicName : topicNames) {
+                try {
+                    LOG.warn("Attempting to create topic {}", topicName);
+                    if (!ifTopicExists(topicName, zkUtils)) {
+                        createTopic(atlasProperties, topicName, zkUtils);
+                    } else {
+                        LOG.warn("Ignoring call to create topic {}, as it 
already exists.", topicName);
+                    }
+                } catch (Throwable t) {
+                    LOG.error("Failed while creating topic {}", topicName, t);
+                }
+            }
+            zkUtils.close();
+        } else {
+            LOG.info("Not creating topics {} as {} is false", 
StringUtils.join(topicNames, ","),
+                    ATLAS_NOTIFICATION_CREATE_TOPICS_KEY);
+        }
+    }
+
+    @VisibleForTesting
+    protected boolean handleSecurity(Configuration atlasProperties) {
+        if 
(AuthenticationUtil.isKerberosAuthenticationEnabled(atlasProperties)) {
+            String kafkaPrincipal = 
atlasProperties.getString("atlas.notification.kafka.service.principal");
+            String kafkaKeyTab = 
atlasProperties.getString("atlas.notification.kafka.keytab.location");
+            org.apache.hadoop.conf.Configuration hadoopConf = new 
org.apache.hadoop.conf.Configuration();
+            
SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS,
 hadoopConf);
+            try {
+                String serverPrincipal = 
SecurityUtil.getServerPrincipal(kafkaPrincipal, (String) null);
+                UserGroupInformation.setConfiguration(hadoopConf);
+                UserGroupInformation.loginUserFromKeytab(serverPrincipal, 
kafkaKeyTab);
+            } catch (IOException e) {
+                LOG.warn("Could not login as {} from keytab file {}", 
kafkaPrincipal, kafkaKeyTab, e);
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @VisibleForTesting
+    protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) {
+        return AdminUtils.topicExists(zkUtils, topicName);
+    }
+
+    @VisibleForTesting
+    protected void createTopic(Configuration atlasProperties, String 
topicName, ZkUtils zkUtils) {
+        int numPartitions = 
atlasProperties.getInt("atlas.notification.hook.numthreads", 1);
+        int numReplicas = 
atlasProperties.getInt("atlas.notification.replicas", 1);
+        AdminUtils.createTopic(zkUtils, topicName,  numPartitions, numReplicas,
+                new Properties());
+        LOG.warn("Created topic {} with partitions {} and replicas {}", 
topicName, numPartitions, numReplicas);
+    }
+
+    @VisibleForTesting
+    protected ZkUtils createZkUtils(Configuration atlasProperties) {
+        String zkConnect = 
atlasProperties.getString("atlas.kafka.zookeeper.connect");
+        int sessionTimeout = 
atlasProperties.getInt("atlas.kafka.zookeeper.session.timeout.ms", 400);
+        int connectionTimeout = 
atlasProperties.getInt("atlas.kafka.zookeeper.connection.timeout.ms", 200);
+        Tuple2<ZkClient, ZkConnection> zkClientAndConnection = 
ZkUtils.createZkClientAndConnection(
+                zkConnect, sessionTimeout, connectionTimeout);
+        return new ZkUtils(zkClientAndConnection._1(), 
zkClientAndConnection._2(), false);
+    }
+
+    public static void main(String[] args) throws AtlasException {
+        Configuration configuration = ApplicationProperties.get();
+        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator();
+        atlasTopicCreator.createAtlasTopic(configuration, args);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java
----------------------------------------------------------------------
diff --git 
a/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java 
b/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java
new file mode 100644
index 0000000..8585898
--- /dev/null
+++ 
b/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.hook;
+
+import kafka.utils.ZkUtils;
+import org.apache.commons.configuration.Configuration;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+public class AtlasTopicCreatorTest {
+
+    @Test
+    public void shouldNotCreateAtlasTopicIfNotConfiguredToDoSo() {
+
+        Configuration configuration = mock(Configuration.class);
+        
when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY,
 true)).
+                thenReturn(false);
+        
when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
+        final boolean[] topicExistsCalled = new boolean[] {false};
+        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
+            @Override
+            protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) 
{
+                topicExistsCalled[0] = true;
+                return false;
+            }
+        };
+        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK");
+        assertFalse(topicExistsCalled[0]);
+    }
+
+    @Test
+    public void shouldNotCreateTopicIfItAlreadyExists() {
+        Configuration configuration = mock(Configuration.class);
+        
when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY,
 true)).
+                thenReturn(true);
+        
when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
+        final ZkUtils zookeeperUtils = mock(ZkUtils.class);
+        final boolean[] topicExistsCalled = new boolean[]{false};
+        final boolean[] createTopicCalled = new boolean[]{false};
+
+        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
+            @Override
+            protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) 
{
+                topicExistsCalled[0] = true;
+                return true;
+            }
+
+            @Override
+            protected ZkUtils createZkUtils(Configuration atlasProperties) {
+                return zookeeperUtils;
+            }
+
+            @Override
+            protected void createTopic(Configuration atlasProperties, String 
topicName, ZkUtils zkUtils) {
+                createTopicCalled[0] = true;
+            }
+        };
+        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK");
+        assertTrue(topicExistsCalled[0]);
+        assertFalse(createTopicCalled[0]);
+    }
+
+    @Test
+    public void shouldCreateTopicIfItDoesNotExist() {
+        Configuration configuration = mock(Configuration.class);
+        
when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY,
 true)).
+                thenReturn(true);
+        
when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
+        final ZkUtils zookeeperUtils = mock(ZkUtils.class);
+
+        final boolean[] createdTopic = new boolean[]{false};
+
+        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
+            @Override
+            protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) 
{
+                return false;
+            }
+
+            @Override
+            protected ZkUtils createZkUtils(Configuration atlasProperties) {
+                return zookeeperUtils;
+            }
+
+            @Override
+            protected void createTopic(Configuration atlasProperties, String 
topicName, ZkUtils zkUtils) {
+                createdTopic[0] = true;
+            }
+        };
+        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK");
+        assertTrue(createdTopic[0]);
+    }
+
+    @Test
+    public void shouldNotFailIfExceptionOccursDuringCreatingTopic() {
+        Configuration configuration = mock(Configuration.class);
+        
when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY,
 true)).
+                thenReturn(true);
+        
when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
+        final ZkUtils zookeeperUtils = mock(ZkUtils.class);
+        final boolean[] createTopicCalled = new boolean[]{false};
+
+        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
+            @Override
+            protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) 
{
+                return false;
+            }
+
+            @Override
+            protected ZkUtils createZkUtils(Configuration atlasProperties) {
+                return zookeeperUtils;
+            }
+
+            @Override
+            protected void createTopic(Configuration atlasProperties, String 
topicName, ZkUtils zkUtils) {
+                createTopicCalled[0] = true;
+                throw new RuntimeException("Simulating failure during creating 
topic");
+            }
+        };
+        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK");
+        assertTrue(createTopicCalled[0]);
+    }
+
+    @Test
+    public void shouldCreateMultipleTopics() {
+        Configuration configuration = mock(Configuration.class);
+        
when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY,
 true)).
+                thenReturn(true);
+        
when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
+        final ZkUtils zookeeperUtils = mock(ZkUtils.class);
+
+        final Map<String, Boolean> createdTopics = new HashMap<>();
+        createdTopics.put("ATLAS_HOOK", false);
+        createdTopics.put("ATLAS_ENTITIES", false);
+
+        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
+
+            @Override
+            protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) 
{
+                return false;
+            }
+
+            @Override
+            protected ZkUtils createZkUtils(Configuration atlasProperties) {
+                return zookeeperUtils;
+            }
+
+            @Override
+            protected void createTopic(Configuration atlasProperties, String 
topicName, ZkUtils zkUtils) {
+                createdTopics.put(topicName, true);
+            }
+        };
+        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", 
"ATLAS_ENTITIES");
+        assertTrue(createdTopics.get("ATLAS_HOOK"));
+        assertTrue(createdTopics.get("ATLAS_ENTITIES"));
+    }
+
+    @Test
+    public void shouldCreateTopicEvenIfEarlierOneFails() {
+        Configuration configuration = mock(Configuration.class);
+        
when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY,
 true)).
+                thenReturn(true);
+        
when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
+        final ZkUtils zookeeperUtils = mock(ZkUtils.class);
+
+        final Map<String, Boolean> createdTopics = new HashMap<>();
+        createdTopics.put("ATLAS_ENTITIES", false);
+
+        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
+
+            @Override
+            protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) 
{
+                return false;
+            }
+
+            @Override
+            protected ZkUtils createZkUtils(Configuration atlasProperties) {
+                return zookeeperUtils;
+            }
+
+            @Override
+            protected void createTopic(Configuration atlasProperties, String 
topicName, ZkUtils zkUtils) {
+                if (topicName.equals("ATLAS_HOOK")) {
+                    throw new RuntimeException("Simulating failure when 
creating ATLAS_HOOK topic");
+                } else {
+                    createdTopics.put(topicName, true);
+                }
+            }
+        };
+        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", 
"ATLAS_ENTITIES");
+        assertTrue(createdTopics.get("ATLAS_ENTITIES"));
+    }
+
+    @Test
+    public void shouldCloseResources() {
+        Configuration configuration = mock(Configuration.class);
+        
when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY,
 true)).
+                thenReturn(true);
+        
when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
+        final ZkUtils zookeeperUtils = mock(ZkUtils.class);
+
+        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
+            @Override
+            protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) 
{
+                return false;
+            }
+
+            @Override
+            protected ZkUtils createZkUtils(Configuration atlasProperties) {
+                return zookeeperUtils;
+            }
+
+            @Override
+            protected void createTopic(Configuration atlasProperties, String 
topicName, ZkUtils zkUtils) {
+            }
+        };
+        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", 
"ATLAS_ENTITIES");
+
+        verify(zookeeperUtils, times(1)).close();
+    }
+
+    @Test
+    public void shouldNotProcessTopicCreationIfSecurityFails() {
+        Configuration configuration = mock(Configuration.class);
+        
when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY,
 true)).
+                thenReturn(true);
+        final ZkUtils zookeeperUtils = mock(ZkUtils.class);
+        final Map<String, Boolean> createdTopics = new HashMap<>();
+        createdTopics.put("ATLAS_HOOK", false);
+        createdTopics.put("ATLAS_ENTITIES", false);
+
+        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
+            @Override
+            protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) 
{
+                return false;
+            }
+
+            @Override
+            protected ZkUtils createZkUtils(Configuration atlasProperties) {
+                return zookeeperUtils;
+            }
+
+            @Override
+            protected void createTopic(Configuration atlasProperties, String 
topicName, ZkUtils zkUtils) {
+                createdTopics.put(topicName, true);
+            }
+
+            @Override
+            protected boolean handleSecurity(Configuration atlasProperties) {
+                return false;
+            }
+        };
+        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", 
"ATLAS_ENTITIES");
+        assertFalse(createdTopics.get("ATLAS_HOOK"));
+        assertFalse(createdTopics.get("ATLAS_ENTITIES"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e13345e..6f75740 100755
--- a/pom.xml
+++ b/pom.xml
@@ -392,9 +392,11 @@
         <commons-conf.version>1.10</commons-conf.version>
         <commons-collections.version>3.2.2</commons-collections.version>
         <commons-logging.version>1.1.3</commons-logging.version>
+        <commons-lang.version>2.6</commons-lang.version>
         <javax-inject.version>1</javax-inject.version>
         <jettison.version>1.3.7</jettison.version>
         <paranamer.version>2.3</paranamer.version>
+        <zkclient.version>0.8</zkclient.version>
 
         <PermGen>64m</PermGen>
         <MaxPermGen>512m</MaxPermGen>
@@ -1266,6 +1268,12 @@
                 <version>3.4</version>
             </dependency>
 
+            <dependency>
+                <groupId>commons-lang</groupId>
+                <artifactId>commons-lang</artifactId>
+                <version>${commons-lang.version}</version>
+            </dependency>
+
             <!-- kafka -->
             <dependency>
                 <groupId>org.apache.kafka</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 091e077..6000fc1 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -22,6 +22,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
 
 ALL CHANGES:
+ATLAS-515 Ability to initialize Kafka topics with more than 1 replica (yhemanth)
 ATLAS-891 UI changes to implement Update term (Kalyanikashikar via yhemanth)
 ATLAS-794 Business Catalog Update (jspeidel via yhemanth)
 ATLAS-837 Enhance Sqoop addon to handle export operation (venkatnrangan via shwethags)


Reply via email to