http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaQueueManager.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaQueueManager.java
 
b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaQueueManager.java
new file mode 100755
index 0000000..e759ecc
--- /dev/null
+++ 
b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaQueueManager.java
@@ -0,0 +1,488 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.messaging.kafka;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.exception.ZkTimeoutException;
+import org.apache.atlas.odf.api.OpenDiscoveryFramework;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.engine.KafkaGroupOffsetInfo;
+import org.apache.atlas.odf.api.engine.KafkaStatus;
+import org.apache.atlas.odf.api.engine.KafkaTopicStatus;
+import org.apache.atlas.odf.api.engine.ThreadStatus;
+import org.apache.atlas.odf.api.settings.KafkaMessagingConfiguration;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.messaging.DiscoveryServiceQueueManager;
+import org.apache.atlas.odf.core.notification.NotificationListener;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.engine.MessagingStatus;
+import org.apache.atlas.odf.api.engine.PartitionOffsetInfo;
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.controlcenter.AdminMessage;
+import org.apache.atlas.odf.core.controlcenter.AdminQueueProcessor;
+import org.apache.atlas.odf.core.controlcenter.ConfigChangeQueueProcessor;
+import 
org.apache.atlas.odf.core.controlcenter.DefaultStatusQueueStore.StatusQueueProcessor;
+import org.apache.atlas.odf.core.controlcenter.DiscoveryServiceStarter;
+import org.apache.atlas.odf.core.controlcenter.ExecutorServiceFactory;
+import org.apache.atlas.odf.core.controlcenter.QueueMessageProcessor;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntime;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntimes;
+import org.apache.atlas.odf.core.controlcenter.StatusQueueEntry;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager;
+import 
org.apache.atlas.odf.core.controlcenter.ThreadManager.ThreadStartupResult;
+import org.apache.atlas.odf.core.controlcenter.TrackerUtil;
+import org.apache.atlas.odf.core.notification.NotificationManager;
+
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.common.TopicExistsException;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+public class KafkaQueueManager implements DiscoveryServiceQueueManager {
+
+       /** Name of the Kafka topic used as status queue. */
+       public static final String TOPIC_NAME_STATUS_QUEUE = "odf-status-topic";
+       /** Name of the Kafka topic used as admin queue. */
+       public static final String TOPIC_NAME_ADMIN_QUEUE = "odf-admin-topic";
+       /** Message key used for every message put on the admin queue. */
+       public static final String ADMIN_QUEUE_KEY = "odf-admin-queue-key";
+       /** Prefix of the per-discovery-service topic names reported by getMessagingStatus(). */
+       public static final String SERVICE_TOPIC_PREFIX = "odf-topic-";
+
+       /** Rack awareness mode passed to AdminUtils.createTopic() when a topic is created. */
+       public static final RackAwareMode DEFAULT_RACK_AWARE_MODE = RackAwareMode.Disabled$.MODULE$;
+
+       //use static UUID so that no unnecessary consumer threads are started
+       private static final String UNIQUE_SESSION_THREAD_ID = UUID.randomUUID().toString();
+
+       /** Startup timeout per thread; start() multiplies it by the number of scheduled threads. */
+       private static final int THREAD_STARTUP_TIMEOUT_MS = 5000;
+
+       /** Thread IDs recorded by start(); null while not started. Only accessed while holding startLock. */
+       private static List<String> queueConsumerNames = null;
+       /** Monitor used by start() and stop(); final so that the lock object can never be replaced. */
+       private static final Object startLock = new Object();
+
+       private static final Logger logger = Logger.getLogger(KafkaQueueManager.class.getName());
+
+       private ThreadManager threadManager;
+       private SettingsManager odfConfig;
+       private String zookeeperConnectString;
+
+       public KafkaQueueManager() {
+               ODFInternalFactory factory = new ODFInternalFactory();
+               threadManager = factory.create(ThreadManager.class);
+               ExecutorServiceFactory esf = 
factory.create(ExecutorServiceFactory.class);
+               threadManager.setExecutorService(esf.createExecutorService());
+               zookeeperConnectString = 
factory.create(Environment.class).getZookeeperConnectString();
+               odfConfig = factory.create(SettingsManager.class);
+       }
+       
+       
+       /**
+        * Builds the Kafka consumer configuration for a consumer group.
+        *
+        * @param consumerGroupID value stored under "group.id"
+        * @param consumeFromEnd if true the offset reset policy is "latest", otherwise "earliest"
+        * @return the consumer properties of the settings manager, completed with group ID,
+        *         Zookeeper connect string (if known), offset reset policy, String
+        *         deserializers, bootstrap servers and disabled auto commit
+        */
+       public Properties getConsumerConfigProperties(String consumerGroupID, boolean consumeFromEnd) {
+               final Properties consumerProps = odfConfig.getKafkaConsumerProperties();
+               consumerProps.put("group.id", consumerGroupID);
+               if (zookeeperConnectString != null) {
+                       consumerProps.put("zookeeper.connect", zookeeperConnectString);
+               }
+               final String offsetResetPolicy = consumeFromEnd ? "latest" : "earliest";
+               consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetResetPolicy);
+               // keys and values are both plain strings
+               final String stringDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
+               consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
+               consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
+               consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
+               consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+               return consumerProps;
+       }
+
+       /**
+        * Returns the brokers reported by the {@link KafkaMonitor} for the configured
+        * Zookeeper connect string as one comma-separated string.
+        */
+       private String getBootstrapServers() {
+               final KafkaMonitor monitor = new ODFInternalFactory().create(KafkaMonitor.class);
+               final List<String> brokerList = monitor.getBrokers(zookeeperConnectString);
+               final StringBuilder joined = new StringBuilder();
+               // empty before the first entry, a comma before every following one
+               String separator = "";
+               for (String broker : brokerList) {
+                       joined.append(separator).append(broker);
+                       separator = ",";
+               }
+               return joined.toString();
+       }
+
+       /**
+        * Creates the given Kafka topic unless it already exists.
+        *
+        * @param topicName name of the topic
+        * @param partitionCount number of partitions used if the topic has to be created
+        * @param props properties providing "zookeeper.connect", "zookeeperSessionTimeoutMs"
+        *              and "zookeeperConnectionTimeoutMs"
+        * @throws ZkTimeoutException if the connection to Zookeeper cannot be established
+        */
+       protected void createTopicIfNotExists(String topicName, int partitionCount, Properties props) {
+               String zkHosts = props.getProperty("zookeeper.connect");
+               final ZkClient zkClient;
+               try {
+                       zkClient = new ZkClient(zkHosts, Integer.valueOf(props.getProperty("zookeeperSessionTimeoutMs")),
+                                       Integer.valueOf(props.getProperty("zookeeperConnectionTimeoutMs")), ZKStringSerializer$.MODULE$);
+               } catch (ZkTimeoutException zkte) {
+                       logger.log(Level.SEVERE, "Could not connect to the Zookeeper instance at ''{0}''. Please ensure that Zookeeper is running", zkHosts);
+                       // without a client the topic check below could only fail with a
+                       // NullPointerException, so report the real cause instead
+                       throw zkte;
+               }
+               try {
+                       logger.log(Level.FINEST, "Checking if topic ''{0}'' already exists", topicName);
+                       final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zkHosts), false);
+                       if (!AdminUtils.topicExists(zkUtils, topicName)) {
+                               logger.log(Level.INFO, "Topic ''{0}'' does not exist, creating it", topicName);
+
+                               //FIXME zkUtils isSecure parameter? Only with SSL! --> parse zkhosts?
+                               // replication factor comes from the messaging configuration,
+                               // no special per-topic config needed
+                               KafkaMessagingConfiguration kafkaConfig = ((KafkaMessagingConfiguration) odfConfig.getODFSettings().getMessagingConfiguration());
+                               AdminUtils.createTopic(zkUtils, topicName, partitionCount, kafkaConfig.getKafkaBrokerTopicReplication(),
+                                               new Properties(), DEFAULT_RACK_AWARE_MODE);
+                               logger.log(Level.FINE, "Topic ''{0}'' created", topicName);
+                               //wait before continuing to make sure the topic exists BEFORE consumers are started
+                               try {
+                                       Thread.sleep(1500);
+                               } catch (InterruptedException e) {
+                                       // keep the interrupt status so that the calling code can react to it
+                                       Thread.currentThread().interrupt();
+                                       logger.log(Level.WARNING, "Interrupted while waiting for topic ''{0}'' to become available", topicName);
+                               }
+                       }
+               } catch (TopicExistsException ex) {
+                       // somebody else created the topic in the meantime, which is fine
+                       logger.log(Level.FINE, "Topic ''{0}'' already exists.", topicName);
+               } finally {
+                       zkClient.close();
+               }
+       }
+
+
+       private String getTopicName(ServiceRuntime runtime) {
+               return "odf-runtime-" + runtime.getName();
+       }
+       
+       private String getConsumerGroup(ServiceRuntime runtime) {
+               return getTopicName(runtime) + "_group";
+       }
+       
+       private List<ThreadStartupResult> scheduleAllRuntimeConsumers() {
+               List<ThreadStartupResult> results = new ArrayList<>();
+               for (ServiceRuntime runtime : 
ServiceRuntimes.getActiveRuntimes()) {
+                       results.addAll(scheduleRuntimeConsumers(runtime));
+               }
+               return results;
+       }
+       
+       /**
+        * Makes sure a consumer thread exists for the topic of the given runtime.
+        * If the thread is not running, the topic is created if necessary and a new
+        * {@link KafkaRuntimeConsumer} is started; otherwise a result describing the
+        * already running thread is returned.
+        *
+        * @param runtime the runtime whose topic should be consumed
+        * @return a list containing exactly one startup result
+        */
+       private List<ThreadStartupResult> scheduleRuntimeConsumers(ServiceRuntime runtime) {
+               // the apostrophe is doubled because the message is formatted with java.text.MessageFormat,
+               // which would otherwise swallow it and treat the rest of the text as quoted
+               logger.log(Level.FINER, "Create consumers on queue for runtime ''{0}'' if it doesn''t already exist", runtime.getName());
+
+               String topicName = getTopicName(runtime);
+               String consumerGroupId = getConsumerGroup(runtime);
+               // read entries from beginning if consumer was never initialized
+               Properties kafkaConsumerProps = getConsumerConfigProperties(consumerGroupId, false);
+               String threadName = "RuntimeQueueConsumer" + topicName;
+               List<ThreadStartupResult> result = new ArrayList<ThreadStartupResult>();
+               if (threadManager.getStateOfUnmanagedThread(threadName) != ThreadStatus.ThreadState.RUNNING) {
+                       createTopicIfNotExists(topicName, 1, kafkaConsumerProps);
+                       ThreadStartupResult startupResult = threadManager.startUnmanagedThread(threadName, new KafkaRuntimeConsumer(runtime, topicName, kafkaConsumerProps, new DiscoveryServiceStarter()));
+                       result.add(startupResult);
+               } else {
+                       // thread is already running: nothing was created and it counts as ready
+                       result.add(new ThreadStartupResult(threadName) {
+                               @Override
+                               public boolean isNewThreadCreated() {
+                                       return false;
+                               }
+
+                               @Override
+                               public boolean isReady() {
+                                       return true;
+                               }
+                       });
+               }
+               return result;
+       }
+
+       
+       /**
+        * Creates the topic if necessary and makes sure one consumer thread per partition
+        * is running. The thread of partition n is named {@code threadName + "_" + n}.
+        *
+        * @param topicName topic to consume from
+        * @param partitionCount number of partitions, and therefore of consumer threads
+        * @param kafkaConsumerProps consumer configuration handed to every consumer
+        * @param threadName base name of the consumer threads
+        * @param processors one message processor per partition
+        * @return one startup result per partition
+        * @throws RuntimeException if the number of processors differs from the partition count
+        */
+       private List<ThreadStartupResult> scheduleConsumerThreads(String topicName, int partitionCount, Properties kafkaConsumerProps, String threadName,
+                       List<QueueMessageProcessor> processors) {
+               if (processors.size() != partitionCount) {
+                       final String msg = "The number of processors must be equal to the partition count in order to support parallel processing";
+                       logger.warning(msg);
+                       throw new RuntimeException(msg);
+               }
+               createTopicIfNotExists(topicName, partitionCount, kafkaConsumerProps);
+
+               List<ThreadStartupResult> result = new ArrayList<ThreadStartupResult>();
+               for (int no = 0; no < partitionCount; no++) {
+                       final String partitionThreadName = threadName + "_" + no;
+                       if (threadManager.getStateOfUnmanagedThread(partitionThreadName) != ThreadStatus.ThreadState.RUNNING) {
+                               QueueMessageProcessor processor = processors.get(no);
+                               ThreadStartupResult created = threadManager.startUnmanagedThread(partitionThreadName, new KafkaQueueConsumer(topicName, kafkaConsumerProps, processor));
+                               if (created.isNewThreadCreated()) {
+                                       logger.log(Level.INFO, "Created new consumer thread on topic ''{0}'' with group ID ''{1}'', thread name: ''{2}'', properties: ''{3}''",
+                                                       new Object[] { topicName, kafkaConsumerProps.getProperty("group.id"), partitionThreadName, kafkaConsumerProps.toString() });
+                               } else {
+                                       logger.log(Level.FINE, "Consumer thread with thread name: ''{0}'' already exists, doing nothing", new Object[] { partitionThreadName });
+                               }
+                               result.add(created);
+                       } else {
+                               // report the running thread under the same ID it was started with,
+                               // so that the recorded thread IDs match the real threads
+                               result.add(new ThreadStartupResult(partitionThreadName) {
+                                       @Override
+                                       public boolean isNewThreadCreated() {
+                                               return false;
+                                       }
+
+                                       @Override
+                                       public boolean isReady() {
+                                               return true;
+                                       }
+                               });
+                       }
+               }
+               return result;
+       }
+
+       private ThreadStartupResult scheduleConsumerThread(String topicName, 
Properties kafkaConsumerProps, String threadName, QueueMessageProcessor 
processor) {
+               return scheduleConsumerThreads(topicName, 1, 
kafkaConsumerProps, threadName, Arrays.asList(processor)).get(0);
+       }
+
+       /**
+        * Puts the tracker on the topic of the runtime that hosts its current discovery
+        * service. The ID of the tracker's request is used as message key.
+        *
+        * @param tracker the analysis request tracker to enqueue
+        * @throws RuntimeException if the tracker has no current discovery service request
+        *                          or no runtime is found for the discovery service
+        */
+       @Override
+       public void enqueue(AnalysisRequestTracker tracker) {
+               final DiscoveryServiceRequest currentRequest = TrackerUtil.getCurrentDiscoveryServiceStartRequest(tracker);
+               if (currentRequest == null) {
+                       throw new RuntimeException("Tracker is finished, should not be enqueued");
+               }
+               final String serviceId = currentRequest.getDiscoveryServiceId();
+               // record the time at which the request was put on the queue
+               currentRequest.setPutOnRequestQueue(System.currentTimeMillis());
+               final ServiceRuntime serviceRuntime = ServiceRuntimes.getRuntimeForDiscoveryService(serviceId);
+               if (serviceRuntime == null) {
+                       final String notFoundMsg = MessageFormat.format("Service runtime for service ''{0}'' was not found.", serviceId);
+                       throw new RuntimeException(notFoundMsg);
+               }
+               final String runtimeTopic = getTopicName(serviceRuntime);
+               enqueueJSONMessage(runtimeTopic, tracker, tracker.getRequest().getId());
+       }
+
+       private void enqueueJSONMessage(String topicName, Object jsonObject, 
String key) {
+               String value = null;
+               try {
+                       value = JSONUtils.toJSON(jsonObject);
+               } catch (JSONException e) {
+                       throw new RuntimeException(e);
+               }
+               new 
ODFInternalFactory().create(KafkaProducerManager.class).sendMsg(topicName, key, 
value);
+       }
+
+       /**
+        * Makes sure the consumer thread of the status queue is running.
+        *
+        * @return a list containing exactly one startup result
+        */
+       List<ThreadStartupResult> scheduleStatusQueueConsumers() {
+               logger.log(Level.FINER, "Create consumers on status queue if they don't already exist");
+               List<ThreadStartupResult> results = new ArrayList<ThreadStartupResult>();
+
+               // create consumer thread for the status watcher of all trackers
+               // have a new group id on each node that reads all from the beginning
+               String statusWatcherConsumerGroupID = "DSStatusWatcherConsumerGroup" + UNIQUE_SESSION_THREAD_ID;
+               // always read from beginning for the status queue
+               Properties statusWatcherKafkaConsumerProps = getConsumerConfigProperties(statusWatcherConsumerGroupID, false);
+               final String statusWatcherThreadName = "StatusWatcher" + TOPIC_NAME_STATUS_QUEUE; // a fixed name
+               // the single partition thread is registered under the base name plus "_0"
+               final String threadNameWithPartition = statusWatcherThreadName + "_0";
+               final ThreadStatus.ThreadState stateOfUnmanagedThread = threadManager.getStateOfUnmanagedThread(threadNameWithPartition);
+               logger.fine("State of status watcher thread: " + stateOfUnmanagedThread);
+               if (stateOfUnmanagedThread != ThreadStatus.ThreadState.RUNNING) {
+                       final ThreadStartupResult scheduleConsumerThread = scheduleConsumerThread(TOPIC_NAME_STATUS_QUEUE, statusWatcherKafkaConsumerProps, statusWatcherThreadName,
+                                       new StatusQueueProcessor());
+                       results.add(scheduleConsumerThread);
+               } else {
+                       // report the running thread under the ID that was checked above,
+                       // so that the recorded thread IDs match the real thread
+                       results.add(new ThreadStartupResult(threadNameWithPartition) {
+                               @Override
+                               public boolean isNewThreadCreated() {
+                                       return false;
+                               }
+
+                               @Override
+                               public boolean isReady() {
+                                       return true;
+                               }
+                       });
+               }
+
+               return results;
+       }
+
+
+       /** Sends the entry to the status queue topic, keyed by the request ID of the entry. */
+       @Override
+       public void enqueueInStatusQueue(StatusQueueEntry sqe) {
+               final String requestId = StatusQueueEntry.getRequestId(sqe);
+               enqueueJSONMessage(TOPIC_NAME_STATUS_QUEUE, sqe, requestId);
+       }
+
+
+       /**
+        * Makes sure the consumers of the admin queue are running. If the admin watcher
+        * thread is not running, two consumers are scheduled: one with a group ID that
+        * contains the session UUID and one with a fixed, common group ID.
+        *
+        * @return the startup results of the scheduled threads, or a single result
+        *         describing the already running admin watcher thread
+        */
+       private List<ThreadStartupResult> scheduleAdminQueueConsumers() {
+               List<ThreadStartupResult> results = new ArrayList<ThreadStartupResult>();
+               //schedule admin queue consumers
+               // consumer group so that every node receives events
+               String adminWatcherConsumerGroupID = "DSAdminQueueConsumerGroup" + UNIQUE_SESSION_THREAD_ID; // have a new group id on each node
+               Properties adminWatcherKafkaConsumerProps = getConsumerConfigProperties(adminWatcherConsumerGroupID, true);
+               final String adminWatcherThreadName = "AdminWatcher" + TOPIC_NAME_ADMIN_QUEUE;
+               // the single partition thread is registered under the base name plus "_0"
+               final String threadNameWithPartition = adminWatcherThreadName + "_0";
+               if (threadManager.getStateOfUnmanagedThread(threadNameWithPartition) != ThreadStatus.ThreadState.RUNNING) {
+                       results.add(scheduleConsumerThread(TOPIC_NAME_ADMIN_QUEUE, adminWatcherKafkaConsumerProps, adminWatcherThreadName, new AdminQueueProcessor()));
+                       // consumer group so only one node receives events
+                       String distributedAdminConsumerGroup = "DSAdminQueueConsumerGroupCommon";
+                       Properties kafkaProps = getConsumerConfigProperties(distributedAdminConsumerGroup, true);
+                       final String threadName = "DistributedAdminWatcher";
+                       results.add(scheduleConsumerThread(TOPIC_NAME_ADMIN_QUEUE, kafkaProps, threadName, new ConfigChangeQueueProcessor()));
+               } else {
+                       // report the running thread under the ID that was checked above,
+                       // so that the recorded thread IDs match the real thread
+                       results.add(new ThreadStartupResult(threadNameWithPartition) {
+                               @Override
+                               public boolean isNewThreadCreated() {
+                                       return false;
+                               }
+
+                               @Override
+                               public boolean isReady() {
+                                       return true;
+                               }
+                       });
+               }
+               return results;
+       }
+
+       /** Sends the message to the admin queue topic; all admin messages share one fixed key. */
+       @Override
+       public void enqueueInAdminQueue(AdminMessage message) {
+               final String adminTopic = TOPIC_NAME_ADMIN_QUEUE;
+               enqueueJSONMessage(adminTopic, message, ADMIN_QUEUE_KEY);
+       }
+
+       /**
+        * Schedules the status, admin, runtime and notification listener consumers and
+        * waits until their threads are ready. Does nothing if consumer names are
+        * already recorded, i.e. until stop() has been called.
+        *
+        * @throws TimeoutException declared by this method; presumably raised while waiting
+        *                          for the threads to become ready (verify against ThreadManager)
+        */
+       @Override
+       public void start() throws TimeoutException {
+               synchronized (startLock) {
+                       if (queueConsumerNames != null) {
+                               // already started
+                               return;
+                       }
+                       final List<ThreadStartupResult> startupResults = new ArrayList<ThreadStartupResult>();
+                       startupResults.addAll(scheduleStatusQueueConsumers());
+                       startupResults.addAll(scheduleAdminQueueConsumers());
+                       startupResults.addAll(scheduleAllRuntimeConsumers());
+                       startupResults.addAll(scheduleNotificationListenerThreads());
+
+                       final List<String> threadIds = new ArrayList<String>();
+                       for (ThreadStartupResult startupResult : startupResults) {
+                               threadIds.add(startupResult.getThreadId());
+                       }
+                       // the IDs are recorded before waiting, so stop() knows them even if the wait fails
+                       queueConsumerNames = threadIds;
+
+                       // every thread gets the full startup timeout
+                       final int totalTimeoutMs = THREAD_STARTUP_TIMEOUT_MS * startupResults.size();
+                       this.threadManager.waitForThreadsToBeReady(totalTimeoutMs, startupResults);
+                       logger.info("KafkaQueueManager successfully initialized");
+               }
+       }
+       
+       /**
+        * Shuts down the threads recorded by start() and forgets their names, so that a
+        * later start() schedules the consumers again. Does nothing if not started.
+        */
+       public void stop() {
+               synchronized (startLock) {
+                       if (queueConsumerNames == null) {
+                               // nothing was started
+                               return;
+                       }
+                       threadManager.shutdownThreads(queueConsumerNames);
+                       queueConsumerNames = null;
+               }
+       }
+
+       /**
+        * Collects the Kafka status: the brokers plus one topic status for the admin
+        * topic, the status topic and one topic per registered discovery service.
+        *
+        * NOTE(review): the per-service topics are named SERVICE_TOPIC_PREFIX + service ID,
+        * while enqueue() and the runtime consumers in this class use
+        * "odf-runtime-" + runtime name. Confirm that the names reported here are intended.
+        *
+        * @return a {@link KafkaStatus} with brokers and topic status list set
+        */
+       @Override
+       public MessagingStatus getMessagingStatus() {
+               final KafkaStatus kafkaStatus = new KafkaStatus();
+               final KafkaMonitor kafkaMonitor = new ODFInternalFactory().create(KafkaMonitor.class);
+               kafkaStatus.setBrokers(kafkaMonitor.getBrokers(zookeeperConnectString));
+
+               // fixed topics first, then one topic per discovery service
+               final List<String> topicNames = new ArrayList<String>();
+               topicNames.add(KafkaQueueManager.TOPIC_NAME_ADMIN_QUEUE);
+               topicNames.add(KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE);
+               for (DiscoveryServiceProperties serviceProps : new ODFFactory().create().getDiscoveryServiceManager().getDiscoveryServicesProperties()) {
+                       topicNames.add(KafkaQueueManager.SERVICE_TOPIC_PREFIX + serviceProps.getId());
+               }
+
+               final List<KafkaTopicStatus> statusPerTopic = new ArrayList<KafkaTopicStatus>();
+               for (String topicName : topicNames) {
+                       statusPerTopic.add(getTopicStatus(topicName, kafkaMonitor));
+               }
+               kafkaStatus.setTopicStatus(statusPerTopic);
+               return kafkaStatus;
+       }
+
+       private KafkaTopicStatus getTopicStatus(String topic, KafkaMonitor 
monitor) {
+               KafkaTopicStatus topicStatus = new KafkaTopicStatus();
+               topicStatus.setTopic(topic);
+               
topicStatus.setBrokerPartitionMessageInfo(monitor.getMessageCountForTopic(zookeeperConnectString,
 topic));
+
+               List<KafkaGroupOffsetInfo> offsetInfoList = new 
ArrayList<KafkaGroupOffsetInfo>();
+               List<String> consumerGroupsFromZookeeper = 
monitor.getConsumerGroups(zookeeperConnectString, topic);
+               for (String group : consumerGroupsFromZookeeper) {
+                       KafkaGroupOffsetInfo offsetInfoContainer = new 
KafkaGroupOffsetInfo();
+                       offsetInfoContainer.setGroupId(group);
+                       List<PartitionOffsetInfo> offsetsForTopic = 
monitor.getOffsetsForTopic(zookeeperConnectString, group, topic);
+                       for (PartitionOffsetInfo info : offsetsForTopic) {
+                               // to reduce clutter, only if at least 1 
partition has an offset > -1 (== any offset) for this consumer group, 
+                               // it will be included in the result
+                               if (info.getOffset() > -1) {
+                                       
offsetInfoContainer.setOffsets(offsetsForTopic);
+                                       offsetInfoList.add(offsetInfoContainer);
+                                       break;
+                               }
+                       }
+               }
+               topicStatus.setConsumerGroupOffsetInfo(offsetInfoList);
+
+               
topicStatus.setPartitionBrokersInfo(monitor.getPartitionInfoForTopic(zookeeperConnectString,
 topic));
+               return topicStatus;
+       }
+
+       private List<ThreadStartupResult> scheduleNotificationListenerThreads() 
{
+               NotificationManager nm = new 
ODFInternalFactory().create(NotificationManager.class);
+               List<NotificationListener> listeners = nm.getListeners();
+               List<ThreadStartupResult> result = new ArrayList<>();
+               if (listeners == null) {
+                       return result;
+               }
+               final OpenDiscoveryFramework odf = new ODFFactory().create();
+               for (final NotificationListener listener : listeners) {
+                       String topicName = listener.getTopicName();
+                       String consumerGroupId = "ODFNotificationGroup" + 
topicName;
+                       Properties kafkaConsumerProps = 
getConsumerConfigProperties(consumerGroupId, true);  
+                       String threadName = "NotificationListenerThread" + 
topicName;
+                       if (threadManager.getStateOfUnmanagedThread(threadName) 
!= ThreadStatus.ThreadState.RUNNING) {
+                               KafkaQueueConsumer consumer = new 
KafkaQueueConsumer(topicName, kafkaConsumerProps, new QueueMessageProcessor() {
+                                       
+                                       @Override
+                                       public void process(ExecutorService 
executorService, String msg, int partition, long msgOffset) {
+                                               try {
+                                                       listener.onEvent(msg, 
odf);
+                                               } catch(Exception exc) {
+                                                       String errorMsg = 
MessageFormat.format("Notification listsner ''{0}'' has thrown an exception. 
Ignoring it", listener.getName());
+                                                       
logger.log(Level.WARNING, errorMsg, exc);
+                                               }
+                                       }
+                               });
+                               ThreadStartupResult startupResult = 
threadManager.startUnmanagedThread(threadName, consumer);
+                               result.add(startupResult);              
+                       } else {
+                               result.add(new ThreadStartupResult(threadName) {
+                                       @Override
+                                       public boolean isNewThreadCreated() {
+                                               return false;
+                                       }
+
+                                       @Override
+                                       public boolean isReady() {
+                                               return true;
+                                       }
+                               });
+                       }
+               }
+               return result;
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaRuntimeConsumer.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaRuntimeConsumer.java
 
b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaRuntimeConsumer.java
new file mode 100755
index 0000000..73d98e7
--- /dev/null
+++ 
b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaRuntimeConsumer.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.messaging.kafka;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.core.controlcenter.ODFRunnable;
+import org.apache.atlas.odf.core.controlcenter.QueueMessageProcessor;
+import org.apache.atlas.odf.core.controlcenter.ServiceRuntime;
+
+/**
+ * Runs a {@link KafkaQueueConsumer} for a topic on behalf of a {@link ServiceRuntime}.
+ * <p>
+ * While the runtime reports a wait time greater than zero, this runnable sleeps for that time.
+ * As soon as the runtime is available, a Kafka consumer is created and run synchronously on the
+ * current thread until it stops, either because {@link #cancel()} was called or because the
+ * runtime reported a wait time again (see the consumption callback). The loop ends once
+ * {@link #cancel()} has been called.
+ */
+public class KafkaRuntimeConsumer implements ODFRunnable {
+
+       Logger logger = Logger.getLogger(KafkaRuntimeConsumer.class.getName());
+
+       private final ServiceRuntime runtime;
+       // Written by cancel() and read by run() and the consumption callback. run() blocks its own
+       // thread, so cancel() has to come from a different one; volatile makes the write visible.
+       private volatile boolean isShutdown = false;
+       private ExecutorService executorService = null;
+       // Non-null only while run() is executing a consumer; read by cancel(), hence volatile.
+       private volatile KafkaQueueConsumer kafkaQueueConsumer = null;
+
+       private final String topic;
+       private final Properties config;
+       private final QueueMessageProcessor processor;
+
+       // Stops the running Kafka consumer when this runnable is cancelled or the runtime becomes unavailable.
+       private final KafkaQueueConsumer.ConsumptionCallback callback = new KafkaQueueConsumer.ConsumptionCallback() {
+               @Override
+               public boolean stopConsumption() {
+                       return isShutdown || (runtime.getWaitTimeUntilAvailable() > 0);
+               }
+       };
+
+       /**
+        * @param runtime   runtime whose availability decides whether messages are consumed
+        * @param topicName Kafka topic to consume from
+        * @param config    properties passed on to every {@link KafkaQueueConsumer} that is created
+        * @param processor processor passed on to every {@link KafkaQueueConsumer} that is created
+        */
+       public KafkaRuntimeConsumer(ServiceRuntime runtime, String topicName, Properties config, QueueMessageProcessor processor) {
+               this.runtime = runtime;
+               this.processor = processor;
+               this.topic = topicName;
+               this.config = config;
+       }
+
+       /**
+        * Alternates between waiting for the runtime and consuming from the topic until cancelled.
+        *
+        * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is interrupted
+        *         while waiting for the runtime; the interrupt status of the thread is restored first
+        */
+       @Override
+       public void run() {
+               logger.log(Level.INFO, "Starting runtime consumer for topic ''{0}''", topic);
+               while (!isShutdown) {
+                       long waitTime = runtime.getWaitTimeUntilAvailable();
+                       if (waitTime <= 0) {
+                               logger.log(Level.INFO, "Starting Kafka consumer for topic ''{0}''", topic);
+                               final KafkaQueueConsumer consumer = new KafkaQueueConsumer(topic, config, processor, callback);
+                               consumer.setExecutorService(executorService);
+                               // Publish the consumer only after it is fully configured so that cancel() never sees a
+                               // half-initialized instance. A cancel() arriving before this point is still honored
+                               // through the consumption callback, which checks isShutdown.
+                               kafkaQueueConsumer = consumer;
+                               try {
+                                       // run consumer synchronously
+                                       consumer.run();
+                               } finally {
+                                       // if we are here, this means that the consumer was cancelled
+                                       // either directly or (more likely) through the consumption callback
+                                       kafkaQueueConsumer = null;
+                               }
+                               logger.log(Level.INFO, "Kafka consumer for topic ''{0}'' is finished", topic);
+                       } else {
+                               try {
+                                       logger.log(Level.FINER, "Runtime ''{0}'' is not available, waiting for ''{1}''ms", new Object[]{runtime.getName(), waitTime });
+                                       Thread.sleep(waitTime);
+                               } catch (InterruptedException e) {
+                                       // Restore the interrupt status so code further up the stack can still observe it.
+                                       Thread.currentThread().interrupt();
+                                       throw new RuntimeException(e);
+                               }
+                       }
+               }
+               logger.log(Level.INFO, "Kafka runtime consumer for topic ''{0}'' has shut down", topic);
+       }
+
+       @Override
+       public void setExecutorService(ExecutorService executorService) {
+               this.executorService = executorService;
+       }
+
+       /**
+        * Requests shutdown of the run loop and cancels the Kafka consumer if one is currently running.
+        */
+       @Override
+       public void cancel() {
+               isShutdown = true;
+               // Read the field once: run() may reset it to null between a null check and the call.
+               final KafkaQueueConsumer consumer = kafkaQueueConsumer;
+               if (consumer != null) {
+                       consumer.cancel();
+               }
+       }
+
+       /**
+        * @return always {@code true}; this runnable needs no startup phase
+        */
+       @Override
+       public boolean isReady() {
+               return true;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/MessageSearchConsumer.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/MessageSearchConsumer.java
 
b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/MessageSearchConsumer.java
new file mode 100755
index 0000000..9c08f3a
--- /dev/null
+++ 
b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/MessageSearchConsumer.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.messaging.kafka;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.I0Itec.zkclient.exception.ZkTimeoutException;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.core.controlcenter.ODFRunnable;
+import org.apache.atlas.odf.api.engine.PartitionOffsetInfo;
+
+/**
+ * One-shot consumer that scans a Kafka topic for messages matching a list of search strings and
+ * reports the positions of the matches through a {@link SearchCompletedCallback}.
+ * <p>
+ * A message matches a search string if its key equals the string or its value contains it. If a
+ * string matches several messages, the position of the last one polled is reported. The scan ends
+ * when the offsets retrieved at startup have been reached on every non-empty partition, when
+ * {@link #cancel()} is called, or after {@code MAX_POLL_COUNT} polls.
+ * <p>
+ * The callback is invoked only when the scan completes or when no offsets were found for the topic.
+ * It is not invoked when the poll limit is hit, the consumer is cancelled, or an exception occurs.
+ */
+public class MessageSearchConsumer implements ODFRunnable {
+       private static final long POLLING_DURATION_MS = 100;
+       private static final int MAX_POLL_COUNT = 5;
+
+       private Logger logger = Logger.getLogger(MessageSearchConsumer.class.getName());
+       private SearchCompletedCallback searchCompletedCallback;
+       private List<String> searchStrings;
+       protected String topic;
+       private KafkaConsumer<String, String> kafkaConsumer;
+       // shutdown is set through cancel() and ready is read through isReady(), both of which may be
+       // called from a thread other than the one executing run(); volatile makes the updates visible.
+       private volatile boolean shutdown;
+       private volatile boolean ready = false;
+       private List<PartitionOffsetInfo> maxOffsetsForTopic = new ArrayList<PartitionOffsetInfo>();
+
+
+       /**
+        * @param topic                topic to search
+        * @param completitionCallback callback notified when the search is done, may be {@code null}
+        * @param searchStrings        strings to look for in message keys and values
+        */
+       public MessageSearchConsumer(String topic, SearchCompletedCallback completitionCallback, List<String> searchStrings) {
+               setTopic(topic);
+               setSearchStrings(searchStrings);
+               setCompletitionCallback(completitionCallback);
+       }
+
+       /**
+        * Creates an unconfigured consumer; topic, search strings and callback are supplied through the setters.
+        */
+       public MessageSearchConsumer() {
+       }
+
+       /**
+        * Asks the {@link KafkaMonitor} for the offset of the last message of every partition of the topic.
+        *
+        * @return one entry per partition, or an empty list if no Zookeeper connect string is configured
+        */
+       protected List<PartitionOffsetInfo> retrieveTopicOffsets() {
+               List<PartitionOffsetInfo> offsetsForTopic = new ArrayList<PartitionOffsetInfo>();
+               String zookeeperConnect = new ODFInternalFactory().create(Environment.class).getZookeeperConnectString();
+
+               if (zookeeperConnect != null) {
+                       final KafkaMonitor kafkaMonitor = new ODFInternalFactory().create(KafkaMonitor.class);
+                       for (int part : kafkaMonitor.getPartitionsForTopic(zookeeperConnect, this.topic)) {
+                               offsetsForTopic.add(kafkaMonitor.getOffsetsOfLastMessagesForTopic(zookeeperConnect, this.topic, part));
+                       }
+               }
+               return offsetsForTopic;
+       }
+
+       public void setTopic(String topic) {
+               this.topic = topic;
+       }
+
+       public void setSearchStrings(List<String> searchStrings) {
+               this.searchStrings = searchStrings;
+       }
+
+       public void setCompletitionCallback(SearchCompletedCallback completitionCallback) {
+               this.searchCompletedCallback = completitionCallback;
+       }
+
+       /**
+        * Builds the properties for the search consumer on top of the consumer properties from the settings manager.
+        * <p>
+        * A random group id is used, offsets are reset to "earliest" and auto commit is disabled, so each
+        * search reads the topic from the beginning without touching other groups' offsets.
+        *
+        * @return the properties used to create the {@link KafkaConsumer}
+        */
+       protected Properties getKafkaConsumerProperties() {
+               Properties consumerProperties = new ODFFactory().create().getSettingsManager().getKafkaConsumerProperties();
+               consumerProperties.put("group.id", UUID.randomUUID().toString() + "_searchConsumer");
+               final String zookeeperConnect = new ODFInternalFactory().create(Environment.class).getZookeeperConnectString();
+               consumerProperties.put("zookeeper.connect", zookeeperConnect);
+               consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+               consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+               consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+               // Join all known brokers into the comma-separated list expected for bootstrap.servers.
+               final Iterator<String> brokers = new ODFInternalFactory().create(KafkaMonitor.class).getBrokers(zookeeperConnect).iterator();
+               StringBuilder brokersString = new StringBuilder();
+               while (brokers.hasNext()) {
+                       brokersString.append(brokers.next());
+                       if (brokers.hasNext()) {
+                               brokersString.append(",");
+                       }
+               }
+               consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokersString.toString());
+               consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+               return consumerProperties;
+       }
+
+       /**
+        * Performs the search: retrieves the current end offsets of the topic, polls the topic and records
+        * the position of every matching message until all non-empty partitions have been read up to their
+        * end offset, the consumer is cancelled or the poll limit is reached. The Kafka consumer is always
+        * closed before this method returns.
+        */
+       @Override
+       public void run() {
+               this.maxOffsetsForTopic = retrieveTopicOffsets();
+               final String logPrefix = "Consumer for topic " + topic + ": ";
+               try {
+                       if (maxOffsetsForTopic.isEmpty()) {
+                               logger.info("No offsets found for topic " + this.topic + ", therefore no matching messages can be found");
+                               if (searchCompletedCallback != null) {
+                                       searchCompletedCallback.onDoneSearching(new HashMap<String, PartitionOffsetInfo>());
+                                       return;
+                               }
+                       }
+
+                       // Index the end offsets by partition id. The position of an entry in maxOffsetsForTopic is
+                       // not guaranteed to equal its partition id, so the list must not be indexed by partition.
+                       Map<Integer, PartitionOffsetInfo> maxOffsetByPartition = new HashMap<Integer, PartitionOffsetInfo>();
+                       Map<Integer, Boolean> maxOffsetReachedMap = new HashMap<Integer, Boolean>();
+                       for (PartitionOffsetInfo info : maxOffsetsForTopic) {
+                               maxOffsetByPartition.put(info.getPartitionId(), info);
+                               //if the max offset is -1, no message exists on the partition
+                               if (info.getOffset() > -1) {
+                                       maxOffsetReachedMap.put(info.getPartitionId(), false);
+                               }
+                       }
+
+                       Map<String, PartitionOffsetInfo> resultMap = new HashMap<String, PartitionOffsetInfo>();
+
+                       Properties consumerProperties = getKafkaConsumerProperties();
+
+                       if (this.kafkaConsumer == null) {
+                               logger.fine(logPrefix + " create new consumer for topic " + topic);
+                               try {
+                                       this.kafkaConsumer = new KafkaConsumer<String, String>(consumerProperties);
+                                       // Partitions are assigned automatically through subscribe(). The consumer uses a
+                                       // random group id (see getKafkaConsumerProperties()), so it is presumably the only
+                                       // member of its group and receives all partitions -- TODO confirm.
+                                       kafkaConsumer.subscribe(Arrays.asList(topic));
+                               } catch (ZkTimeoutException zkte) {
+                                       String zkHosts = consumerProperties.getProperty("zookeeper.connect");
+                                       logger.log(Level.SEVERE, logPrefix + " Could not connect to the Zookeeper instance at ''{0}''. Please ensure that Zookeeper is running", zkHosts);
+                                       throw zkte;
+                               }
+                       }
+                       logger.log(Level.INFO, logPrefix + " Consumer " + "''{1}'' is now listening on ODF queue ''{0}'' with configuration {2}",
+                                       new Object[] { topic, kafkaConsumer, consumerProperties });
+
+                       int pollCount = 0;
+                       while (!Thread.interrupted() && pollCount < MAX_POLL_COUNT && !shutdown && kafkaConsumer != null) {
+                               logger.info("searching ...");
+                               pollCount++;
+                               ConsumerRecords<String, String> records = kafkaConsumer.poll(POLLING_DURATION_MS);
+                               // the consumer counts as ready once the first poll has returned
+                               ready = true;
+                               final Iterator<ConsumerRecord<String, String>> polledRecords = records.records(topic).iterator();
+
+                               while (polledRecords.hasNext() && !shutdown) {
+                                       final ConsumerRecord<String, String> next = polledRecords.next();
+                                       for (String s : searchStrings) {
+                                               if (matches(next, s)) {
+                                                       final PartitionOffsetInfo position = new PartitionOffsetInfo();
+                                                       position.setOffset(next.offset());
+                                                       position.setPartitionId(next.partition());
+                                                       // a later match for the same search string replaces an earlier one
+                                                       resultMap.put(s, position);
+                                               }
+                                       }
+
+                                       // Records of partitions that were unknown when the offsets were retrieved are searched
+                                       // but do not take part in the completion check.
+                                       final PartitionOffsetInfo maxOffset = maxOffsetByPartition.get(next.partition());
+                                       if (maxOffset != null && next.offset() == maxOffset.getOffset()) {
+                                               maxOffsetReachedMap.put(next.partition(), true);
+                                       }
+
+                                       // done as soon as no tracked partition is still below its end offset
+                                       boolean allCompleted = !maxOffsetReachedMap.containsValue(Boolean.FALSE);
+                                       if (allCompleted) {
+                                               logger.info("Done searching all messages");
+                                               if (searchCompletedCallback != null) {
+                                                       searchCompletedCallback.onDoneSearching(resultMap);
+                                                       return;
+                                               }
+                                               shutdown = true;
+                                       }
+                               }
+                       }
+               } catch (Exception exc) {
+                       String msg = MessageFormat.format(" Caught exception on queue ''{0}''", topic);
+                       logger.log(Level.WARNING, logPrefix + msg, exc);
+               } finally {
+                       if (kafkaConsumer != null) {
+                               logger.log(Level.FINE, logPrefix + "Closing consumer " + " on topic ''{0}''", topic);
+                               kafkaConsumer.close();
+                               logger.log(Level.FINE, logPrefix + "Closed consumer " + " on topic ''{0}''", topic);
+                               kafkaConsumer = null;
+                       }
+               }
+               logger.log(Level.FINE, logPrefix + "Finished consumer on topic ''{0}''", topic);
+       }
+
+       /**
+        * @return {@code true} if the record's key equals the search string or its value contains it
+        */
+       private static boolean matches(ConsumerRecord<String, String> record, String searchString) {
+               final String key = record.key();
+               final String value = record.value();
+               return (key != null && key.equals(searchString)) || (value != null && value.contains(searchString));
+       }
+
+       /**
+        * Not used by this consumer; the executor service is ignored.
+        */
+       @Override
+       public void setExecutorService(ExecutorService service) {
+
+       }
+
+       /**
+        * Asks the search loop to stop; the loop checks the flag before every poll and every record.
+        */
+       @Override
+       public void cancel() {
+               this.shutdown = true;
+       }
+
+       /**
+        * @return {@code true} once the first poll on the topic has returned
+        */
+       @Override
+       public boolean isReady() {
+               return ready;
+       }
+
+       /**
+        * Callback notified with the outcome of a search.
+        */
+       public interface SearchCompletedCallback {
+               /**
+                * @param msgPositionMap maps each search string that was found to the partition and offset of the matching message
+                */
+               void onDoneSearching(Map<String, PartitionOffsetInfo> msgPositionMap);
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
----------------------------------------------------------------------
diff --git 
a/odf/odf-messaging/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
 
b/odf/odf-messaging/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
new file mode 100755
index 0000000..95c1f71
--- /dev/null
+++ 
b/odf/odf-messaging/src/main/resources/org/apache/atlas/odf/odf-implementation.properties
@@ -0,0 +1,14 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+DiscoveryServiceQueueManager=org.apache.atlas.odf.core.messaging.kafka.KafkaQueueManager

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueConsumerExceptionTest.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueConsumerExceptionTest.java
 
b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueConsumerExceptionTest.java
new file mode 100755
index 0000000..396193f
--- /dev/null
+++ 
b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueConsumerExceptionTest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.messaging.kafka;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.exception.ZkTimeoutException;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.engine.ThreadStatus.ThreadState;
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.core.controlcenter.QueueMessageProcessor;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaMonitor;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueManager;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+
+import kafka.admin.AdminUtils;
+import kafka.common.TopicExistsException;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+/**
+ * Checks that a {@link KafkaQueueConsumer} whose message processor always throws retries each message
+ * {@code KafkaQueueConsumer.MAX_PROCESSING_EXCEPTIONS} times and keeps running afterwards.
+ */
+public class KafkaQueueConsumerExceptionTest extends ODFTestcase {
+       static Logger logger = ODFTestLogger.get();
+       static final String topicName = "my_dummy_test_topic";
+       static String zookeeperHost = new ODFInternalFactory().create(Environment.class).getZookeeperConnectString();
+
+       /**
+        * Creates the test topic. No explicit existence check is made even though the log messages suggest
+        * one; an already existing topic is detected through the {@link TopicExistsException}.
+        */
+       @BeforeClass
+       public static void setupTopic() {
+               ZkClient zkClient = null;
+               try {
+                       zkClient = new ZkClient(zookeeperHost, 5000, 5000, ZKStringSerializer$.MODULE$);
+                       logger.log(Level.FINEST, "Checking if topic ''{0}'' already exists", topicName);
+                       // using partition size 1 and replication size 1, no special
+                       // per-topic config needed
+                       logger.log(Level.FINE, "Topic ''{0}'' does not exist, creating it", topicName);
+                       //FIXME zkUtils isSecure parameter? Only with SSL! --> parse zkhosts?
+                       AdminUtils.createTopic(new ZkUtils(zkClient, new ZkConnection(zookeeperHost), false), topicName, 1, 1, new Properties(), KafkaQueueManager.DEFAULT_RACK_AWARE_MODE);
+                       logger.log(Level.FINE, "Topic ''{0}'' created", topicName);
+               } catch (TopicExistsException ex) {
+                       logger.log(Level.FINE, "Topic ''{0}'' already exists.", topicName);
+               } catch (ZkTimeoutException zkte) {
+                       logger.log(Level.SEVERE, "Could not connect to the Zookeeper instance at ''{0}''. Please ensure that Zookeeper is running", zookeeperHost);
+               } finally {
+                       if (zkClient != null) {
+                               zkClient.close();
+                       }
+               }
+       }
+
+       /**
+        * Sends two messages to a consumer whose processor always fails and expects every message to be
+        * delivered {@code MAX_PROCESSING_EXCEPTIONS} times while the consumer thread stays in state RUNNING.
+        */
+       @Test
+       public void testExceptionAndRetryDuringProcessing() throws InterruptedException, ExecutionException, TimeoutException {
+               final ODFInternalFactory odfFactory = new ODFInternalFactory();
+               final String groupId = "retrying-exception-dummy-consumer";
+               Properties kafkaConsumerProperties = new KafkaQueueManager().getConsumerConfigProperties(groupId, true);
+               kafkaConsumerProperties.put("group.id", groupId);
+               // The list is filled on the consumer thread and read on the test thread, so it has to be thread-safe.
+               final List<String> consumedMsgs1 = java.util.Collections.synchronizedList(new ArrayList<String>());
+               KafkaQueueConsumer cnsmr = new KafkaQueueConsumer(topicName, kafkaConsumerProperties, new QueueMessageProcessor() {
+
+                       @Override
+                       public void process(ExecutorService executorService, String msg, int partition, long offset) {
+                               consumedMsgs1.add(msg);
+                               logger.info("retry_consumer process " + msg + " throw exception and try again");
+                               throw new RuntimeException("Oops!");
+                       }
+               });
+
+               final ThreadManager threadManager = odfFactory.create(ThreadManager.class);
+               final String consumerThread = "TEST_CONSUMER_RETRY_RUNNING";
+               threadManager.waitForThreadsToBeReady(10000, Arrays.asList(threadManager.startUnmanagedThread(consumerThread, cnsmr)));
+
+               sendMsg("TEST_MSG");
+               sendMsg("TEST_MSG2");
+
+               // give the consumer time to go through all retries of both messages
+               Thread.sleep(2000);
+
+               Assert.assertEquals(2 * KafkaQueueConsumer.MAX_PROCESSING_EXCEPTIONS, consumedMsgs1.size());
+
+               final ThreadState stateOfUnmanagedThread = threadManager.getStateOfUnmanagedThread(consumerThread);
+               Assert.assertEquals(ThreadState.RUNNING, stateOfUnmanagedThread);
+       }
+
+       /**
+        * Sends a single message with a random key to the test topic and waits up to three seconds for the
+        * send to complete. The producer is closed even if sending fails.
+        */
+       void sendMsg(String msg) throws InterruptedException, ExecutionException, TimeoutException {
+               SettingsManager odfConfig = new ODFFactory().create().getSettingsManager();
+
+               Properties props = odfConfig.getKafkaProducerProperties();
+               final Iterator<String> brokers = new ODFInternalFactory().create(KafkaMonitor.class).getBrokers(zookeeperHost).iterator();
+               StringBuilder brokersString = new StringBuilder();
+               while (brokers.hasNext()) {
+                       brokersString.append(brokers.next());
+                       if (brokers.hasNext()) {
+                               brokersString.append(",");
+                       }
+               }
+               props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokersString.toString());
+
+               final KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
+               try {
+                       ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topicName, UUID.randomUUID().toString(), msg);
+                       producer.send(producerRecord).get(3000, TimeUnit.MILLISECONDS);
+               } finally {
+                       // close the producer even when the send fails or times out
+                       producer.close();
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueManagerTest.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueManagerTest.java
 
b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueManagerTest.java
new file mode 100755
index 0000000..cff538c
--- /dev/null
+++ 
b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/KafkaQueueManagerTest.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.messaging.kafka;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.settings.MessagingConfiguration;
+import org.apache.atlas.odf.api.settings.ODFSettings;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueConsumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.wink.json4j.JSONException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.engine.ThreadStatus.ThreadState;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore;
+import org.apache.atlas.odf.core.controlcenter.DefaultStatusQueueStore;
+import org.apache.atlas.odf.core.controlcenter.DefaultThreadManager;
+import org.apache.atlas.odf.core.controlcenter.QueueMessageProcessor;
+import org.apache.atlas.odf.core.controlcenter.StatusQueueEntry;
+import 
org.apache.atlas.odf.core.controlcenter.ThreadManager.ThreadStartupResult;
+import org.apache.atlas.odf.core.controlcenter.TrackerUtil;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaMonitor;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaProducerManager;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueManager;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.json.JSONUtils;
+
+public class KafkaQueueManagerTest extends ODFTestBase {
+
+       private static Long origRetention; // retention value read in setupTrackerRetention() and written back in cleanupTrackerRetention()
+       Logger logger = ODFTestLogger.get(); // logger used by the test methods of this class
+       String zookeeperConnectString = new 
ODFInternalFactory().create(Environment.class).getZookeeperConnectString(); // ZooKeeper connect string obtained from the ODF Environment
+
+       /**
+        * Remembers the currently configured analysis request retention in {@code origRetention}
+        * and raises the retention to 120000000 ms so that trackers are kept while the tests run.
+        * <p>
+        * The modified settings are written back through {@code updateODFSettings}, mirroring what
+        * {@code cleanupTrackerRetention()} does when it restores the original value.
+        *
+        * @throws ValidationException if the settings manager rejects the updated settings
+        */
+       @BeforeClass
+       public static void setupTrackerRetention() throws ValidationException {
+               SettingsManager settingsManager = new ODFFactory().create().getSettingsManager();
+               //SETUP RETENTION TO KEEP TRACKERS!!!
+               final ODFSettings settings = settingsManager.getODFSettings();
+               final MessagingConfiguration messagingConfiguration = settings.getMessagingConfiguration();
+               origRetention = messagingConfiguration.getAnalysisRequestRetentionMs();
+               messagingConfiguration.setAnalysisRequestRetentionMs(120000000L);
+               // NOTE(review): presumably getODFSettings() hands out a copy, so the change has to be stored explicitly - confirm
+               settingsManager.updateODFSettings(settings);
+
+               // Re-read the settings so that the log shows the value that is actually in effect
+               ODFTestLogger.get().info("Set request retention to " + settingsManager.getODFSettings().getMessagingConfiguration().getAnalysisRequestRetentionMs());
+       }
+
+       /**
+        * Writes the retention value saved in {@code origRetention} back into the messaging
+        * configuration and stores the resulting settings.
+        *
+        * @throws ValidationException if the settings manager rejects the restored settings
+        */
+       @AfterClass
+       public static void cleanupTrackerRetention() throws ValidationException {
+               final SettingsManager manager = new ODFFactory().create().getSettingsManager();
+               final ODFSettings restoredSettings = manager.getODFSettings();
+               final MessagingConfiguration messaging = restoredSettings.getMessagingConfiguration();
+               messaging.setAnalysisRequestRetentionMs(origRetention);
+               manager.updateODFSettings(restoredSettings);
+       }
+
+       /**
+        * Enqueues the tracker read from tracker1.json ten times on the status queue, each time
+        * under a different request id ("id0" .. "id9"), and then checks that every id can be
+        * queried from a {@code DefaultStatusQueueStore} with status FINISHED.
+        *
+        * @throws Exception if reading the tracker, enqueueing or querying fails
+        */
+       @Test
+       public void testStatusQueue() throws Exception {
+               final KafkaQueueManager queueManager = new KafkaQueueManager();
+               logger.info("Queue manager created");
+
+               final AnalysisRequestTracker template = JSONUtils.readJSONObjectFromFileInClasspath(AnalysisRequestTracker.class, "org/apache/atlas/odf/core/test/messaging/kafka/tracker1.json", null);
+               final long startMillis = System.currentTimeMillis();
+               template.setLastModified(startMillis);
+
+               final int maxEntries = 10;
+               int enqueued = 0;
+               while (enqueued < maxEntries) {
+                       // The same tracker object is re-used; only the request id changes per entry
+                       template.getRequest().setId("id" + enqueued);
+                       final StatusQueueEntry entry = new StatusQueueEntry();
+                       entry.setAnalysisRequestTracker(template);
+                       queueManager.enqueueInStatusQueue(entry);
+                       enqueued++;
+               }
+               final long elapsedMillis = System.currentTimeMillis() - startMillis;
+               logger.info("Time for enqueueing " + maxEntries + " objects: " + elapsedMillis + ", " + (elapsedMillis / maxEntries) + "ms per object");
+               // Pause before querying the store
+               Thread.sleep(100 * maxEntries);
+
+               final AnalysisRequestTrackerStore trackerStore = new DefaultStatusQueueStore();
+               for (int idx = 0; idx < maxEntries; idx++) {
+                       logger.info("Querying status " + idx);
+                       final AnalysisRequestTracker found = trackerStore.query("id" + idx);
+                       Assert.assertNotNull(found);
+                       Assert.assertEquals(STATUS.FINISHED, found.getStatus());
+               }
+
+               logger.info("Test testEnqueueStatusQueue finished");
+       }
+
+       /**
+        * This test creates a tracker, puts it on the status queue, kills the service consumer and creates a new dummy consumer to put the offset of the service consumer behind the new tracker.
+        * Then the status consumer is shut down and its offset is reset in order to make it consume from the start again and thereby cleaning up stuck processes.
+        * Then kafka queue manager is re-initialized, causing all consumers to come up and triggering the cleanup process.
+        * Finally the test expects the stored tracker to have status ERROR.
+        *
+        * @throws JSONException if the tracker cannot be read from or serialized to JSON
+        * @throws InterruptedException if the test thread is interrupted while sleeping or waiting
+        * @throws ExecutionException declared for the message sending step
+        * @throws TimeoutException declared for the message sending step
+        */
+       @Test
+       @Ignore("Adjust once ServiceRuntimes are fully implemented")
+       public void testStuckRequestCleanup() throws JSONException, InterruptedException, ExecutionException, TimeoutException {
+               final AnalysisRequestTracker tracker = JSONUtils.readJSONObjectFromFileInClasspath(AnalysisRequestTracker.class, "org/apache/atlas/odf/core/test/messaging/kafka/tracker1.json", null);
+               tracker.setStatus(STATUS.IN_DISCOVERY_SERVICE_QUEUE);
+               tracker.setNextDiscoveryServiceRequest(0);
+               tracker.setLastModified(System.currentTimeMillis());
+               final String newTrackerId = "KAFKA_QUEUE_MANAGER_09_TEST" + UUID.randomUUID().toString();
+               tracker.getRequest().setId(newTrackerId);
+               DiscoveryServiceRequest dsRequest = TrackerUtil.getCurrentDiscoveryServiceStartRequest(tracker);
+               final DiscoveryServiceProperties discoveryServiceRegistrationInfo = new ODFFactory().create().getDiscoveryServiceManager().getDiscoveryServicesProperties().get(0);
+               dsRequest.setDiscoveryServiceId(discoveryServiceRegistrationInfo.getId());
+               String dsID = dsRequest.getDiscoveryServiceId();
+               String topicName = KafkaQueueManager.SERVICE_TOPIC_PREFIX + dsID;
+               //Add tracker to queue, set offset behind request so that it should be cleanup
+
+               String consumerGroupId = "odf-topic-" + dsID + "_group";
+               String threadName = "Dummy_DiscoveryServiceQueueConsumer" + topicName;
+
+               // Written from the producer callback thread and read by the test thread, hence synchronized
+               final List<Throwable> multiThreadErrors = Collections.synchronizedList(new ArrayList<Throwable>());
+               final DefaultThreadManager tm = new DefaultThreadManager();
+               logger.info("shutdown old test 09 consumer and replace with fake doing nothing");
+               for (int no = 0; no < discoveryServiceRegistrationInfo.getParallelismCount(); no++) {
+                       tm.shutdownThreads(Collections.singletonList("DiscoveryServiceQueueConsumer" + topicName + "_" + no));
+               }
+               Properties kafkaConsumerProps = getKafkaConsumerConfigProperties(consumerGroupId);
+
+               // Offset of the message produced below; filled in by the producer callback
+               final long[] producedMsgOffset = new long[1];
+
+               final CountDownLatch msgProcessingLatch = new CountDownLatch(1);
+               ThreadStartupResult created = tm.startUnmanagedThread(threadName, new KafkaQueueConsumer(topicName, kafkaConsumerProps, new QueueMessageProcessor() {
+
+                       @Override
+                       public void process(ExecutorService executorService, String msg, int partition, long msgOffset) {
+                               // Dummy consumer: messages are only dequeued, never processed
+                               logger.info("Dequeue without processing " + msgOffset);
+                               if (msgOffset == producedMsgOffset[0]) {
+                                       msgProcessingLatch.countDown();
+                               }
+                       }
+
+               }));
+
+               tm.waitForThreadsToBeReady(30000, Arrays.asList(created));
+
+               String key = tracker.getRequest().getId();
+               String value = JSONUtils.toJSON(tracker);
+
+               new DefaultStatusQueueStore().store(tracker);
+
+               KafkaMonitor kafkaMonitor = new ODFInternalFactory().create(KafkaMonitor.class);
+               List<String> origQueueConsumers = kafkaMonitor.getConsumerGroups(zookeeperConnectString, KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE);
+               logger.info("Found status consumers: " + origQueueConsumers.toString() + ", shutting down StatusWatcher");
+
+               //kill status queue watcher so that it is restarted when queue manager is initialized and detects stuck requests
+               final String statusWatcherThreadName = "StatusWatcher" + KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE + "_0";
+               tm.shutdownThreads(Collections.singletonList(statusWatcherThreadName));
+
+               // Wait (at most 60 * 500 ms) until the watcher is gone. The loop ends as soon as the thread is
+               // NON_EXISTENT or FINISHED, or when the retries are used up; the previous condition mixed
+               // || and && so that it could neither end early nor honour the retry limit.
+               int maxWaitForConsumerDeath = 60;
+               ThreadState statusWatcherState = tm.getStateOfUnmanagedThread(statusWatcherThreadName);
+               while (statusWatcherState != ThreadState.NON_EXISTENT && statusWatcherState != ThreadState.FINISHED && maxWaitForConsumerDeath > 0) {
+                       maxWaitForConsumerDeath--;
+                       Thread.sleep(500);
+                       statusWatcherState = tm.getStateOfUnmanagedThread(statusWatcherThreadName);
+               }
+
+               logger.info("Only 1 consumer left? " + maxWaitForConsumerDeath + " : " + tm.getStateOfUnmanagedThread(statusWatcherThreadName));
+               logger.info(" set offset for status consumer to beginning so that it consumes from when restarting");
+               final int offset = 1000000;
+               for (String statusConsumerGroup : origQueueConsumers) {
+                       if (statusConsumerGroup.contains("DSStatusWatcherConsumerGroup")) {
+                               boolean success = false;
+                               int retryCount = 0;
+                               final int maxOffsetRetry = 20;
+                               while (!success && retryCount < maxOffsetRetry) {
+                                       success = kafkaMonitor.setOffset(zookeeperConnectString, statusConsumerGroup, KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE, 0, offset);
+                                       retryCount++;
+                                       Thread.sleep(500);
+                               }
+
+                               Assert.assertNotEquals(retryCount, maxOffsetRetry);
+                               Assert.assertTrue(success);
+                       }
+               }
+
+               new ODFInternalFactory().create(KafkaProducerManager.class).sendMsg(topicName, key, value, new Callback() {
+
+                       @Override
+                       public void onCompletion(RecordMetadata metadata, Exception exception) {
+                               if (exception != null || metadata == null) {
+                                       // Sending failed: record the problem and release the waiting test thread
+                                       // instead of dereferencing metadata that may be missing
+                                       multiThreadErrors.add(exception != null ? exception : new IllegalStateException("No record metadata returned for produced message"));
+                                       msgProcessingLatch.countDown();
+                                       return;
+                               }
+                               producedMsgOffset[0] = metadata.offset();
+                       }
+               });
+
+               final boolean await = msgProcessingLatch.await(240, TimeUnit.SECONDS);
+               Assert.assertTrue(await);
+               // Fail right away if the latch was only released because producing the message failed
+               Assert.assertTrue("Producing the tracker message failed: " + multiThreadErrors, multiThreadErrors.isEmpty());
+               if (await) {
+                       logger.info("run after message consumption...");
+                       AnalysisRequestTrackerStore store = new ODFInternalFactory().create(AnalysisRequestTrackerStore.class);
+                       AnalysisRequestTracker storeTracker = store.query(tracker.getRequest().getId());
+                       Assert.assertEquals(tracker.getRequest().getId(), storeTracker.getRequest().getId());
+                       Assert.assertEquals(STATUS.IN_DISCOVERY_SERVICE_QUEUE, storeTracker.getStatus());
+
+                       //start odf and cleanup here...
+                       logger.info("shutdown all threads and restart ODF");
+                       tm.shutdownAllUnmanagedThreads();
+
+                       int threadKillRetry = 0;
+                       while (tm.getNumberOfRunningThreads() > 0 && threadKillRetry < 20) {
+                               Thread.sleep(500);
+                               threadKillRetry++;
+                       }
+
+                       logger.info("All threads down, restart ODF " + threadKillRetry);
+
+                       // Initialize analysis manager
+                       new ODFFactory().create().getAnalysisManager();
+
+                       kafkaMonitor = new ODFInternalFactory().create(KafkaMonitor.class);
+                       origQueueConsumers = kafkaMonitor.getConsumerGroups(zookeeperConnectString, KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE);
+                       int healthRetrieveRetry = 0;
+                       //wait for max of 120 secs (240 * 500 ms) for status consumer to come up. If it is, we can continue because ODF is restarted successfully
+                       while (origQueueConsumers.isEmpty() && healthRetrieveRetry < 240) {
+                               healthRetrieveRetry++;
+                               Thread.sleep(500);
+                               origQueueConsumers = kafkaMonitor.getConsumerGroups(zookeeperConnectString, KafkaQueueManager.TOPIC_NAME_STATUS_QUEUE);
+                       }
+                       Assert.assertNotEquals(healthRetrieveRetry, 240);
+
+                       logger.info("initialized, wait for cleanup ... " + healthRetrieveRetry);
+                       Thread.sleep(5000);
+                       logger.info("Found health consumers: " + origQueueConsumers.toString());
+                       logger.info("hopefully cleaned up ...");
+                       AnalysisRequestTracker storedTracker = store.query(tracker.getRequest().getId());
+                       Assert.assertEquals(STATUS.ERROR, storedTracker.getStatus());
+                       logger.info("DONE CLEANING UP, ALL FINE");
+               }
+
+               Assert.assertEquals(0, multiThreadErrors.size());
+       }
+
+       /**
+        * Builds the Kafka consumer properties used by the dummy consumer of this test.
+        * <p>
+        * Starts from the consumer properties of the ODF settings manager and sets the group id,
+        * the ZooKeeper connect string (when available), "earliest" offset reset, String
+        * deserializers for key and value, the bootstrap servers reported by the Kafka monitor,
+        * and disables auto commit.
+        *
+        * @param consumerGroupID value to use for the "group.id" property
+        * @return the populated consumer properties
+        */
+       public Properties getKafkaConsumerConfigProperties(String consumerGroupID) {
+               final String stringDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
+
+               final Properties consumerProps = new ODFFactory().create().getSettingsManager().getKafkaConsumerProperties();
+               consumerProps.put("group.id", consumerGroupID);
+               if (zookeeperConnectString != null) {
+                       consumerProps.put("zookeeper.connect", zookeeperConnectString);
+               }
+               consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+               consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
+               consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
+
+               // Join all known brokers into one comma separated bootstrap server list
+               final StringBuilder brokerList = new StringBuilder();
+               for (Iterator<String> brokerIt = new ODFInternalFactory().create(KafkaMonitor.class).getBrokers(zookeeperConnectString).iterator(); brokerIt.hasNext();) {
+                       brokerList.append(brokerIt.next());
+                       if (brokerIt.hasNext()) {
+                               brokerList.append(",");
+                       }
+               }
+               consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList.toString());
+               consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+               return consumerProps;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MessageSearchConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MessageSearchConsumerTest.java
 
b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MessageSearchConsumerTest.java
new file mode 100755
index 0000000..35b09e2
--- /dev/null
+++ 
b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MessageSearchConsumerTest.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.messaging.kafka;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.atlas.odf.core.messaging.kafka.MessageSearchConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager;
+import org.apache.atlas.odf.api.engine.PartitionOffsetInfo;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaMonitor;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueManager;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+
+import kafka.admin.AdminUtils;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+public class MessageSearchConsumerTest extends ODFTestBase {
+       private static final String TEST_SEARCH_STRING = "TEST_STRING_" + 
UUID.randomUUID().toString(); // payload the tests publish and then search for; the random suffix is generated when the class is initialized
+       private static final String TEST_SEARCH_FAILURE_STRING = 
"TEST_FAILURE_STRING"; // payload searched for in the negative case; no test in this class sends it
+       static Logger logger = ODFTestLogger.get(); // shared test logger
+       final static String topicName = "MessageSearchConsumerTest" + 
UUID.randomUUID().toString(); // topic used by all tests of this class; created in createTopc() if missing
+       private static final int PERFORMANCE_MSG_COUNT = 1000; // number of dummy messages produced by testMsgSearchPerformance()
+       static String zookeeperHost = new 
ODFInternalFactory().create(Environment.class).getZookeeperConnectString(); // ZooKeeper connect string obtained from the ODF Environment
+       private KafkaProducer<String, String> producer; // created lazily by getProducer(), closed by closeProducer()
+
+       /**
+        * Creates the test topic with 2 partitions and replication factor 1 unless it already exists.
+        * <p>
+        * The ZooKeeper client opened for the check is closed again before the method returns,
+        * also when the topic lookup or creation throws.
+        */
+       @BeforeClass
+       public static void createTopc() {
+               ZkClient zkClient = new ZkClient(zookeeperHost, 5000, 5000, ZKStringSerializer$.MODULE$);
+               try {
+                       ZkUtils utils = new ZkUtils(zkClient, new ZkConnection(zookeeperHost), false);
+                       if (!AdminUtils.topicExists(utils, topicName)) {
+                               AdminUtils.createTopic(utils, topicName, 2, 1, new Properties(), KafkaQueueManager.DEFAULT_RACK_AWARE_MODE);
+                       }
+               } finally {
+                       // Release the ZooKeeper session; it is only needed for the topic setup
+                       zkClient.close();
+               }
+       }
+
+       /**
+        * Produces PERFORMANCE_MSG_COUNT dummy messages followed by the search string, then starts
+        * one search consumer for the search string and one for a string that was never sent.
+        * For each search the test waits up to five seconds and logs whether it finished in time.
+        * <p>
+        * NOTE(review): the assertions inside the callbacks run on the search threads; presumably a
+        * failure there does not fail this JUnit test directly - confirm.
+        *
+        * @throws InterruptedException if the test thread is interrupted while sleeping or waiting
+        * @throws ExecutionException if sending a message failed
+        * @throws TimeoutException if sending a message was not acknowledged in time
+        */
+       @Test
+       public void testMsgSearchPerformance() throws InterruptedException, ExecutionException, TimeoutException {
+               logger.info("Producing msgs");
+               for (int no = 0; no < PERFORMANCE_MSG_COUNT; no++) {
+                       sendMsg("DUMMY_MSG" + no);
+               }
+               sendMsg(TEST_SEARCH_STRING);
+               logger.info("Done producing ...");
+               Thread.sleep(200);
+
+               final ThreadManager threadManager = new ODFInternalFactory().create(ThreadManager.class);
+               final CountDownLatch searchLatch = new CountDownLatch(1);
+               threadManager.startUnmanagedThread(UUID.randomUUID().toString() + "_searchThread", new MessageSearchConsumer(topicName, new MessageSearchConsumer.SearchCompletedCallback() {
+
+                       @Override
+                       public void onDoneSearching(Map<String, PartitionOffsetInfo> msgPositionMap) {
+                               logger.info("Done searching " + msgPositionMap.get(TEST_SEARCH_STRING).getOffset());
+                               Assert.assertTrue(msgPositionMap.get(TEST_SEARCH_STRING).getOffset() > -1);
+                               searchLatch.countDown();
+                       }
+               }, Arrays.asList(TEST_SEARCH_STRING)));
+
+               boolean await = searchLatch.await(5, TimeUnit.SECONDS);
+               if (await) {
+                       logger.info("Messages searched in time");
+               } else {
+                       logger.warning("Couldnt finish search in time");
+               }
+
+               final CountDownLatch failureSearchLatch = new CountDownLatch(1);
+               threadManager.startUnmanagedThread(UUID.randomUUID().toString() + "_searchThread", new MessageSearchConsumer(topicName, new MessageSearchConsumer.SearchCompletedCallback() {
+
+                       @Override
+                       public void onDoneSearching(Map<String, PartitionOffsetInfo> msgPositionMap) {
+                               logger.info("Done searching " + msgPositionMap.toString());
+                               Assert.assertFalse(msgPositionMap.containsKey(TEST_SEARCH_FAILURE_STRING));
+                               failureSearchLatch.countDown();
+                       }
+               }, Arrays.asList(TEST_SEARCH_FAILURE_STRING)));
+
+               // Wait on the latch of the second search; the first latch may already be released
+               // and would let the test end before the negative search has finished
+               await = failureSearchLatch.await(5, TimeUnit.SECONDS);
+               if (await) {
+                       logger.info("Messages searched in time");
+               } else {
+                       logger.warning("Couldnt finish search in time");
+               }
+       }
+
+       /**
+        * Sends the search string once, then starts one search consumer for that string and one
+        * for a string that was never sent. For each search the test waits up to five seconds and
+        * logs whether it finished in time.
+        * <p>
+        * NOTE(review): the assertions inside the callbacks run on the search threads; presumably a
+        * failure there does not fail this JUnit test directly - confirm.
+        *
+        * @throws InterruptedException if the test thread is interrupted while sleeping or waiting
+        * @throws ExecutionException if sending the message failed
+        * @throws TimeoutException if sending the message was not acknowledged in time
+        */
+       @Test
+       public void testMsgSearchSuccessAndFailure() throws InterruptedException, ExecutionException, TimeoutException {
+               sendMsg(TEST_SEARCH_STRING);
+
+               Thread.sleep(200);
+
+               final ThreadManager threadManager = new ODFInternalFactory().create(ThreadManager.class);
+               final CountDownLatch searchLatch = new CountDownLatch(1);
+               threadManager.startUnmanagedThread(UUID.randomUUID().toString() + "_searchThread", new MessageSearchConsumer(topicName, new MessageSearchConsumer.SearchCompletedCallback() {
+
+                       @Override
+                       public void onDoneSearching(Map<String, PartitionOffsetInfo> msgPositionMap) {
+                               logger.info("Done searching " + msgPositionMap.get(TEST_SEARCH_STRING).getOffset());
+                               Assert.assertTrue(msgPositionMap.get(TEST_SEARCH_STRING).getOffset() > -1);
+                               searchLatch.countDown();
+                       }
+               }, Arrays.asList(TEST_SEARCH_STRING)));
+
+               boolean await = searchLatch.await(5, TimeUnit.SECONDS);
+               if (await) {
+                       logger.info("Messages searched in time");
+               } else {
+                       logger.warning("Couldnt finish search in time");
+               }
+
+               final CountDownLatch failureSearchLatch = new CountDownLatch(1);
+               threadManager.startUnmanagedThread(UUID.randomUUID().toString() + "_searchThread", new MessageSearchConsumer(topicName, new MessageSearchConsumer.SearchCompletedCallback() {
+
+                       @Override
+                       public void onDoneSearching(Map<String, PartitionOffsetInfo> msgPositionMap) {
+                               logger.info("Done searching " + msgPositionMap);
+                               Assert.assertFalse(msgPositionMap.containsKey(TEST_SEARCH_FAILURE_STRING));
+                               failureSearchLatch.countDown();
+                       }
+               }, Arrays.asList(TEST_SEARCH_FAILURE_STRING)));
+
+               // Wait on the latch of the second search; the first latch may already be released
+               // and would let the test end before the negative search has finished
+               await = failureSearchLatch.await(5, TimeUnit.SECONDS);
+               if (await) {
+                       logger.info("Messages searched in time");
+               } else {
+                       logger.warning("Couldnt finish search in time");
+               }
+       }
+
+       /**
+        * Publishes the given value under a random UUID key to the test topic using the shared
+        * producer and blocks up to 15 seconds for the acknowledgement.
+        *
+        * @param msg the message value to publish
+        * @throws InterruptedException if the thread is interrupted while waiting for the acknowledgement
+        * @throws ExecutionException if sending the record failed
+        * @throws TimeoutException if no acknowledgement arrived within 15000 ms
+        */
+       void sendMsg(String msg) throws InterruptedException, ExecutionException, TimeoutException {
+               final KafkaProducer<String, String> sharedProducer = getProducer();
+               final String recordKey = UUID.randomUUID().toString();
+               sharedProducer.send(new ProducerRecord<String, String>(topicName, recordKey, msg)).get(15000, TimeUnit.MILLISECONDS);
+       }
+
+       private KafkaProducer<String, String> getProducer() {
+               if (this.producer == null) {
+                       SettingsManager odfConfig = new 
ODFFactory().create().getSettingsManager();
+                       Properties props = 
odfConfig.getKafkaProducerProperties();
+                       final Iterator<String> brokers = new 
ODFInternalFactory().create(KafkaMonitor.class).getBrokers(zookeeperHost).iterator();
+                       StringBuilder brokersString = new StringBuilder();
+                       while (brokers.hasNext()) {
+                               brokersString.append(brokers.next());
+                               if (brokers.hasNext()) {
+                                       brokersString.append(",");
+                               }
+                       }
+                       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
brokersString.toString());
+                       producer = new KafkaProducer<String, String>(props);
+               }
+               return producer;
+       }
+
+       /**
+        * Closes the producer after each test if one was created.
+        * <p>
+        * The field is checked directly: going through {@code getProducer()} would create a new
+        * producer only to close it again. The reference is cleared afterwards so that a closed
+        * producer is never handed out again.
+        */
+       @After
+       public void closeProducer() {
+               if (this.producer != null) {
+                       this.producer.close();
+                       this.producer = null;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MultiPartitionConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MultiPartitionConsumerTest.java
 
b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MultiPartitionConsumerTest.java
new file mode 100755
index 0000000..f97dd4e
--- /dev/null
+++ 
b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/MultiPartitionConsumerTest.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.messaging.kafka;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.exception.ZkTimeoutException;
+import org.apache.atlas.odf.api.engine.ThreadStatus;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInitializer;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.controlcenter.QueueMessageProcessor;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaMonitor;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueConsumer;
+import org.apache.atlas.odf.core.messaging.kafka.KafkaQueueManager;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.core.controlcenter.ThreadManager;
+import 
org.apache.atlas.odf.core.controlcenter.ThreadManager.ThreadStartupResult;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+
+import kafka.admin.AdminUtils;
+import kafka.common.TopicExistsException;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+/**
+ * Tests consumption of a multi-partition Kafka topic by several {@link KafkaQueueConsumer}
+ * instances that are started as unmanaged threads through the {@link ThreadManager}.
+ * <p>
+ * Every test starts {@link #PARTITION_COUNT} consumers on the same topic, sends
+ * {@code PARTITION_COUNT * MSG_PER_PARTITION} messages, waits until all of them were
+ * handed to the message processors and finally checks that all consumer threads
+ * are still in state RUNNING.
+ */
+public class MultiPartitionConsumerTest extends ODFTestcase {
+       static Logger logger = ODFTestLogger.get();
+       final static String topicName = "my_dummy_test_topic" + UUID.randomUUID().toString();
+       static String zookeeperHost = new ODFInternalFactory().create(Environment.class).getZookeeperConnectString();
+       static final int PARTITION_COUNT = 3;
+       private static final int MSG_PER_PARTITION = 5;
+       /** Total number of messages each test sends and expects to be consumed. */
+       private static final int EXPECTED_MSG_COUNT = PARTITION_COUNT * MSG_PER_PARTITION;
+       private final ThreadManager threadManager = new ODFInternalFactory().create(ThreadManager.class);
+
+       /**
+        * Creates the test topic with {@link #PARTITION_COUNT} partitions and replication factor 1.
+        * An already existing topic and an unreachable Zookeeper are only logged.
+        */
+       @BeforeClass
+       public static void setupTopic() {
+               ZkClient zkClient = null;
+               try {
+                       zkClient = new ZkClient(zookeeperHost, 5000, 5000, ZKStringSerializer$.MODULE$);
+                       logger.log(Level.FINEST, "Checking if topic ''{0}'' already exists", topicName);
+                       // using PARTITION_COUNT partitions and replication size 1, no special
+                       // per-topic config needed
+                       logger.log(Level.FINE, "Topic ''{0}'' does not exist, creating it", topicName);
+                       //FIXME zkUtils isSecure parameter? Only with SSL! --> parse zkhosts?
+                       AdminUtils.createTopic(new ZkUtils(zkClient, new ZkConnection(zookeeperHost), false), topicName, PARTITION_COUNT, 1, new Properties(), KafkaQueueManager.DEFAULT_RACK_AWARE_MODE);
+                       logger.log(Level.FINE, "Topic ''{0}'' created", topicName);
+               } catch (TopicExistsException ex) {
+                       logger.log(Level.FINE, "Topic ''{0}'' already exists.", topicName);
+               } catch (ZkTimeoutException zkte) {
+                       logger.log(Level.SEVERE, "Could not connect to the Zookeeper instance at ''{0}''. Please ensure that Zookeeper is running", zookeeperHost);
+               } finally {
+                       if (zkClient != null) {
+                               zkClient.close();
+                       }
+               }
+       }
+
+       /**
+        * Restarts ODF after each test so that the consumers started by the test are cleaned up.
+        */
+       @After
+       public void cleanupConsumers() {
+               logger.info("Cleaning up consumers...");
+               logger.info("----------------------------------  Stopping ODF...");
+               ODFInitializer.stop();
+               logger.info("----------------------------------  Starting ODF...");
+               ODFInitializer.start();
+               logger.info("----------------------------------  ODF started.");
+       }
+
+       /**
+        * All messages must be consumed even if processing a single message takes a while.
+        */
+       @Test
+       public void testMultiPartitionDelayedConsumption() throws InterruptedException, ExecutionException {
+               Properties kafkaConsumerProperties = getConsumerProps();
+               final List<String> consumedMsgs = newConsumedMessageList();
+               List<ThreadStartupResult> startupList = new ArrayList<ThreadStartupResult>();
+
+               final String threadPrefix = "TEST_CONSUMER_RETRY_RUNNING_";
+               final int processingDelay = 2000;
+               for (int no = 0; no < PARTITION_COUNT; no++) {
+                       final int currentThread = no;
+                       final QueueMessageProcessor requestConsumer = new QueueMessageProcessor() {
+
+                               @Override
+                               public void process(ExecutorService executorService, String msg, int partition, long msgOffset) {
+                                       try {
+                                               Thread.sleep(processingDelay);
+                                       } catch (InterruptedException e) {
+                                               // keep the interrupt visible to the consumer thread instead of swallowing it
+                                               Thread.currentThread().interrupt();
+                                               logger.log(Level.WARNING, "Interrupted while delaying message processing", e);
+                                       }
+                                       consumedMsgs.add(msg);
+                                       logger.info("process " + msg + " in thread " + currentThread);
+                               }
+                       };
+                       startupList.add(startConsumer(threadPrefix + no, kafkaConsumerProperties, requestConsumer));
+               }
+               try {
+                       threadManager.waitForThreadsToBeReady(30000, startupList);
+                       sendTestMessages();
+
+                       // worst case: all messages are processed one after another, plus some slack
+                       int totalWait = waitForConsumedMessages(consumedMsgs, EXPECTED_MSG_COUNT * processingDelay + 10000, 2000);
+                       logger.info("Done with all messages after " + totalWait);
+
+                       Assert.assertEquals(EXPECTED_MSG_COUNT, consumedMsgs.size());
+                       assertConsumersRunning(threadPrefix);
+               } catch (TimeoutException e) {
+                       Assert.fail("Consumer could not be started on time");
+               }
+       }
+
+       /**
+        * All messages must be consumed by consumers that process messages without delay.
+        */
+       @Test
+       public void testMultiPartitionConsumption() throws InterruptedException, ExecutionException {
+               Properties kafkaConsumerProperties = getConsumerProps();
+               final List<String> consumedMsgs = newConsumedMessageList();
+               List<ThreadStartupResult> startupList = new ArrayList<ThreadStartupResult>();
+
+               final String threadPrefix = "TEST_CONSUMER_RETRY_RUNNING_";
+               for (int no = 0; no < PARTITION_COUNT; no++) {
+                       final int currentThread = no;
+                       final QueueMessageProcessor requestConsumer = new QueueMessageProcessor() {
+
+                               @Override
+                               public void process(ExecutorService executorService, String msg, int partition, long msgOffset) {
+                                       consumedMsgs.add(msg);
+                                       logger.info("process " + msg + " in thread " + currentThread);
+                               }
+                       };
+                       startupList.add(startConsumer(threadPrefix + no, kafkaConsumerProperties, requestConsumer));
+               }
+               try {
+                       threadManager.waitForThreadsToBeReady(30000, startupList);
+                       sendTestMessages();
+
+                       waitForConsumedMessages(consumedMsgs, 15000, 500);
+
+                       Assert.assertEquals(EXPECTED_MSG_COUNT, consumedMsgs.size());
+                       assertConsumersRunning(threadPrefix);
+               } catch (TimeoutException e) {
+                       Assert.fail("Consumer could not be started on time");
+               }
+       }
+
+       /**
+        * Each processor throws {@code KafkaQueueConsumer.MAX_PROCESSING_EXCEPTIONS - 1} exceptions
+        * before it starts accepting messages; all messages must still be consumed and the
+        * consumer threads must keep running.
+        */
+       @Test
+       public void testMultiPartitionExceptionAndRetryDuringProcessing() throws InterruptedException, ExecutionException {
+               Properties kafkaConsumerProperties = getConsumerProps();
+               final List<String> consumedMsgs = newConsumedMessageList();
+               List<ThreadStartupResult> startupList = new ArrayList<ThreadStartupResult>();
+
+               final String threadPrefix = "TEST_CONSUMER_RETRY_RUNNING_";
+               for (int no = 0; no < PARTITION_COUNT; no++) {
+                       final int currentThread = no;
+                       final QueueMessageProcessor requestConsumer = new QueueMessageProcessor() {
+
+                               private int excCount = 0;
+
+                               @Override
+                               public void process(ExecutorService executorService, String msg, int partition, long msgOffset) {
+                                       if (excCount < KafkaQueueConsumer.MAX_PROCESSING_EXCEPTIONS - 1) {
+                                               excCount++;
+                                               logger.info("Throw exception " + excCount + " on consumer " + currentThread);
+                                               throw new RuntimeException("Forced error on consumer");
+                                       }
+                                       consumedMsgs.add(msg);
+                                       logger.info("process " + msg + " in thread " + currentThread);
+                               }
+                       };
+                       startupList.add(startConsumer(threadPrefix + no, kafkaConsumerProperties, requestConsumer));
+               }
+               try {
+                       threadManager.waitForThreadsToBeReady(30000, startupList);
+                       sendTestMessages();
+
+                       waitForConsumedMessages(consumedMsgs, 15000, 500);
+
+                       Assert.assertEquals(EXPECTED_MSG_COUNT, consumedMsgs.size());
+                       assertConsumersRunning(threadPrefix);
+               } catch (TimeoutException e) {
+                       Assert.fail("Consumer could not be started on time");
+               }
+       }
+
+       /**
+        * Creates the list that collects consumed messages. It is written by all consumer
+        * threads and read by the test thread, hence it has to be synchronized.
+        */
+       private static List<String> newConsumedMessageList() {
+               return Collections.synchronizedList(new ArrayList<String>());
+       }
+
+       /**
+        * Starts a consumer for the test topic as unmanaged thread.
+        *
+        * @param threadName name under which the thread is registered with the thread manager
+        * @param kafkaConsumerProperties properties passed to the consumer
+        * @param processor processor that receives the consumed messages
+        * @return the startup result to be passed to {@code waitForThreadsToBeReady}
+        */
+       private ThreadStartupResult startConsumer(String threadName, Properties kafkaConsumerProperties, QueueMessageProcessor processor) {
+               KafkaQueueConsumer cnsmr = new KafkaQueueConsumer(topicName, kafkaConsumerProperties, processor);
+               return threadManager.startUnmanagedThread(threadName, cnsmr);
+       }
+
+       /**
+        * Sends {@code PARTITION_COUNT * MSG_PER_PARTITION} messages to the test topic.
+        */
+       private void sendTestMessages() throws InterruptedException, ExecutionException, TimeoutException {
+               for (int no = 0; no < PARTITION_COUNT; no++) {
+                       for (int msgNo = 0; msgNo < MSG_PER_PARTITION; msgNo++) {
+                               sendMsg("Partition " + no + " msg " + msgNo);
+                       }
+               }
+       }
+
+       /**
+        * Polls until all expected messages were consumed or the timeout is reached.
+        *
+        * @param consumedMsgs list filled by the message processors
+        * @param timeoutMs maximum time to wait in milliseconds
+        * @param pollIntervalMs sleep time between two checks in milliseconds
+        * @return the time waited in milliseconds
+        */
+       private static int waitForConsumedMessages(List<String> consumedMsgs, int timeoutMs, int pollIntervalMs) throws InterruptedException {
+               int waitedMs = 0;
+               while (waitedMs < timeoutMs && consumedMsgs.size() < EXPECTED_MSG_COUNT) {
+                       Thread.sleep(pollIntervalMs);
+                       waitedMs += pollIntervalMs;
+               }
+               return waitedMs;
+       }
+
+       /**
+        * Asserts that every consumer thread started with the given name prefix is in state RUNNING.
+        */
+       private void assertConsumersRunning(String threadPrefix) {
+               for (int no = 0; no < PARTITION_COUNT; no++) {
+                       final ThreadStatus.ThreadState stateOfUnmanagedThread = threadManager.getStateOfUnmanagedThread(threadPrefix + no);
+                       Assert.assertEquals(ThreadStatus.ThreadState.RUNNING, stateOfUnmanagedThread);
+               }
+       }
+
+       /**
+        * Returns the brokers reported by the {@link KafkaMonitor} for {@code zookeeperHost}
+        * as comma-separated list.
+        */
+       private static String getBrokerList() {
+               final Iterator<String> brokers = new ODFInternalFactory().create(KafkaMonitor.class).getBrokers(zookeeperHost).iterator();
+               StringBuilder brokersString = new StringBuilder();
+               while (brokers.hasNext()) {
+                       brokersString.append(brokers.next());
+                       if (brokers.hasNext()) {
+                               brokersString.append(",");
+                       }
+               }
+               return brokersString.toString();
+       }
+
+       /**
+        * Builds the consumer properties: the ODF consumer settings plus group id, Zookeeper
+        * and broker addresses, disabled auto commit and String deserializers.
+        */
+       private Properties getConsumerProps() {
+               SettingsManager odfConfig = new ODFFactory().create().getSettingsManager();
+               Properties kafkaConsumerProperties = odfConfig.getKafkaConsumerProperties();
+               final String groupId = "retrying-dummy-consumer";
+               kafkaConsumerProperties.put("group.id", groupId);
+               kafkaConsumerProperties.put("zookeeper.connect", zookeeperHost);
+               kafkaConsumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerList());
+               kafkaConsumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+               kafkaConsumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+               kafkaConsumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+
+               return kafkaConsumerProperties;
+       }
+
+       /**
+        * Sends a single message with a random key to the test topic and waits up to
+        * 3 seconds for the send to complete. The producer is closed in any case.
+        *
+        * @param msg the message payload
+        */
+       void sendMsg(String msg) throws InterruptedException, ExecutionException, TimeoutException {
+               SettingsManager odfConfig = new ODFFactory().create().getSettingsManager();
+               Properties props = odfConfig.getKafkaProducerProperties();
+               props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerList());
+               //Should we use a custom partitioner? we could try to involve consumer offsets and always put on "emptiest" partition
+               //props.put("partitioner.class", TestMessagePartitioner.class);
+
+               final KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
+               try {
+                       ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topicName, UUID.randomUUID().toString(), msg);
+                       producer.send(producerRecord).get(3000, TimeUnit.MILLISECONDS);
+               } finally {
+                       // close even if the send fails or times out, otherwise the producer leaks
+                       producer.close();
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/ParallelServiceErrorTest.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/ParallelServiceErrorTest.java
 
b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/ParallelServiceErrorTest.java
new file mode 100755
index 0000000..d1c9810
--- /dev/null
+++ 
b/odf/odf-messaging/src/test/java/org/apache/atlas/odf/core/test/messaging/kafka/ParallelServiceErrorTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.messaging.kafka;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisManager;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+import org.apache.atlas.odf.core.test.controlcenter.ODFAPITest;
+
+public class ParallelServiceErrorTest extends ODFTestcase {
+       private static final int NUMBER_OF_QUEUED_REQUESTS = 1;
+       Logger log = ODFTestLogger.get();
+
+       @Test
+       public void runDataSetsInParallelError() throws Exception {
+               runDataSetsInParallelAndCheckResult(Arrays.asList(new String[] 
{ "successID1", "errorID2" }), AnalysisRequestStatus.State.FINISHED, 
AnalysisRequestStatus.State.ERROR);
+       }
+
+       private void runDataSetsInParallelAndCheckResult(List<String> 
dataSetIDs, AnalysisRequestStatus.State... expectedState) throws Exception {
+               log.info("Running data sets in parallel: " + dataSetIDs);
+               log.info("Expected state: " + expectedState);
+               AnalysisManager analysisManager = new 
ODFFactory().create().getAnalysisManager();
+
+               List<AnalysisRequest> requests = new 
ArrayList<AnalysisRequest>();
+               List<AnalysisResponse> responses = new 
ArrayList<AnalysisResponse>();
+               List<String> idList = new ArrayList<String>();
+
+               for (int no = 0; no < NUMBER_OF_QUEUED_REQUESTS; no++) {
+                       for (String dataSet : dataSetIDs) {
+                               final AnalysisRequest req = 
ODFAPITest.createAnalysisRequest(Arrays.asList(dataSet + 
UUID.randomUUID().toString()));
+                               AnalysisResponse resp = 
analysisManager.runAnalysis(req);
+                               req.setId(resp.getId());
+                               requests.add(req);
+                               idList.add(resp.getId());
+                               responses.add(resp);
+                       }
+               }
+               log.info("Parallel requests started: " + idList.toString());
+
+               Assert.assertEquals(NUMBER_OF_QUEUED_REQUESTS * 
dataSetIDs.size(), requests.size());
+               Assert.assertEquals(NUMBER_OF_QUEUED_REQUESTS * 
dataSetIDs.size(), responses.size());
+
+               // check that requests are processed in parallel: 
+               //   there must be a point in time where both requests are in 
status "active"
+               log.info("Polling for status of parallel request...");
+               boolean foundPointInTimeWhereBothRequestsAreActive = false;
+               int maxPolls = ODFAPITest.MAX_NUMBER_OF_POLLS;
+               List<AnalysisRequestStatus.State> allSingleStates = new 
ArrayList<AnalysisRequestStatus.State>();
+               do {
+                       int foundActive = 0;
+                       allSingleStates.clear();
+                       for (AnalysisRequest request : requests) {
+                               final AnalysisRequestStatus.State state = 
analysisManager.getAnalysisRequestStatus(request.getId()).getState();
+                               if (state == 
AnalysisRequestStatus.State.ACTIVE) {
+                                       log.info("ACTIVE: " + request.getId() + 
" foundactive: " + foundActive);
+                                       foundActive++;
+                               } else {
+                                       log.info("NOT ACTIVE " + 
request.getId() + " _ " + state);
+                               }
+                               allSingleStates.add(state);
+                       }
+                       if (foundActive > 1) {
+                               foundPointInTimeWhereBothRequestsAreActive = 
true;
+                       }
+
+                       maxPolls--;
+                       Thread.sleep(ODFAPITest.WAIT_MS_BETWEEN_POLLING);
+               } while (maxPolls > 0 && Utils.containsNone(allSingleStates, 
new AnalysisRequestStatus.State[] { AnalysisRequestStatus.State.ACTIVE, 
AnalysisRequestStatus.State.QUEUED }));
+
+               Assert.assertTrue(maxPolls > 0);
+               Assert.assertTrue(foundPointInTimeWhereBothRequestsAreActive);
+               
Assert.assertTrue(allSingleStates.containsAll(Arrays.asList(expectedState)));
+       }
+}

Reply via email to