http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/ParallelServiceTest.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/ParallelServiceTest.java
 
b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/ParallelServiceTest.java
new file mode 100755
index 0000000..7a180d2
--- /dev/null
+++ 
b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/ParallelServiceTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.messaging.kafka;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus.State;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+import org.apache.atlas.odf.core.test.controlcenter.ODFAPITest;
+
+/**
+ * Submits analysis requests for several data sets and checks, by polling the
+ * request status, that more than one request was ACTIVE at the same time.
+ */
+public class ParallelServiceTest extends ODFTestcase {
+       /** Number of times the complete list of data sets is submitted. */
+       private static final int NUMBER_OF_QUEUED_REQUESTS = 1;
+       Logger log = ODFTestLogger.get();
+
+       @Test
+       public void runDataSetsInParallelSuccess() throws Exception {
+               runDataSetsInParallelAndCheckResult(Arrays.asList(new String[] { "successID1", "successID2" }),
+                               State.FINISHED, State.FINISHED);
+       }
+
+       /**
+        * Starts one analysis request per entry of {@code dataSetIDs} (repeated
+        * NUMBER_OF_QUEUED_REQUESTS times), polls the status of all requests and asserts
+        * that at least one poll saw more than one request in state ACTIVE and that the
+        * states seen in the last poll contain all of {@code expectedState}.
+        *
+        * @param dataSetIDs    prefixes of the data set IDs; a random UUID is appended to each
+        * @param expectedState states that must be present among the final request states
+        * @throws Exception if submitting or polling a request fails
+        */
+       private void runDataSetsInParallelAndCheckResult(List<String> dataSetIDs, State... expectedState) throws Exception {
+               log.info("Running data sets in parallel: " + dataSetIDs);
+               // expectedState is an array: render its elements, not the array identity
+               log.info("Expected state: " + Arrays.toString(expectedState));
+               AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager();
+
+               List<AnalysisRequest> requests = new ArrayList<AnalysisRequest>();
+               List<AnalysisResponse> responses = new ArrayList<AnalysisResponse>();
+               List<String> idList = new ArrayList<String>();
+
+               for (int no = 0; no < NUMBER_OF_QUEUED_REQUESTS; no++) {
+                       for (String dataSet : dataSetIDs) {
+                               final AnalysisRequest req = ODFAPITest.createAnalysisRequest(
+                                               Arrays.asList(dataSet + UUID.randomUUID().toString()));
+                               AnalysisResponse resp = analysisManager.runAnalysis(req);
+                               // remember the assigned ID on the request so it can be polled below
+                               req.setId(resp.getId());
+                               requests.add(req);
+                               idList.add(resp.getId());
+                               responses.add(resp);
+                       }
+               }
+               log.info("Parallel requests started: " + idList.toString());
+
+               Assert.assertEquals(NUMBER_OF_QUEUED_REQUESTS * dataSetIDs.size(), requests.size());
+               Assert.assertEquals(NUMBER_OF_QUEUED_REQUESTS * dataSetIDs.size(), responses.size());
+
+               // check that requests are processed in parallel:
+               //   there must be a point in time where both requests are in status "active"
+               log.info("Polling for status of parallel request...");
+               boolean foundPointInTimeWhereBothRequestsAreActive = false;
+               int maxPolls = ODFAPITest.MAX_NUMBER_OF_POLLS;
+               List<State> allSingleStates = new ArrayList<AnalysisRequestStatus.State>();
+               do {
+                       int foundActive = 0;
+                       // only the states of the most recent poll are kept
+                       allSingleStates.clear();
+                       for (AnalysisRequest request : requests) {
+                               final State state = analysisManager.getAnalysisRequestStatus(request.getId()).getState();
+                               if (state == State.ACTIVE) {
+                                       log.info("ACTIVE: " + request.getId() + " foundactive: " + foundActive);
+                                       foundActive++;
+                               } else {
+                                       log.info("NOT ACTIVE " + request.getId() + " _ " + state);
+                               }
+                               allSingleStates.add(state);
+                       }
+                       if (foundActive > 1) {
+                               foundPointInTimeWhereBothRequestsAreActive = true;
+                       }
+
+                       maxPolls--;
+                       Thread.sleep(ODFAPITest.WAIT_MS_BETWEEN_POLLING);
+                       // NOTE(review): Utils.containsNone is defined elsewhere. Judging by its name alone this
+                       // condition keeps polling while NO request is ACTIVE/QUEUED, which looks inverted
+                       // relative to "poll until all requests are done" - confirm against Utils.
+               } while (maxPolls > 0 && Utils.containsNone(allSingleStates, new State[] { State.ACTIVE, State.QUEUED }));
+
+               Assert.assertTrue(maxPolls > 0);
+               Assert.assertTrue(foundPointInTimeWhereBothRequestsAreActive);
+               Assert.assertTrue(allSingleStates.containsAll(Arrays.asList(expectedState)));
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/TestEnvironmentMessagingInitializer.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/TestEnvironmentMessagingInitializer.java
 
b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/TestEnvironmentMessagingInitializer.java
new file mode 100755
index 0000000..5e3d97e
--- /dev/null
+++ 
b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/TestEnvironmentMessagingInitializer.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.messaging.kafka;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.core.test.TestEnvironmentInitializer;
+
+public class TestEnvironmentMessagingInitializer implements 
TestEnvironmentInitializer {
+
+       public TestEnvironmentMessagingInitializer() {
+       }
+       
+       public void start() {
+               Logger logger = 
Logger.getLogger(TestEnvironmentMessagingInitializer.class.getName());
+               try {
+                       logger.info("Starting Test-Kafka during 
initialization...");
+                       TestKafkaStarter starter = new TestKafkaStarter();
+                       starter.startKafka();
+                       logger.info("Test-Kafka initialized");
+               } catch (Exception exc) {
+                       logger.log(Level.INFO, "Exception occurred while 
starting test kafka", exc);
+                       throw new RuntimeException(exc);
+               }
+       }
+
+       @Override
+       public void stop() {
+               // TODO Auto-generated method stub
+               
+       }
+
+       @Override
+       public String getName() {
+               return "Kafka1001";
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/TestKafkaStarter.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/TestKafkaStarter.java
 
b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/TestKafkaStarter.java
new file mode 100755
index 0000000..1c3025e
--- /dev/null
+++ 
b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/TestKafkaStarter.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.messaging.kafka;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.BindException;
+import java.net.DatagramSocket;
+import java.net.ServerSocket;
+import java.rmi.NotBoundException;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.wink.json4j.JSONObject;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+
+import org.apache.atlas.odf.core.Utils;
+
+import kafka.cluster.Broker;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+
+public class TestKafkaStarter {
+
+       public static boolean deleteRecursive(File path) throws 
FileNotFoundException {
+               if (!path.exists()) {
+                       throw new FileNotFoundException(path.getAbsolutePath());
+               }
+               boolean ret = true;
+               if (path.isDirectory()) {
+                       for (File f : path.listFiles()) {
+                               ret = ret && deleteRecursive(f);
+                       }
+               }
+               return ret && path.delete();
+       }
+
+       // Static state describing the embedded servers started by this class.
+       // Daemon thread that runs the embedded ZooKeeper server (set in startZookeeper()).
+       static Thread zookeeperThread = null;
+       // Set to true once a broker on the configured port is registered in ZooKeeper.
+       static boolean kafkaStarted = false;
+       // Monitor that startKafka() synchronizes on.
+       static Object lockObject = new Object();
+       // The embedded Kafka server, assigned in startKafka() after startup() was called.
+       static KafkaServerStartable kafkaServer = null;
+       // The embedded ZooKeeper server; non-null once startZookeeper() has initiated a start.
+       static ZooKeeperServerMainWithShutdown zooKeeperServer = null;
+
+
+       /** When {@code true} (the default), existing server data is removed at server start. */
+       boolean cleanData = true;
+
+       /**
+        * @return whether existing server data is removed at server start
+        */
+       public boolean isCleanData() {
+               return this.cleanData;
+       }
+
+       /**
+        * @param cleanData {@code true} to remove existing server data at server start
+        */
+       public void setCleanData(boolean cleanData) {
+               this.cleanData = cleanData;
+       }
+
+       /** Logger named after this class. */
+       Logger logger = Logger.getLogger(TestKafkaStarter.class.getName());
+
+       /**
+        * Writes {@code s} to {@link #logger} at INFO level.
+        */
+       void log(String s) {
+               logger.log(Level.INFO, s);
+       }
+
+       /** Time in ms to wait for a ZooKeeper connection in startZookeeper(). */
+       int zookeeperStartupTime = 10000;
+       /** Time in ms given to the Kafka broker to register itself in startKafka(). */
+       int kafkaStartupTime = 10000;
+
+       /**
+        * {@link ZooKeeperServerMain} variant that exposes {@code shutdown()} publicly.
+        */
+       static class ZooKeeperServerMainWithShutdown extends ZooKeeperServerMain {
+               /** Delegates to the inherited shutdown. */
+               @Override
+               public void shutdown() {
+                       super.shutdown();
+               }
+       }
+
+       private void startZookeeper() throws Exception {
+               log("Starting zookeeper");
+
+               final Properties zkProps = 
Utils.readConfigProperties("org/apache/atlas/odf/core/messaging/kafka/test-embedded-zookeeper.properties");
+               final String zkPort = (String) zkProps.get("clientPort");
+               if (zooKeeperServer == null) {
+                       log("zookeeper properties: " + zkProps);
+                       if (cleanData) {
+                               String dataDir = zkProps.getProperty("dataDir");
+                               log("Removing all data from zookeeper data dir 
" + dataDir);
+                               File dir = new File(dataDir);
+                               if (dir.exists()) {
+                                       if (!deleteRecursive(dir)) {
+                                               throw new IOException("Could 
not delete directory " + dataDir);
+                                       }
+                               }
+                       }
+                       final ZooKeeperServerMainWithShutdown zk = new 
ZooKeeperServerMainWithShutdown();
+                       final ServerConfig serverConfig = new ServerConfig();
+                       log("Loading zookeeper config...");
+                       QuorumPeerConfig zkConfig = new QuorumPeerConfig();
+                       zkConfig.parseProperties(zkProps);
+                       serverConfig.readFrom(zkConfig);
+
+                       Runnable zookeeperStarter = new Runnable() {
+
+                               @Override
+                               public void run() {
+                                       try {
+                                               log("Now starting Zookeeper 
with API...");
+                                               zk.runFromConfig(serverConfig);
+                                       } catch (BindException ex) {
+                                               log("Embedded zookeeper could 
not be started, port is already in use. Trying to use external zookeeper");
+                                               ZooKeeper zk = null;
+                                               try {
+                                                       zk = new 
ZooKeeper("localhost:" + zkPort, 5000, null);
+                                                       if 
(zk.getState().equals(States.CONNECTED)) {
+                                                               log("Using 
existing zookeeper running on port " + zkPort);
+                                                               return;
+                                                       } else {
+                                                               throw new 
NotBoundException();
+                                                       }
+                                               } catch (Exception zkEx) {
+                                                       throw new 
RuntimeException("Could not connect to zookeeper on port " + zkPort + ". Please 
close all applications listening on this port.");
+                                               } finally {
+                                                       if (zk != null) {
+                                                               try {
+                                                                       
zk.close();
+                                                               } catch 
(InterruptedException e) {
+                                                                       
logger.log(Level.WARNING, "An error occured closing the zk connection", e);
+                                                               }
+                                                       }
+                                               }
+                                       } catch (Exception e) {
+                                               e.printStackTrace();
+                                               throw new RuntimeException(e);
+                                       }
+
+                               }
+                       };
+
+                       zookeeperThread = new Thread(zookeeperStarter);
+                       zookeeperThread.setDaemon(true);
+                       zookeeperThread.start();
+                       log("Zookeeper start initiated");
+                       zooKeeperServer = zk;
+               }
+               ZkConnection conn = new ZkConnection("localhost:" + zkPort);
+               final CountDownLatch latch = new CountDownLatch(1);
+               conn.connect(new Watcher() {
+
+                       @Override
+                       public void process(WatchedEvent event) {
+                               log("Zookeeper event: " + event.getState());
+                               if 
(event.getState().equals(KeeperState.SyncConnected)) {
+                                       log("Zookeeper server up and running");
+                                       latch.countDown();
+                               }
+                       }
+               });
+
+               boolean zkReady = latch.await(zookeeperStartupTime, 
TimeUnit.MILLISECONDS);
+               if (zkReady) {
+                       log("Zookeeper initialized and started");
+
+               } else {
+                       logger.severe("Zookeeper could not be initialized 
within " + (zookeeperStartupTime / 1000) + " sec");
+               }
+               conn.close();
+       }
+
+       /**
+        * @return the value of the static {@code kafkaStarted} flag, which startKafka() sets once
+        *         a broker on the configured port is found registered in ZooKeeper
+        */
+       public boolean isRunning() {
+               return kafkaStarted;
+       }
+
+       public void startKafka() throws Exception {
+               synchronized (lockObject) {
+                       if (kafkaStarted) {
+                               log("Kafka already running");
+                               return;
+                       }
+                       this.startZookeeper();
+
+                       log("Starting Kafka server...");
+                       Properties kafkaProps = 
Utils.readConfigProperties("org/apache/atlas/odf/core/messaging/kafka/test-embedded-kafka.properties");
+                       log("Kafka properties: " + kafkaProps);
+                       KafkaConfig kafkaConfig = new KafkaConfig(kafkaProps);
+                       int kafkaPort = kafkaConfig.port();
+                       if (cleanData && isPortAvailable(kafkaPort)) {
+                               String logDir = 
kafkaProps.getProperty("log.dirs");
+                               log("Removing all data from kafka log dir: " + 
logDir);
+                               File dir = new File(logDir);
+                               if (dir.exists()) {
+                                       if (!deleteRecursive(dir)) {
+                                               throw new IOException("Kafka 
logDir could not be deleted: " + logDir);
+                                       }
+                               }
+                       }
+                       if (!isPortAvailable(kafkaPort)) {
+                               log("Kafka port " + kafkaPort + " is already in 
use. "
+                                               + "Checking if zookeeper has a 
registered broker on this port to make sure it is an existing kafka instance 
using the port.");
+                               ZooKeeper zk = new 
ZooKeeper(kafkaConfig.zkConnect(), 10000, null);
+                               try {
+                                       List<String> ids = 
zk.getChildren("/brokers/ids", false);
+                                       if (ids != null && !ids.isEmpty()) {
+                                               for (String id : ids) {
+                                                       String brokerInfo = new 
String(zk.getData("/brokers/ids/" + id, false, null), "UTF-8");
+                                                       JSONObject broker = new 
JSONObject(brokerInfo);
+                                                       Integer port = new 
Integer(String.valueOf(broker.get("port")));
+                                                       if (port != null && 
port.equals(kafkaPort)) {
+                                                               log("Using 
externally started kafka broker on port " + port);
+                                                               kafkaStarted = 
true;
+                                                               return;
+                                                       }
+                                               }
+                                       }
+                               } catch (NoNodeException ex) {
+                                       log("No brokers registered with 
zookeeper!");
+                                       throw new RuntimeException("Kafka 
broker port " + kafkaPort
+                                                       + " not available and 
no broker found! Please close all running applications listening on this port");
+                               } finally {
+                                       if (zk != null) {
+                                               try {
+                                                       zk.close();
+                                               } catch (InterruptedException 
e) {
+                                                       
logger.log(Level.WARNING, "An error occured closing the zk connection", e);
+                                               }
+                                       }
+                               }
+                       }
+                       KafkaServerStartable kafka  = 
KafkaServerStartable.fromProps(kafkaProps);
+                       kafka.startup();
+                       log("Kafka server start initiated");
+
+                       kafkaServer = kafka;
+                       log("Give Kafka a maximum of " + kafkaStartupTime + " 
ms to start");
+                       ZkClient zk = new ZkClient(kafkaConfig.zkConnect(), 
10000, 5000, ZKStringSerializer$.MODULE$);
+                       int maxRetryCount = kafkaStartupTime / 1000;
+                       int cnt = 0;
+                       while (cnt < maxRetryCount) {
+                               cnt++;
+                               Seq<Broker> allBrokersInCluster = new 
ZkUtils(zk, new ZkConnection(kafkaConfig.zkConnect()), 
false).getAllBrokersInCluster();
+                               List<Broker> brokers = 
JavaConversions.seqAsJavaList(allBrokersInCluster);
+                               for (Broker broker : brokers) {
+                                       if 
(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).port() == kafkaPort) {
+                                               log("Broker is registered, 
Kafka is available after " + cnt + " seconds");
+                                               kafkaStarted = true;
+                                               return;
+                                       }
+                               }
+                               Thread.sleep(1000);
+                       }
+                       logger.severe("Kafka broker was not started after " + 
kafkaStartupTime + " ms");
+               }
+       }
+
+       /**
+        * Intentionally a no-op: nothing is stopped by this method.
+        */
+       public void shutdownKafka() {
+               // do nothing for shutdown
+       }
+
+       /**
+        * Probes whether {@code port} can currently be bound for both TCP and UDP.
+        *
+        * @param port the port number to probe
+        * @return {@code true} if a server socket and a datagram socket could both be opened
+        *         on the port, {@code false} if either attempt failed with an IOException
+        */
+       boolean isPortAvailable(int port) {
+               boolean available = false;
+               ServerSocket tcpProbe = null;
+               DatagramSocket udpProbe = null;
+               try {
+                       tcpProbe = new ServerSocket(port);
+                       tcpProbe.setReuseAddress(true);
+                       udpProbe = new DatagramSocket(port);
+                       udpProbe.setReuseAddress(true);
+                       available = true;
+               } catch (IOException ignored) {
+                       // a probe socket could not be opened: the port counts as unavailable
+               } finally {
+                       if (udpProbe != null) {
+                               udpProbe.close();
+                       }
+
+                       if (tcpProbe != null) {
+                               try {
+                                       tcpProbe.close();
+                               } catch (IOException ignored) {
+                                       // nothing sensible to do if closing the probe socket fails
+                               }
+                       }
+               }
+
+               return available;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-kafka.properties
----------------------------------------------------------------------
diff --git 
a/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-kafka.properties
 
b/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-kafka.properties
new file mode 100755
index 0000000..4769c95
--- /dev/null
+++ 
b/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-kafka.properties
@@ -0,0 +1,136 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=0
+
+############################# Socket Server Settings 
#############################
+
+listeners=PLAINTEXT://:9092 
+
+# The port the socket server listens on
+# port=9092
+
+# Hostname the broker will bind to. If not set, the server will bind to all 
interfaces
+#host.name=localhost
+
+# Hostname the broker will advertise to producers and consumers. If not set, 
it uses the
+# value for "host.name" if configured.  Otherwise, it will use the value 
returned from
+# java.net.InetAddress.getCanonicalHostName().
+#advertised.host.name=<hostname routable by clients>
+
+# The port to publish to ZooKeeper for clients to use. If this is not set,
+# it will publish the same port that the broker binds to.
+#advertised.port=<port accessible by clients>
+
+# The number of threads handling network requests
+num.network.threads=3
+ 
+# The number of threads doing disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=102400
+
+# The maximum size of a request that the socket server will accept (protection 
against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs=/tmp/odf-embedded-test-kafka/kafka-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=1
+
+# The number of threads per data directory to be used for log recovery at 
startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs 
located in RAID array.
+num.recovery.threads.per.data.dir=1
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only 
fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data 
to disk. 
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data may be lost if you are not using 
replication.
+#    2. Latency: Very large flush intervals may lead to latency spikes when 
the flush does occur as there will be a lot of data to flush.
+#    3. Throughput: The flush is generally the most expensive operation, and a 
small flush interval may lead to excessive seeks. 
+# The settings below allow one to configure the flush policy to flush data 
after a period of time or
+# every N messages (or both). This can be done globally and overridden on a 
per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy 
#############################
+
+# The following configurations control the disposal of log segments. The 
policy can
+# be set to delete segments after a period of time, or after a given size has 
accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. 
Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=24
+
+# A size-based retention policy for logs. Segments are pruned from the log as 
long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log 
segment will be created.
+log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted 
according 
+# to the retention policies
+log.retention.check.interval.ms=300000
+
+# By default the log cleaner is disabled and the log retention policy will 
default to just delete segments after their retention expires.
+# If log.cleaner.enable=true is set the cleaner will be enabled and individual 
logs can then be marked for log compaction.
+log.cleaner.enable=false
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=localhost:2181
+
+# Timeout in ms for connecting to zookeeper
+zookeeperConnectionTimeoutMs=6000

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-zookeeper.properties
----------------------------------------------------------------------
diff --git 
a/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-zookeeper.properties
 
b/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-zookeeper.properties
new file mode 100755
index 0000000..7234e9c
--- /dev/null
+++ 
b/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/core/messaging/kafka/test-embedded-zookeeper.properties
@@ -0,0 +1,34 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=/tmp/odf-embedded-test-kafka/zookeeper
+# the port at which the clients will connect
+clientPort=2181
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0
+

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/odf-implementation.properties
----------------------------------------------------------------------
diff --git 
a/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/odf-implementation.properties
 
b/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/odf-implementation.properties
new file mode 100755
index 0000000..5611c29
--- /dev/null
+++ 
b/odf/odf-messaging/src/test/resources/org/apache/atlas/odf/odf-implementation.properties
@@ -0,0 +1,18 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+## USE for TESTs only
+
+ODFConfigurationStorage=MockConfigurationStorage
+SparkServiceExecutor=MockSparkServiceExecutor
+NotificationManager=TestNotificationManager

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark-example-application/.gitignore
----------------------------------------------------------------------
diff --git a/odf/odf-spark-example-application/.gitignore 
b/odf/odf-spark-example-application/.gitignore
new file mode 100755
index 0000000..d523581
--- /dev/null
+++ b/odf/odf-spark-example-application/.gitignore
@@ -0,0 +1,20 @@
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+.settings
+target
+.classpath
+.project
+.factorypath
+.DS_Store
+/bin/

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark-example-application/pom.xml
----------------------------------------------------------------------
diff --git a/odf/odf-spark-example-application/pom.xml 
b/odf/odf-spark-example-application/pom.xml
new file mode 100755
index 0000000..a2baa9e
--- /dev/null
+++ b/odf/odf-spark-example-application/pom.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0"?>
+<!--
+~
+~ Licensed under the Apache License, Version 2.0 (the "License");
+~ you may not use this file except in compliance with the License.
+~ You may obtain a copy of the License at
+~
+~   http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+       <modelVersion>4.0.0</modelVersion>
+       <!-- Parent coordinates must match the ODF parent POM. The groupId is
+            org.apache.atlas.odf (not com.apache.atlas.odf) so that this module
+            inherits the same groupId under which other modules depend on it. -->
+       <parent>
+               <groupId>org.apache.atlas.odf</groupId>
+               <artifactId>odf</artifactId>
+               <version>1.2.0-SNAPSHOT</version>
+       </parent>
+       <artifactId>odf-spark-example-application</artifactId>
+       <packaging>jar</packaging>
+       <name>odf-spark-example-application</name>
+       <build>
+               <plugins>
+                       <plugin>
+                               <artifactId>maven-compiler-plugin</artifactId>
+                               <version>3.3</version>
+                               <configuration>
+                                       <source>1.7</source>
+                                       <target>1.7</target>
+                               </configuration>
+                       </plugin>
+                       <plugin>
+                               <artifactId>maven-assembly-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>single</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                               <configuration>
+                                       <descriptorRefs>
+                                               
<descriptorRef>jar-with-dependencies</descriptorRef>
+                                       </descriptorRefs>
+                               </configuration>
+                       </plugin>
+               </plugins>
+
+       </build>
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.spark</groupId>
+                       <artifactId>spark-sql_2.11</artifactId>
+                       <version>2.1.0</version>
+                       <scope>provided</scope>
+               </dependency>
+               <dependency> <!-- Spark dependency -->
+                       <groupId>org.apache.spark</groupId>
+                       <artifactId>spark-core_2.11</artifactId>
+                       <version>2.1.0</version>
+                       <scope>provided</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.atlas.odf</groupId>
+                       <artifactId>odf-api</artifactId>
+                       <version>1.2.0-SNAPSHOT</version>
+               </dependency>
+       </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SparkDiscoveryServiceExample.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SparkDiscoveryServiceExample.java
 
b/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SparkDiscoveryServiceExample.java
new file mode 100755
index 0000000..f5f7b70
--- /dev/null
+++ 
b/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SparkDiscoveryServiceExample.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.spark;
+
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DataSetCheckResult;
+import org.apache.atlas.odf.api.spark.SparkDiscoveryServiceBase;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import org.apache.atlas.odf.api.spark.SparkDiscoveryService;
+import org.apache.atlas.odf.api.spark.SparkUtils;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import 
org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse.ResponseCode;
+import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer;
+import 
org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+
+/**
+ * Example Spark discovery service. It checks that a data set can be loaded
+ * into a Spark data frame and runs the summary statistics job of
+ * {@link SummaryStatistics} on it, returning the resulting annotations.
+ */
+public class SparkDiscoveryServiceExample extends SparkDiscoveryServiceBase implements SparkDiscoveryService {
+       static Logger logger = Logger.getLogger(SparkDiscoveryServiceExample.class.getName());
+
+       /**
+        * Reports data access as possible and prints the first rows of the data
+        * set as a smoke test that the data frame can actually be read.
+        *
+        * @param dataSetContainer container describing the data set to check
+        * @return check result with data access set to {@code Possible}
+        */
+       @Override
+       public DataSetCheckResult checkDataSet(DataSetContainer dataSetContainer) {
+               logger.log(Level.INFO, "Checking data set access.");
+               DataSetCheckResult accessCheck = new DataSetCheckResult();
+               accessCheck.setDataAccess(DataSetCheckResult.DataAccess.Possible);
+               // Print first rows to check whether data frame can be accessed
+               SparkUtils.createDataFrame(this.spark, dataSetContainer, this.mds).show(10);
+               return accessCheck;
+       }
+
+       /**
+        * Loads the requested data set into a data frame, computes the summary
+        * statistics annotations and wraps them into a synchronous response.
+        *
+        * @param request discovery service request carrying the data set container
+        * @return response with code {@code OK} and the created annotations
+        */
+       @Override
+       public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request) {
+               logger.log(Level.INFO, "Starting discovery service.");
+               DataSetContainer container = request.getDataSetContainer();
+               Dataset<Row> inputFrame = SparkUtils.createDataFrame(this.spark, container, this.mds);
+               Map<String, Dataset<Row>> annotationFrames = SummaryStatistics.processDataFrame(this.spark, inputFrame, null);
+
+               DiscoveryServiceSyncResponse syncResponse = new DiscoveryServiceSyncResponse();
+               syncResponse.setCode(ResponseCode.OK);
+               syncResponse.setDetails("Discovery service successfully completed.");
+               syncResponse.setResult(SparkUtils.createAnnotationsFromDataFrameMap(container, annotationFrames, this.mds));
+               return syncResponse;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SummaryStatistics.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SummaryStatistics.java
 
b/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SummaryStatistics.java
new file mode 100755
index 0000000..a7d1542
--- /dev/null
+++ 
b/odf/odf-spark-example-application/src/main/java/org/apache/atlas/odf/core/spark/SummaryStatistics.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.spark;
+
+import org.apache.atlas.odf.api.spark.SparkUtils;
+import org.apache.spark.SparkFiles;
+
+import java.text.MessageFormat;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+public class SummaryStatistics {
+       static Logger logger = 
Logger.getLogger(SummaryStatistics.class.getName());
+       private static final String CSV_FILE_PARAMETER = "-dataFile=";
+       // The following constant is defined in class 
DiscoveryServiceSparkEndpoint but is duplicated here to avoid dependencies to 
the ODF code:
+       private static final String ANNOTATION_PROPERTY_COLUMN_NAME = 
"ODF_ANNOTATED_COLUMN";
+
+       // The main method is only available for testing purposes and is not 
called by ODF
+       public static void main(String[] args) {
+               logger.log(Level.INFO, "Running spark launcher with arguments: 
" + args[0]);
+               if ((args[0] == null) || 
(!args[0].startsWith(CSV_FILE_PARAMETER))) {
+                       System.out.println(MessageFormat.format("Error: Spark 
Application Parameter '{0}' is missing.", CSV_FILE_PARAMETER));
+                       System.exit(1);
+               }
+               String dataFilePath = 
SparkFiles.get(args[0].replace(CSV_FILE_PARAMETER, ""));
+               logger.log(Level.INFO, "Data file path is " + dataFilePath);
+
+               // Create Spark session
+               SparkSession spark = 
SparkSession.builder().master("local").appName("ODF Spark example 
application").getOrCreate();
+
+               // Read CSV file into data frame
+               Dataset<Row> df = spark.read()
+                   .format("com.databricks.spark.csv")
+                   .option("inferSchema", "true")
+                   .option("header", "true")
+                   .load(dataFilePath);
+
+               // Run actual job and print result
+               Map<String, Dataset<Row>> annotationDataFrameMap = null;
+               try {
+                       annotationDataFrameMap = processDataFrame(spark, df, 
args);
+               } catch (Exception e) {
+                       logger.log(Level.INFO, MessageFormat.format("An error 
occurred while processing data set {0}:", args[0]), e);
+               } finally {
+                       // Close and stop spark context
+                       spark.close();
+                       spark.stop();
+               }
+               if (annotationDataFrameMap == null) {
+                       System.exit(1);
+               } else {
+                       // Print all annotationDataFrames for all annotation 
types to stdout
+                       for (Map.Entry<String, Dataset<Row>> entry : 
annotationDataFrameMap.entrySet()) {
+                               logger.log(Level.INFO, "Result data frame for 
annotation type " + entry.getKey() + ":");
+                               entry.getValue().show();
+                       }
+               }
+       }
+
+       // The following method contains the actual implementation of the ODF 
Spark discovery service
+       public static Map<String,Dataset<Row>> processDataFrame(SparkSession 
spark, Dataset<Row> df, String[] args) {
+               logger.log(Level.INFO, "Started summary statistics Spark 
application.");
+               Map<String, Dataset<Row>> resultMap = new HashMap<String, 
Dataset<Row>>();
+
+               // Print input data set
+               df.show();
+
+               // Create column annotation data frame that contains basic data 
frame statistics
+               Dataset<Row> dfStatistics = df.describe();
+
+               // Rename "summary" column to ANNOTATION_PROPERTY_COLUMN_NAME
+               String[] columnNames = dfStatistics.columns();
+               columnNames[0] = ANNOTATION_PROPERTY_COLUMN_NAME;
+               Dataset<Row> summaryStatistics =  
dfStatistics.toDF(columnNames);
+               summaryStatistics.show();
+               String columnAnnotationTypeName = 
"SparkSummaryStatisticsAnnotation";
+
+               // Transpose table to turn it into format required by ODF
+               Dataset<Row> columnAnnotationDataFrame = 
SparkUtils.transposeDataFrame(spark, summaryStatistics);
+               columnAnnotationDataFrame.show();
+
+               // Create table annotation that contains the data frame's 
column count
+               String tableAnnotationTypeName = "SparkTableAnnotation";
+               Dataset<Row> tableAnnotationDataFrame = 
columnAnnotationDataFrame.select(new Column("count")).limit(1);
+               tableAnnotationDataFrame.show();
+
+               // Add annotation data frames to result map
+               resultMap.put(columnAnnotationTypeName, 
columnAnnotationDataFrame);
+               resultMap.put(tableAnnotationTypeName, 
tableAnnotationDataFrame);
+
+               logger.log(Level.INFO, "Spark job finished.");
+               return resultMap;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark/.gitignore
----------------------------------------------------------------------
diff --git a/odf/odf-spark/.gitignore b/odf/odf-spark/.gitignore
new file mode 100755
index 0000000..cde346c
--- /dev/null
+++ b/odf/odf-spark/.gitignore
@@ -0,0 +1,19 @@
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+.settings
+target
+.classpath
+.project
+.factorypath
+.DS_Store

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark/pom.xml
----------------------------------------------------------------------
diff --git a/odf/odf-spark/pom.xml b/odf/odf-spark/pom.xml
new file mode 100755
index 0000000..378f280
--- /dev/null
+++ b/odf/odf-spark/pom.xml
@@ -0,0 +1,242 @@
+<?xml version="1.0"?>
+<!--
+~
+~ Licensed under the Apache License, Version 2.0 (the "License");
+~ you may not use this file except in compliance with the License.
+~ You may obtain a copy of the License at
+~
+~   http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+       xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+       <modelVersion>4.0.0</modelVersion>
+       <parent>
+               <groupId>org.apache.atlas.odf</groupId>
+               <artifactId>odf</artifactId>
+               <version>1.2.0-SNAPSHOT</version>
+       </parent>
+       <artifactId>odf-spark</artifactId>
+       <packaging>jar</packaging>
+       <name>odf-spark</name>
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.atlas.odf</groupId>
+                       <artifactId>odf-api</artifactId>
+                       <version>1.2.0-SNAPSHOT</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.atlas.odf</groupId>
+                       <artifactId>odf-core</artifactId>
+                       <version>1.2.0-SNAPSHOT</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.atlas.odf</groupId>
+                       <artifactId>odf-core</artifactId>
+                       <version>1.2.0-SNAPSHOT</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.atlas.odf</groupId>
+                       <artifactId>odf-messaging</artifactId>
+                       <version>1.2.0-SNAPSHOT</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.atlas.odf</groupId>
+                       <artifactId>odf-messaging</artifactId>
+                       <version>1.2.0-SNAPSHOT</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.atlas.odf</groupId>
+                       <artifactId>odf-store</artifactId>
+                       <version>1.2.0-SNAPSHOT</version>
+                       <scope>test</scope>
+               </dependency>
+               <!-- Workaround: Add odf-spark-example-application because 
dynamic jar load does not seem to work on IBM JDK -->
+               <dependency>
+                       <groupId>org.apache.atlas.odf</groupId>
+                       <artifactId>odf-spark-example-application</artifactId>
+                       <version>1.2.0-SNAPSHOT</version>
+               </dependency>
+               <dependency>
+                       <groupId>junit</groupId>
+                       <artifactId>junit</artifactId>
+                       <version>4.12</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.spark</groupId>
+                       <artifactId>spark-launcher_2.11</artifactId>
+                       <version>2.1.0</version>
+               </dependency>
+               <dependency>
+                       <groupId>commons-io</groupId>
+                       <artifactId>commons-io</artifactId>
+                       <version>2.4</version>
+               </dependency>
+               <!-- The following Spark dependencies are needed for testing 
only. -->
+               <!-- Nevertheless, they have to be added as compile 
dependencies in order to become available to the SDPFactory. -->
+               <dependency>
+                       <groupId>org.apache.spark</groupId>
+                       <artifactId>spark-core_2.11</artifactId>
+                       <version>2.1.0</version>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>commons-codec</groupId>
+                                       <artifactId>commons-codec</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.spark</groupId>
+                       <artifactId>spark-sql_2.11</artifactId>
+                       <version>2.1.0</version>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>commons-codec</groupId>
+                                       <artifactId>commons-codec</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+       </dependencies>
+       <build>
+               <resources>
+                       <resource>
+                               
<directory>${project.build.directory}/downloads</directory>
+                       </resource>
+               </resources>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-jar-plugin</artifactId>
+                               <version>2.6</version>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>test-jar</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-surefire-plugin</artifactId>
+                               <version>2.19</version>
+                               <configuration>
+                                       <systemPropertyVariables>
+                                               
<odf.zookeeper.connect>${testZookeepeConnectionString}</odf.zookeeper.connect>
+                                               
<odf.logspec>${odf.unittest.logspec}</odf.logspec>
+                                               
<odf.build.project.name>${project.name}</odf.build.project.name>
+                                       </systemPropertyVariables>
+                               </configuration>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-dependency-plugin</artifactId>
+                               <version>2.4</version>
+                               <executions>
+                                       <execution>
+                                               <id>download-jar-file</id>
+                                               <phase>validate</phase>
+                                               <goals>
+                                                       <goal>copy</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <artifactItems>
+                                                               <artifactItem>
+                                                                       
<groupId>org.apache.atlas.odf</groupId>
+                                                                       
<artifactId>odf-api</artifactId>
+                                                                       
<version>1.2.0-SNAPSHOT</version>
+                                                                       
<type>jar</type>
+                                                                       
<overWrite>true</overWrite>
+                                                                       
<outputDirectory>${project.build.directory}/downloads/META-INF/spark</outputDirectory>
+                                                               </artifactItem>
+                                                               <artifactItem>
+                                                                       
<groupId>org.apache.atlas.odf</groupId>
+                                                                       
<artifactId>odf-spark-example-application</artifactId>
+                                                                       
<version>1.2.0-SNAPSHOT</version>
+                                                                       
<type>jar</type>
+                                                                       
<overWrite>true</overWrite>
+                                                                       
<outputDirectory>/tmp/odf-spark</outputDirectory>
+                                                               </artifactItem>
+                                                               <artifactItem>
+                                                                       
<groupId>org.apache.atlas.odf</groupId>
+                                                                       
<artifactId>odf-spark-example-application</artifactId>
+                                                                       
<version>1.2.0-SNAPSHOT</version>
+                                                                       
<type>jar</type>
+                                                                       
<overWrite>true</overWrite>
+                                                                       
<outputDirectory>${project.build.directory}/downloads/META-INF/spark</outputDirectory>
+                                                               </artifactItem>
+                                                               <artifactItem>
+                                                                       
<groupId>org.apache.wink</groupId>
+                                                                       
<artifactId>wink-json4j</artifactId>
+                                                                       
<version>1.4</version>
+                                                                       
<type>jar</type>
+                                                                       
<overWrite>true</overWrite>
+                                                                       
<outputDirectory>${project.build.directory}/downloads/META-INF/spark</outputDirectory>
+                                                               </artifactItem>
+                                                       </artifactItems>
+                                                       
<includes>**/*</includes>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+
+       <profiles>
+               <profile>
+                       <id>integration-tests</id>
+                       <activation>
+                               <property>
+                                       <name>reduced-tests</name>
+                                       <value>!true</value>
+                               </property>
+                       </activation>
+                       <build>
+                               <plugins>
+                                       <plugin>
+                                               
<groupId>org.apache.maven.plugins</groupId>
+                                               
<artifactId>maven-failsafe-plugin</artifactId>
+                                               <version>2.19</version>
+                                               <configuration>
+                                                       
<systemPropertyVariables>
+                                                               
<odf.zookeeper.connect>${testZookeepeConnectionString}</odf.zookeeper.connect>
+                                                               
<odf.logspec>${odf.integrationtest.logspec}</odf.logspec>
+                                                       
</systemPropertyVariables>
+                                                       <dependenciesToScan>
+                                                               
<dependency>org.apache.atlas.odf:odf-core</dependency>
+                                                       </dependenciesToScan>
+                                                       <includes>
+                                                               
<include>**/integrationtest/**/SparkDiscoveryServiceLocalTest.java</include>
+                                                       </includes>
+                                               </configuration>
+                                               <executions>
+                                                       <execution>
+                                                               
<id>integration-test</id>
+                                                               <goals>
+                                                                       
<goal>integration-test</goal>
+                                                               </goals>
+                                                       </execution>
+                                                       <execution>
+                                                               <id>verify</id>
+                                                               <goals>
+                                                                       
<goal>verify</goal>
+                                                               </goals>
+                                                       </execution>
+                                               </executions>
+                                       </plugin>
+                               </plugins>
+                       </build>
+               </profile>
+       </profiles>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/LocalSparkServiceExecutor.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/LocalSparkServiceExecutor.java
 
b/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/LocalSparkServiceExecutor.java
new file mode 100755
index 0000000..84ae80c
--- /dev/null
+++ 
b/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/LocalSparkServiceExecutor.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.spark;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.reflect.Constructor;
+import java.text.MessageFormat;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DataSetCheckResult;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.models.MetaDataObject;
+import org.apache.atlas.odf.api.metadata.models.RelationalDataSet;
+import org.apache.atlas.odf.api.spark.SparkDiscoveryService;
+import org.apache.atlas.odf.api.spark.SparkServiceExecutor;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceSparkEndpoint;
+import 
org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceSparkEndpoint.SERVICE_INTERFACE_TYPE;
+import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer;
+import 
org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.spark.SparkUtils;
+import org.apache.atlas.odf.json.JSONUtils;
+
+/**
+ * This class calls the actual Spark discovery services depending on the type of interface they implement.
+ * The class is used to run a Spark discovery service either on a local Spark cluster ({@link SparkServiceExecutorImpl})
+ * or on a remote Spark cluster ({@link SparkApplicationStub}).
+ *
+ * <p>Both entry points stop the Spark session they were given once they are done, so an instance
+ * is good for a single call only.
+ */
+public class LocalSparkServiceExecutor implements SparkServiceExecutor {
+       private static final Logger logger = Logger.getLogger(LocalSparkServiceExecutor.class.getName());
+       private SparkSession spark;
+       private MetadataStore mds;
+
+       /** Sets the Spark session used by (and stopped at the end of) the next call. */
+       void setSparkSession(SparkSession spark) {
+               this.spark = spark;
+       }
+
+       /** Sets the metadata store handed to the discovery services and to {@code SparkUtils}. */
+       void setMetadataStore(MetadataStore mds) {
+               this.mds = mds;
+       }
+
+       /**
+        * Checks whether the discovery service described by {@code dsProp} can process the data set.
+        *
+        * <p>For the {@code DataFrame} interface type the data set must be a {@link RelationalDataSet}
+        * and a data frame is created and its first 10 rows are shown to prove that it is accessible.
+        * For the {@code Generic} interface type the check is delegated to the service itself.
+        * Any other interface type, and any exception raised while checking, yields a
+        * {@code NotPossible} result carrying the details.
+        *
+        * @param dsProp properties of the discovery service; its endpoint must be convertible to a
+        *               {@link DiscoveryServiceSparkEndpoint}
+        * @param container container holding the data set to be checked
+        * @return the check result
+        * @throws RuntimeException if the endpoint cannot be converted
+        */
+       @Override
+       public DataSetCheckResult checkDataSet(DiscoveryServiceProperties dsProp, DataSetContainer container) {
+               DiscoveryServiceSparkEndpoint endpoint;
+               try {
+                       endpoint = JSONUtils.convert(dsProp.getEndpoint(), DiscoveryServiceSparkEndpoint.class);
+               } catch (JSONException e1) {
+                       // The session must not outlive this call, even when we bail out early.
+                       stopSparkSession();
+                       throw new RuntimeException(e1);
+               }
+               DataSetCheckResult checkResult = new DataSetCheckResult();
+               try {
+                       SERVICE_INTERFACE_TYPE inputMethod = endpoint.getInputMethod();
+                       if (inputMethod.equals(SERVICE_INTERFACE_TYPE.DataFrame)) {
+                               MetaDataObject dataSet = container.getDataSet();
+                               if (!(dataSet instanceof RelationalDataSet)) {
+                                       checkResult.setDataAccess(DataSetCheckResult.DataAccess.NotPossible);
+                                       checkResult.setDetails("This service can only process relational data sets.");
+                               } else {
+                                       checkResult.setDataAccess(DataSetCheckResult.DataAccess.Possible);
+                                       Dataset<Row> df = SparkUtils.createDataFrame(this.spark, container, this.mds);
+                                       // Print first rows to check whether data frame can be accessed
+                                       df.show(10);
+                               }
+                       } else if (inputMethod.equals(SERVICE_INTERFACE_TYPE.Generic)) {
+                               SparkDiscoveryService service = createService(Class.forName(endpoint.getClassName()));
+                               checkResult = service.checkDataSet(container);
+                       } else {
+                               // Mirror runAnalysis(): an unknown interface type is an error, not a silent default.
+                               checkResult.setDataAccess(DataSetCheckResult.DataAccess.NotPossible);
+                               checkResult.setDetails(MessageFormat.format("Unsupported interface type {0}.", inputMethod));
+                       }
+               } catch (Exception e) {
+                       logger.log(Level.WARNING,"Access to data set not possible.", e);
+                       checkResult.setDataAccess(DataSetCheckResult.DataAccess.NotPossible);
+                       checkResult.setDetails(getExceptionAsString(e));
+               } finally {
+                       stopSparkSession();
+               }
+               return checkResult;
+       }
+
+       /**
+        * Runs the discovery service described by {@code dsProp} on the data set of the request.
+        *
+        * <p>For the {@code DataFrame} interface type the static method {@code processDataFrame} of the
+        * service class is invoked reflectively and the returned data frames are turned into annotations.
+        * For the {@code Generic} interface type the service is instantiated and asked to run the analysis.
+        * Every failure is logged and reported through the response ({@code UNKNOWN_ERROR} plus the stack
+        * trace as details); no exception leaves this method for failures inside the analysis.
+        *
+        * @param dsProp properties of the discovery service to run
+        * @param request the analysis request providing the data set container
+        * @return the response of the service, or an error response
+        */
+       @Override
+       public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceProperties dsProp, DiscoveryServiceRequest request) {
+               DiscoveryServiceSyncResponse response = new DiscoveryServiceSyncResponse();
+               response.setDetails("Annotations created successfully");
+               response.setCode(DiscoveryServiceResponse.ResponseCode.OK);
+               try {
+                       DiscoveryServiceSparkEndpoint endpoint = JSONUtils.convert(dsProp.getEndpoint(), DiscoveryServiceSparkEndpoint.class);
+                       Class<?> clazz = Class.forName(endpoint.getClassName());
+                       DataSetContainer container = request.getDataSetContainer();
+                       String[] optionalArgs = {}; // For future use
+                       SERVICE_INTERFACE_TYPE inputMethod = endpoint.getInputMethod();
+
+                       if (inputMethod.equals(SERVICE_INTERFACE_TYPE.DataFrame)) {
+                               if (!(container.getDataSet() instanceof RelationalDataSet)) {
+                                       throw new RuntimeException("This service can only process relational data sets (DataFile or Table).");
+                               }
+                               Dataset<Row> df = SparkUtils.createDataFrame(this.spark, container, this.mds);
+                               // The array is passed as ONE argument (the String[] parameter), not spread as varargs.
+                               @SuppressWarnings("unchecked")
+                               Map<String, Dataset<Row>> annotationDataFrameMap = (Map<String, Dataset<Row>>) clazz.getMethod("processDataFrame", SparkSession.class, Dataset.class, String[].class).invoke(null, this.spark, df, (Object[]) optionalArgs);
+                               response.setResult(SparkUtils.createAnnotationsFromDataFrameMap(container, annotationDataFrameMap, this.mds));
+                       } else if (inputMethod.equals(SERVICE_INTERFACE_TYPE.Generic)) {
+                               response = createService(clazz).runAnalysis(request);
+                       } else {
+                               throw new RuntimeException(MessageFormat.format("Unsupported interface type {0}.", inputMethod));
+                       }
+               } catch(Exception e) {
+                       logger.log(Level.WARNING,"Error running discovery service.", e);
+                       response.setDetails(getExceptionAsString(e));
+                       response.setCode(DiscoveryServiceResponse.ResponseCode.UNKNOWN_ERROR);
+               } finally {
+                       stopSparkSession();
+               }
+               return response;
+       }
+
+       /**
+        * Returns the full stack trace of the given throwable as a string.
+        *
+        * @param exc the throwable to render
+        * @return the output of {@link Throwable#printStackTrace(PrintWriter)}
+        */
+       public static String getExceptionAsString(Throwable exc) {
+               StringWriter sw = new StringWriter();
+               PrintWriter pw = new PrintWriter(sw);
+               exc.printStackTrace(pw);
+               pw.flush();
+               return sw.toString();
+       }
+
+       /** Instantiates the service class through its public no-arg constructor and injects store and session. */
+       private SparkDiscoveryService createService(Class<?> clazz) throws Exception {
+               Constructor<?> cons = clazz.getConstructor();
+               SparkDiscoveryService service = (SparkDiscoveryService) cons.newInstance();
+               service.setMetadataStore(this.mds);
+               service.setSparkSession(this.spark);
+               return service;
+       }
+
+       /** Stops the Spark session if one was set; a single stop() suffices because close() is a synonym for it. */
+       private void stopSparkSession() {
+               if (this.spark != null) {
+                       this.spark.stop();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkJars.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkJars.java 
b/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkJars.java
new file mode 100755
index 0000000..81fea2c
--- /dev/null
+++ b/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkJars.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.spark;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.text.MessageFormat;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+
+import org.apache.atlas.odf.core.Utils;
+
+/**
+ * Helper for materializing Spark application jars, addressed either as a class path resource
+ * or as a URL, into temporary files or byte arrays.
+ */
+public class SparkJars {
+       private static final Logger logger = Logger.getLogger(SparkJars.class.getName());
+
+       /**
+        * Copies a class path resource into a newly created temporary file and makes it executable.
+        *
+        * @param resource name of the resource, resolved through the class loader of this class
+        * @return absolute path of the temporary file
+        * @throws RuntimeException if the resource does not exist or the file cannot be written
+        */
+       public String getResourceAsJarFile(String resource) {
+               ClassLoader cl = this.getClass().getClassLoader();
+               InputStream inputStream = cl.getResourceAsStream(resource);
+               if (inputStream == null) {
+                       String msg = MessageFormat.format("Resource {0} was not found.", resource);
+                       logger.log(Level.WARNING, msg);
+                       throw new RuntimeException(msg);
+               }
+               // try-with-resources: both streams are released on every path, including failures.
+               try (InputStream in = inputStream) {
+                       File tempFile = File.createTempFile("driver", "jar");
+                       String tempFilePath = tempFile.getAbsolutePath();
+                       logger.log(Level.INFO, "Creating temporary file " + tempFilePath);
+                       try (FileOutputStream out = new FileOutputStream(tempFile)) {
+                               IOUtils.copy(in, out);
+                       }
+                       Utils.runSystemCommand("chmod 755 " + tempFilePath);
+                       return tempFilePath;
+               } catch (IOException e) {
+                       String msg = MessageFormat.format("Error creating temporary file from resource {0}: ", resource);
+                       logger.log(Level.WARNING, msg, e);
+                       throw new RuntimeException(msg + Utils.getExceptionAsString(e), e);
+               }
+       }
+
+       /**
+        * Downloads the content of a URL into a newly created temporary file and makes it executable.
+        *
+        * @param urlString URL of the jar file
+        * @return absolute path of the temporary file
+        * @throws RuntimeException if the URL is malformed or the content cannot be copied
+        */
+       public String getUrlasJarFile(String urlString) {
+               try {
+                       File tempFile = File.createTempFile("driver", "jar");
+                       logger.log(Level.INFO, "Creating temporary file " + tempFile);
+                       FileUtils.copyURLToFile(new URL(urlString), tempFile);
+                       Utils.runSystemCommand("chmod 755 " + tempFile.getAbsolutePath());
+                       return tempFile.getAbsolutePath();
+               } catch (MalformedURLException e) {
+                       String msg = MessageFormat.format("An invalid Spark application URL {0} was provided: ", urlString);
+                       logger.log(Level.WARNING, msg, e);
+                       throw new RuntimeException(msg + Utils.getExceptionAsString(e), e);
+               } catch (IOException e) {
+                       logger.log(Level.WARNING, "Error processing Spark application jar file.", e);
+                       throw new RuntimeException("Error processing Spark application jar file: " + Utils.getExceptionAsString(e), e);
+               }
+       }
+
+       /**
+        * Reads a jar file, given as URL or as class path resource, completely into memory.
+        *
+        * @param resourceOrURL a URL if {@link #isValidUrl(String)} accepts it, otherwise a resource name
+        * @return the content of the file
+        * @throws RuntimeException if the resource does not exist or reading fails
+        */
+       public byte[] getFileAsByteArray(String resourceOrURL) {
+               // The stream is closed once the bytes have been read (or reading failed).
+               try (InputStream inputStream = openResourceOrUrl(resourceOrURL)) {
+                       return IOUtils.toByteArray(inputStream);
+               } catch (IOException e) {
+                       String msg = MessageFormat.format("Error converting jar file {0} into byte array: ", resourceOrURL);
+                       logger.log(Level.WARNING, msg, e);
+                       throw new RuntimeException(msg + Utils.getExceptionAsString(e), e);
+               }
+       }
+
+       /**
+        * Tells whether the string can be parsed by {@link URL#URL(String)}.
+        *
+        * @param urlString the candidate string
+        * @return {@code true} if a {@link URL} can be constructed from it
+        */
+       public static boolean isValidUrl(String urlString) {
+               try {
+                       new URL(urlString);
+                       return true;
+               } catch (java.net.MalformedURLException exc) {
+                       // Expected exception if URL is not valid
+                       return false;
+               }
+       }
+
+       /** Opens the URL stream, or the class path resource when the string is not a URL. */
+       private InputStream openResourceOrUrl(String resourceOrURL) throws IOException {
+               if (isValidUrl(resourceOrURL)) {
+                       return new URL(resourceOrURL).openStream();
+               }
+               InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream(resourceOrURL);
+               if (inputStream == null) {
+                       String msg = MessageFormat.format("Resource {0} was not found.", resourceOrURL);
+                       logger.log(Level.WARNING, msg);
+                       throw new RuntimeException(msg);
+               }
+               return inputStream;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkServiceExecutorImpl.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkServiceExecutorImpl.java
 
b/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkServiceExecutorImpl.java
new file mode 100755
index 0000000..720343b
--- /dev/null
+++ 
b/odf/odf-spark/src/main/java/org/apache/atlas/odf/core/spark/SparkServiceExecutorImpl.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.spark;
+
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.text.MessageFormat;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DataSetCheckResult;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.api.spark.SparkServiceExecutor;
+import org.apache.spark.sql.SparkSession;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceSparkEndpoint;
+import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer;
+import 
org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.settings.SparkConfig;
+import org.apache.atlas.odf.json.JSONUtils;
+
+/**
+ * {@link SparkServiceExecutor} that builds a concrete executor for every call, based on the
+ * {@link SparkConfig} found in the current ODF settings, and forwards the call to it.
+ * The Spark session is obtained for the configured cluster master URL and the application jar
+ * of the discovery service is added to the system class loader before the call is delegated.
+ */
+public class SparkServiceExecutorImpl implements SparkServiceExecutor {
+       private Logger logger = Logger.getLogger(SparkServiceExecutorImpl.class.getName());
+
+       /** Forwards the data set check to the executor created for {@code dsri}. */
+       @Override
+       public DataSetCheckResult checkDataSet(DiscoveryServiceProperties dsri, DataSetContainer dataSetContainer) {
+               SparkServiceExecutor delegate = this.getExecutor(dsri);
+               return delegate.checkDataSet(dsri, dataSetContainer);
+       }
+
+       /** Forwards the analysis request to the executor created for {@code dsri}. */
+       @Override
+       public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceProperties dsri, DiscoveryServiceRequest request) {
+               SparkServiceExecutor delegate = this.getExecutor(dsri);
+               return delegate.runAnalysis(dsri, request);
+       }
+
+       /**
+        * Creates a {@link LocalSparkServiceExecutor} wired with a Spark session and a metadata store.
+        *
+        * @param dsri properties of the discovery service; its endpoint names the application jar
+        * @return the executor to delegate to
+        * @throws RuntimeException if the endpoint cannot be converted, no Spark configuration exists,
+        *                          or the application jar cannot be added to the system class loader
+        */
+       private SparkServiceExecutor getExecutor(DiscoveryServiceProperties dsri) {
+               SettingsManager settingsManager = new ODFFactory().create().getSettingsManager();
+               DiscoveryServiceSparkEndpoint endpoint;
+               try {
+                       endpoint = JSONUtils.convert(dsri.getEndpoint(), DiscoveryServiceSparkEndpoint.class);
+               } catch (JSONException jsonExc) {
+                       throw new RuntimeException(jsonExc);
+               }
+
+               SparkConfig sparkConfig = settingsManager.getODFSettings().getSparkConfig();
+               if (sparkConfig == null) {
+                       String notConfigured = "No Spark service is configured. Please manually register Spark service or bind a Spark service to your ODF Bluemix app.";
+                       logger.log(Level.SEVERE, notConfigured);
+                       throw new RuntimeException(notConfigured);
+               }
+
+               logger.log(Level.INFO, "Using local Spark cluster {0}.", sparkConfig.getClusterMasterUrl());
+               SparkSession session = SparkSession.builder().master(sparkConfig.getClusterMasterUrl()).appName(dsri.getName()).getOrCreate();
+               SparkJars jarHelper = new SparkJars();
+               try {
+                       // Load jar file containing the Spark job to be started:
+                       // addURL is protected, so it is opened up reflectively on the system class loader.
+                       URLClassLoader systemLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
+                       Method addUrl = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
+                       addUrl.setAccessible(true);
+                       String jarPath = SparkJars.isValidUrl(endpoint.getJar())
+                                       ? jarHelper.getUrlasJarFile(endpoint.getJar())
+                                       : jarHelper.getResourceAsJarFile(endpoint.getJar());
+                       logger.log(Level.INFO, "Using application jar file {0}.", jarPath);
+                       addUrl.invoke(systemLoader, new URL("file:" + jarPath));
+               } catch (Exception loadExc) {
+                       String loadError = MessageFormat.format("Error loading jar file {0} implementing the Spark discovery service: ", endpoint.getJar());
+                       logger.log(Level.WARNING, loadError, loadExc);
+                       // Release the session obtained above before giving up.
+                       session.close();
+                       session.stop();
+                       throw new RuntimeException(loadError, loadExc);
+               }
+
+               LocalSparkServiceExecutor localExecutor = new LocalSparkServiceExecutor();
+               localExecutor.setSparkSession(session);
+               localExecutor.setMetadataStore(new ODFFactory().create().getMetadataStore());
+               return localExecutor;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-spark/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
----------------------------------------------------------------------
diff --git 
a/odf/odf-spark/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
 
b/odf/odf-spark/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
new file mode 100755
index 0000000..d6651ee
--- /dev/null
+++ 
b/odf/odf-spark/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
@@ -0,0 +1,14 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+SparkServiceExecutor=org.apache.atlas.odf.core.spark.SparkServiceExecutorImpl

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-store/.gitignore
----------------------------------------------------------------------
diff --git a/odf/odf-store/.gitignore b/odf/odf-store/.gitignore
new file mode 100755
index 0000000..ea5ddb8
--- /dev/null
+++ b/odf/odf-store/.gitignore
@@ -0,0 +1,5 @@
+.settings
+target
+.classpath
+.project
+.factorypath
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-store/pom.xml
----------------------------------------------------------------------
diff --git a/odf/odf-store/pom.xml b/odf/odf-store/pom.xml
new file mode 100755
index 0000000..3d0a93d
--- /dev/null
+++ b/odf/odf-store/pom.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0"?>
+<!--
+~
+~ Licensed under the Apache License, Version 2.0 (the "License");
+~ you may not use this file except in compliance with the License.
+~ You may obtain a copy of the License at
+~
+~   http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+       xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+       <modelVersion>4.0.0</modelVersion>
+       <parent>
+               <groupId>org.apache.atlas.odf</groupId>
+               <artifactId>odf</artifactId>
+               <version>1.2.0-SNAPSHOT</version>
+       </parent>
+       <artifactId>odf-store</artifactId>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.atlas.odf</groupId>
+                       <artifactId>odf-core</artifactId>
+                       <version>1.2.0-SNAPSHOT</version>
+                       <scope>compile</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.atlas.odf</groupId>
+                       <artifactId>odf-messaging</artifactId>
+                       <version>1.2.0-SNAPSHOT</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.zookeeper</groupId>
+                       <artifactId>zookeeper</artifactId>
+                       <version>3.4.6</version>
+                       <scope>compile</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>junit</groupId>
+                       <artifactId>junit</artifactId>
+                       <version>4.12</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.atlas.odf</groupId>
+                       <artifactId>odf-core</artifactId>
+                       <version>1.2.0-SNAPSHOT</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-surefire-plugin</artifactId>
+                               <version>2.19</version>
+                               <configuration>
+                                       <systemPropertyVariables>
+                                               
<odf.logspec>${odf.unittest.logspec}</odf.logspec>
+                                               
<odf.zookeeper.connect>${testZookeepeConnectionString}</odf.zookeeper.connect>
+                                               
<odf.build.project.name>${project.name}</odf.build.project.name>
+                                       </systemPropertyVariables>
+                                       <dependenciesToScan>
+                                               
<dependency>org.apache.atlas.odf:odf-core</dependency>
+                                       </dependenciesToScan>
+                                       <includes>
+                                           
<include>**/configuration/**/*.java</include>
+                                               
<include>**/ZookeeperConfigurationStorageTest.java</include>
+                                       </includes>
+                               </configuration>
+                       </plugin>
+               </plugins>
+       </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-store/src/main/java/org/apache/atlas/odf/core/store/zookeeper34/ZookeeperConfigurationStorage.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-store/src/main/java/org/apache/atlas/odf/core/store/zookeeper34/ZookeeperConfigurationStorage.java
 
b/odf/odf-store/src/main/java/org/apache/atlas/odf/core/store/zookeeper34/ZookeeperConfigurationStorage.java
new file mode 100755
index 0000000..3ea9927
--- /dev/null
+++ 
b/odf/odf-store/src/main/java/org/apache/atlas/odf/core/store/zookeeper34/ZookeeperConfigurationStorage.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.store.zookeeper34;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.text.MessageFormat;
+import java.util.HashSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.configuration.ConfigContainer;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONException;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+import org.apache.atlas.odf.core.store.ODFConfigurationStorage;
+
+/**
+ * {@link ODFConfigurationStorage} implementation that persists the ODF configuration as a
+ * UTF-8 encoded JSON document in the ZooKeeper node {@link #ZOOKEEPER_CONFIG_PATH}.
+ *
+ * <p>The last known configuration is cached as a JSON string in a static field, so the cache
+ * is shared by all instances of this class. Every access to the cache and to the set of
+ * pending config changes is guarded by {@code configCacheLock}.</p>
+ *
+ * <p>A new ZooKeeper connection is opened for each read or write and closed again afterwards.</p>
+ */
+public class ZookeeperConfigurationStorage implements ODFConfigurationStorage {
+       /** Session timeout handed to the ZooKeeper client, in seconds. */
+       private static final int SESSION_TIMEOUT_SECONDS = 5;
+       /** Maximum time to wait for the "connected" event before giving up, in seconds. */
+       private static final int CONNECT_WAIT_SECONDS = 5 * SESSION_TIMEOUT_SECONDS;
+
+       private final Logger logger = Logger.getLogger(ZookeeperConfigurationStorage.class.getName());
+       static final String ZOOKEEPER_CONFIG_PATH = "/odf/config";
+       // The cache is kept as a string so that the cached object cannot be modified accidentally.
+       static String configCache = null;
+       // Monitor guarding configCache and pendingConfigChanges; final so the monitor can never be swapped.
+       static final Object configCacheLock = new Object();
+       static final HashSet<String> pendingConfigChanges = new HashSet<String>();
+
+       String zookeeperString;
+
+       /**
+        * Creates the storage and reads the ZooKeeper connect string from the {@link Environment}
+        * provided by {@link ODFInternalFactory}.
+        */
+       public ZookeeperConfigurationStorage() {
+               zookeeperString = new ODFInternalFactory().create(Environment.class).getZookeeperConnectString();
+       }
+
+       /**
+        * Drops the cached configuration so that the next {@link #getConfig(ConfigContainer)}
+        * call reads it from ZooKeeper again.
+        */
+       public void clearCache() {
+               synchronized (configCacheLock) {
+                       configCache = null;
+               }
+       }
+
+       /**
+        * Serializes the given configuration to JSON, writes it to ZooKeeper and updates the cache.
+        * If the config node does not exist yet it is created first.
+        *
+        * @param config the configuration to persist
+        * @throws RuntimeException if the configuration cannot be serialized or ZooKeeper cannot be
+        *         reached or written to
+        */
+       @Override
+       public void storeConfig(ConfigContainer config) {
+               synchronized (configCacheLock) {
+                       ZooKeeper zk = null;
+                       String configTxt = null;
+                       try {
+                               configTxt = JSONUtils.toJSON(config);
+                               zk = getZkConnectionSynchronously();
+                               if (zk.exists(getZookeeperConfigPath(), false) == null) {
+                                       // config node doesn't exist in zookeeper yet, create it before writing
+                                       logger.log(Level.WARNING, "Zookeeper config not found - creating it before writing: {0}", configTxt);
+                                       initializeConfiguration(zk, configTxt);
+                               }
+                               // version -1 ignores versioning conflicts
+                               zk.setData(getZookeeperConfigPath(), configTxt.getBytes("UTF-8"), -1);
+                               configCache = configTxt;
+                       } catch (InterruptedException e) {
+                               // keep the interrupt visible to callers; closeQuietly() copes with the set flag
+                               Thread.currentThread().interrupt();
+                               throw new RuntimeException("A zookeeper connection could not be established in time to write settings", e);
+                       } catch (KeeperException e) {
+                               if (Code.NONODE.equals(e.code())) {
+                                       logger.info("Setting could not be written, the required node is not available!");
+                                       initializeConfiguration(zk, configTxt);
+                                       // the config has been written by initializeConfiguration, so the cache
+                                       // must follow - otherwise getConfig() keeps returning the old value
+                                       configCache = configTxt;
+                                       return;
+                               }
+                               // This should never happen! Only NoNode or BadVersion codes are possible.
+                               // Because the node version is ignored, a BadVersion should never occur.
+                               throw new RuntimeException("A zookeeper connection could not be established because of an unknown exception", e);
+                       } catch (UnsupportedEncodingException e) {
+                               throw new RuntimeException("A zookeeper connection could not be established because of an incorrect encoding", e);
+                       } catch (JSONException e) {
+                               throw new RuntimeException("Configuration is not valid", e);
+                       } finally {
+                               closeQuietly(zk);
+                       }
+               }
+       }
+
+       /**
+        * Returns the current configuration. The cached JSON is used when present; otherwise the
+        * configuration is read from ZooKeeper, and if no config node exists there yet it is
+        * created from {@code defaultConfiguration}.
+        *
+        * @param defaultConfiguration configuration to store when ZooKeeper has none yet
+        * @return a new {@link ConfigContainer} parsed from the cached JSON on every call
+        * @throws RuntimeException if ZooKeeper cannot be read or the JSON cannot be parsed
+        */
+       @Override
+       public ConfigContainer getConfig(ConfigContainer defaultConfiguration) {
+               synchronized (configCacheLock) {
+                       if (configCache == null) {
+                               configCache = readConfigFromZookeeper(defaultConfiguration);
+                       }
+                       try {
+                               return JSONUtils.fromJSON(configCache, ConfigContainer.class);
+                       } catch (JSONException e) {
+                               throw new RuntimeException("Cached configuration was not valid", e);
+                       }
+               }
+       }
+
+       /**
+        * Reads the configuration JSON from ZooKeeper, creating the config node from the given
+        * default first when it is missing. Callers must hold {@code configCacheLock}.
+        */
+       private String readConfigFromZookeeper(ConfigContainer defaultConfiguration) {
+               ZooKeeper zk = getZkConnectionSynchronously();
+               try {
+                       if (zk.exists(getZookeeperConfigPath(), false) == null) {
+                               // config node doesn't exist in zookeeper yet, write default config
+                               String defaultConfigString = JSONUtils.toJSON(defaultConfiguration);
+                               logger.log(Level.WARNING, "Zookeeper config not found - creating now with default: {0}", defaultConfigString);
+                               initializeConfiguration(zk, defaultConfigString);
+                       }
+                       // NOTE(review): watch=true registers the connection's default watcher, but the
+                       // connection is closed right below - the watch looks unnecessary; confirm.
+                       byte[] configBytes = zk.getData(getZookeeperConfigPath(), true, new Stat());
+                       if (configBytes == null) {
+                               // should never happen
+                               throw new RuntimeException("Zookeeper configuration was not stored");
+                       }
+                       return new String(configBytes, "UTF-8");
+               } catch (KeeperException e) {
+                       throw new RuntimeException(MessageFormat.format("Zookeeper config could not be read, {0} Zookeeper exception occured!", e.code().name()), e);
+               } catch (InterruptedException e) {
+                       Thread.currentThread().interrupt();
+                       throw new RuntimeException("Zookeeper config could not be read, the connection was interrupded", e);
+               } catch (IOException | JSONException e) {
+                       throw new RuntimeException("Zookeeper config could not be read, the file could not be parsed correctly", e);
+               } finally {
+                       closeQuietly(zk);
+               }
+       }
+
+       /**
+        * Creates every node along the config path that does not exist yet (as persistent nodes
+        * with an open ACL) and then writes {@code config} into the config node.
+        *
+        * @param zk an open ZooKeeper connection
+        * @param config the JSON text to store
+        * @throws RuntimeException if ZooKeeper reports an error or the thread is interrupted
+        */
+       private void initializeConfiguration(ZooKeeper zk, String config) {
+               try {
+                       if (getZookeeperConfigPath().contains("/")) {
+                               StringBuilder path = new StringBuilder();
+                               for (String node : getZookeeperConfigPath().split("/")) {
+                                       if (node.trim().equals("")) {
+                                               // ignore empty path segments
+                                               continue;
+                                       }
+                                       path.append('/').append(node);
+                                       try {
+                                               zk.create(path.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                                       } catch (NodeExistsException ignored) {
+                                               // node already exists, continue with the next segment
+                                       }
+                               }
+                       }
+
+                       // use version -1 to ignore versioning conflicts
+                       try {
+                               zk.setData(getZookeeperConfigPath(), config.getBytes("UTF-8"), -1);
+                       } catch (UnsupportedEncodingException e) {
+                               // should not happen
+                               throw new RuntimeException(e);
+                       }
+               } catch (KeeperException e) {
+                       throw new RuntimeException(MessageFormat.format("The zookeeper config could not be initialized, a Zookeeper exception of type {0} occured!", e.code().name()), e);
+               } catch (InterruptedException e) {
+                       Thread.currentThread().interrupt();
+                       throw new RuntimeException("The zookeeper config could not be initialized, the connection got interrupted!", e);
+               }
+       }
+
+       /**
+        * Opens a ZooKeeper connection and blocks until the client reports that it is connected
+        * (read-write or read-only), waiting at most {@code CONNECT_WAIT_SECONDS} seconds.
+        *
+        * @return a connected ZooKeeper handle; the caller is responsible for closing it
+        * @throws RuntimeException if the connection cannot be created, is not established in time,
+        *         or the waiting thread is interrupted
+        */
+       private ZooKeeper getZkConnectionSynchronously() {
+               final CountDownLatch latch = new CountDownLatch(1);
+               logger.log(Level.FINE, "Trying to connect to zookeeper at {0}", zookeeperString);
+               ZooKeeper zk = null;
+               boolean connected = false;
+               try {
+                       zk = new ZooKeeper(zookeeperString, SESSION_TIMEOUT_SECONDS * 1000, new Watcher() {
+
+                               @Override
+                               public void process(WatchedEvent event) {
+                                       Watcher.Event.KeeperState state = event.getState();
+                                       if (state == Watcher.Event.KeeperState.ConnectedReadOnly || state == Watcher.Event.KeeperState.SyncConnected) {
+                                               // connected successfully to zk, release the waiting thread
+                                               latch.countDown();
+                                       }
+                               }
+                       });
+                       // block until connected, for a maximum of CONNECT_WAIT_SECONDS seconds
+                       connected = latch.await(CONNECT_WAIT_SECONDS, TimeUnit.SECONDS);
+                       if (!connected) {
+                               throw new RuntimeException("The zookeeper connection could not be retrieved on time!");
+                       }
+                       return zk;
+               } catch (IOException e1) {
+                       throw new RuntimeException("The zookeeper connection could not be retrieved, the connection failed!", e1);
+               } catch (InterruptedException e) {
+                       Thread.currentThread().interrupt();
+                       throw new RuntimeException("Zookeeper connection could not be retrieved, the thread was interrupted!", e);
+               } finally {
+                       if (!connected) {
+                               // never hand out or leak a handle that did not connect
+                               closeQuietly(zk);
+                       }
+               }
+       }
+
+       /**
+        * Closes the given ZooKeeper handle, if any, without throwing. A pending interrupt is
+        * cleared for the duration of the close call and restored afterwards so that the close is
+        * not aborted by an interrupt that was already recorded.
+        */
+       private void closeQuietly(ZooKeeper zk) {
+               if (zk == null) {
+                       return;
+               }
+               boolean interrupted = Thread.interrupted();
+               try {
+                       zk.close();
+               } catch (InterruptedException e) {
+                       interrupted = true;
+                       logger.log(Level.WARNING, "Interrupted while closing the zookeeper connection", e);
+               } finally {
+                       if (interrupted) {
+                               Thread.currentThread().interrupt();
+                       }
+               }
+       }
+
+       /**
+        * @return the ZooKeeper path of the node that holds the configuration
+        */
+       public String getZookeeperConfigPath() {
+               return ZOOKEEPER_CONFIG_PATH;
+       }
+
+       /**
+        * Replaces the cached configuration with the JSON form of the given container.
+        * Nothing is written to ZooKeeper here.
+        *
+        * @param container the new configuration
+        * @throws RuntimeException if the container cannot be serialized to JSON
+        */
+       @Override
+       public void onConfigChange(ConfigContainer container) {
+               synchronized (configCacheLock) {
+                       try {
+                               configCache = JSONUtils.toJSON(container);
+                       } catch (JSONException e) {
+                               throw new RuntimeException("Config could not be cloned!", e);
+                       }
+               }
+       }
+
+       /**
+        * Records the given change id as pending.
+        *
+        * @param changeId id of the config change
+        */
+       @Override
+       public void addPendingConfigChange(String changeId) {
+               synchronized (configCacheLock) {
+                       pendingConfigChanges.add(changeId);
+               }
+       }
+
+       /**
+        * Removes the given change id from the set of pending changes.
+        *
+        * @param changeId id of the config change
+        */
+       @Override
+       public void removePendingConfigChange(String changeId) {
+               synchronized (configCacheLock) {
+                       pendingConfigChanges.remove(changeId);
+               }
+       }
+
+       /**
+        * @param changeId id of the config change
+        * @return {@code true} if the given change id is currently recorded as pending
+        */
+       @Override
+       public boolean isConfigChangePending(String changeId) {
+               synchronized (configCacheLock) {
+                       return pendingConfigChanges.contains(changeId);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-store/src/main/resources/org/apache/atlas/odf/core/internal/zookeeper/test-embedded-zookeeper.properties
----------------------------------------------------------------------
diff --git 
a/odf/odf-store/src/main/resources/org/apache/atlas/odf/core/internal/zookeeper/test-embedded-zookeeper.properties
 
b/odf/odf-store/src/main/resources/org/apache/atlas/odf/core/internal/zookeeper/test-embedded-zookeeper.properties
new file mode 100755
index 0000000..7234e9c
--- /dev/null
+++ 
b/odf/odf-store/src/main/resources/org/apache/atlas/odf/core/internal/zookeeper/test-embedded-zookeeper.properties
@@ -0,0 +1,34 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=/tmp/odf-embedded-test-kafka/zookeeper
+# the port at which the clients will connect
+clientPort=2181
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0
+

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-store/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
----------------------------------------------------------------------
diff --git 
a/odf/odf-store/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
 
b/odf/odf-store/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
new file mode 100755
index 0000000..65a7b5d
--- /dev/null
+++ 
b/odf/odf-store/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
@@ -0,0 +1,14 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Binds the ODFConfigurationStorage key to the ZooKeeper-backed implementation class.
+# NOTE(review): presumably this mapping is looked up by ODFInternalFactory when it creates components - confirm.
+ODFConfigurationStorage=org.apache.atlas.odf.core.store.zookeeper34.ZookeeperConfigurationStorage

Reply via email to