abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/657

Change subject: Introduce NC to NC Messaging
......................................................................

Introduce NC to NC Messaging

This change introduces NC-to-NC messaging. The NC message
broker listens on the NC messaging port and sends
messages to other NCs.

Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
---
M asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
M asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
M asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageDecoder.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClient.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java
M asterix-app/src/main/resources/asterix-build-configuration.xml
A asterix-app/src/main/resources/cluster.xml
M asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
M asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
M asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
M asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
M asterix-common/src/main/resources/schema/asterix-conf.xsd
M asterix-common/src/main/resources/schema/cluster.xsd
M asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
M asterix-events/src/main/java/org/apache/asterix/event/management/EventTask.java
M asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
M asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
M asterix-external-data/pom.xml
A asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedChannelManager.java
M asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
A asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageHandler.java
A asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageServer.java
M asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
M asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
M asterix-installer/src/main/resources/clusters/demo/demo.xml
M asterix-installer/src/main/resources/clusters/local/local.xml
M asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
M asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
M asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
33 files changed, 617 insertions(+), 59 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/57/657/1

diff --git 
a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
 
b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index e33aed2..e3493c6 100644
--- 
a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ 
b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -87,7 +87,7 @@
             if (tempPath.endsWith(File.separator)) {
                 tempPath = tempPath.substring(0, tempPath.length() - 1);
             }
-            //get initial partitions from properties
+            // get initial partitions from properties
             String[] nodeStores = propertiesAccessor.getStores().get(ncName);
             if (nodeStores == null) {
                 throw new Exception("Coudn't find stores for NC: " + ncName);
@@ -97,7 +97,7 @@
                 tempDirPath += File.separator;
             }
             for (int p = 0; p < nodeStores.length; p++) {
-                //create IO devices based on stores
+                // create IO devices based on stores
                 String iodevicePath = tempDirPath + ncConfig1.nodeId + 
File.separator + nodeStores[p];
                 File ioDeviceDir = new File(iodevicePath);
                 ioDeviceDir.mkdirs();
diff --git 
a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
 
b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 4922ae6..58523f4 100644
--- 
a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ 
b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -122,7 +122,7 @@
 
             if (replicationEnabled) {
                 if (systemState == SystemState.NEW_UNIVERSE || systemState == 
SystemState.CORRUPTED) {
-                    //Try to perform remote recovery
+                    // Try to perform remote recovery
                     IRemoteRecoveryManager remoteRecoveryMgr = 
runtimeContext.getRemoteRecoveryManager();
                     if (autoFailover) {
                         remoteRecoveryMgr.startFailbackProcess();
@@ -153,13 +153,13 @@
     }
 
     private void startReplicationService() {
-        //Open replication channel
+        // Open replication channel
         runtimeContext.getReplicationChannel().start();
 
-        //Check the state of remote replicas
+        // Check the state of remote replicas
         runtimeContext.getReplicationManager().initializeReplicasState();
 
-        //Start replication after the state of remote replicas has been 
initialized.
+        // Start replication after the state of remote replicas has been 
initialized.
         runtimeContext.getReplicationManager().startReplicationThreads();
     }
 
@@ -176,10 +176,10 @@
                 MetadataBootstrap.stopUniverse();
             }
 
-            //Clean any temporary files
+            // Clean any temporary files
             performLocalCleanUp();
 
-            //Note: stopping recovery manager will make a sharp checkpoint
+            // Note: stopping recovery manager will make a sharp checkpoint
             ncApplicationContext.getLifeCycleComponentManager().stopAll(false);
             runtimeContext.deinitialize();
         } else {
@@ -191,7 +191,7 @@
 
     @Override
     public void notifyStartupComplete() throws Exception {
-        //Send max resource id on this NC to the CC
+        // Send max resource id on this NC to the CC
         ((INCMessageBroker) 
ncApplicationContext.getMessageBroker()).reportMaxResourceId();
 
         AsterixMetadataProperties metadataProperties = 
((IAsterixPropertiesProvider) runtimeContext)
@@ -245,15 +245,15 @@
             }
         }
 
-        //Clean any temporary files
+        // Clean any temporary files
         performLocalCleanUp();
     }
 
     private void performLocalCleanUp() {
-        //Delete working area files from failed jobs
+        // Delete working area files from failed jobs
         runtimeContext.getIOManager().deleteWorkspaceFiles();
 
-        //Reclaim storage for temporary datasets.
+        // Reclaim storage for temporary datasets.
         String storageDirName = 
AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
         String[] ioDevices = ((PersistentLocalResourceRepository) 
runtimeContext.getLocalResourceRepository())
                 .getStorageMountingPoints();
@@ -263,9 +263,9 @@
             FileUtils.deleteQuietly(new File(tempDatasetsDir));
         }
 
-        //TODO
-        //Reclaim storage for orphaned index artifacts in NCs.
-        //Note: currently LSM indexes invalid components are deleted when an 
index is activated.
+        // TODO
+        // Reclaim storage for orphaned index artifacts in NCs.
+        // Note: currently LSM indexes invalid components are deleted when an 
index is activated.
     }
 
     private void updateOnNodeJoin() {
diff --git 
a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java 
b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 0a0a917..de3848e 100644
--- 
a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ 
b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -24,7 +24,7 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.api.common.AsterixAppRuntimeContext;
 import org.apache.asterix.common.messaging.AbstractApplicationMessage;
 import org.apache.asterix.common.messaging.CompleteFailbackRequestMessage;
 import org.apache.asterix.common.messaging.CompleteFailbackResponseMessage;
@@ -43,6 +43,7 @@
 import org.apache.asterix.common.replication.ReplicaEvent;
 import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
+import org.apache.asterix.om.util.AsterixClusterProperties;
 import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.messages.IMessage;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
@@ -54,16 +55,21 @@
     private final NodeControllerService ncs;
     private final AtomicLong messageId = new AtomicLong(0);
     private final Map<Long, IApplicationMessageCallback> callbacks;
-    private final IAsterixAppRuntimeContext appContext;
+    private final NCMessagingServer messageServer;
+
+    private final AsterixAppRuntimeContext appContext;
 
     public NCMessageBroker(NodeControllerService ncs) {
         this.ncs = ncs;
-        appContext = (IAsterixAppRuntimeContext) 
ncs.getApplicationContext().getApplicationObject();
         callbacks = new ConcurrentHashMap<Long, IApplicationMessageCallback>();
+        // Start listening to NC messages
+        appContext = (AsterixAppRuntimeContext) 
ncs.getApplicationContext().getApplicationObject();
+        messageServer = new 
NCMessagingServer(appContext.getMetadataProperties().getNodeMessagingPort(ncs.getId()));
+        messageServer.start();
     }
 
     @Override
-    public void sendMessage(IApplicationMessage message, 
IApplicationMessageCallback callback) throws Exception {
+    public void sendMessageToCC(IApplicationMessage message, 
IApplicationMessageCallback callback) throws Exception {
         if (callback != null) {
             long uniqueMessageId = messageId.incrementAndGet();
             message.setId(uniqueMessageId);
@@ -73,7 +79,7 @@
             
ncs.sendApplicationMessageToCC(JavaSerializationUtils.serialize(message), null);
         } catch (Exception e) {
             if (callback != null) {
-                //remove the callback in case of failure
+                // remove the callback in case of failure
                 callbacks.remove(message.getId());
             }
             throw e;
@@ -131,7 +137,7 @@
             //send response after takeover is completed
             TakeoverPartitionsResponseMessage reponse = new 
TakeoverPartitionsResponseMessage(msg.getRequestId(),
                     appContext.getTransactionSubsystem().getId(), 
msg.getPartitions());
-            sendMessage(reponse, null);
+            sendMessageToCC(reponse, null);
         }
     }
 
@@ -142,18 +148,18 @@
         } finally {
             TakeoverMetadataNodeResponseMessage reponse = new 
TakeoverMetadataNodeResponseMessage(
                     appContext.getTransactionSubsystem().getId());
-            sendMessage(reponse, null);
+            sendMessageToCC(reponse, null);
         }
     }
 
     @Override
     public void reportMaxResourceId() throws Exception {
         ReportMaxResourceIdMessage maxResourceIdMsg = new 
ReportMaxResourceIdMessage();
-        //resource ids < FIRST_AVAILABLE_USER_DATASET_ID are reserved for 
metadata indexes.
+        // resource ids < FIRST_AVAILABLE_USER_DATASET_ID are reserved for 
metadata indexes.
         long maxResourceId = 
Math.max(appContext.getLocalResourceRepository().getMaxResourceID(),
                 
MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
         maxResourceIdMsg.setMaxResourceId(maxResourceId);
-        sendMessage(maxResourceIdMsg, null);
+        sendMessageToCC(maxResourceIdMsg, null);
     }
 
     private void handleReplicaEvent(IMessage message) {
@@ -195,7 +201,7 @@
         //send response after partitions prepared for failback
         PreparePartitionsFailbackResponseMessage reponse = new 
PreparePartitionsFailbackResponseMessage(msg.getPlanId(),
                 msg.getRequestId(), msg.getPartitions());
-        sendMessage(reponse, null);
+        sendMessageToCC(reponse, null);
     }
 
     private void handleCompleteFailbackRequest(IMessage message) throws 
Exception {
@@ -206,7 +212,15 @@
         } finally {
             CompleteFailbackResponseMessage reponse = new 
CompleteFailbackResponseMessage(msg.getPlanId(),
                     msg.getRequestId(), msg.getPartitions());
-            sendMessage(reponse, null);
+            sendMessageToCC(reponse, null);
         }
     }
+
+    @Override
+    public void sendMessageToNC(String to, IApplicationMessage message, 
IApplicationMessageCallback callback)
+            throws Exception {
+        NCMessagingClient client = new 
NCMessagingClient(AsterixClusterProperties.INSTANCE.getNodeIpAddress(to),
+                
appContext.getMetadataProperties().getNodeMessagingPort(ncs.getId()), 4);
+        client.start();
+    }
 }
diff --git 
a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageDecoder.java 
b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageDecoder.java
new file mode 100644
index 0000000..e34a0f3
--- /dev/null
+++ 
b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageDecoder.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.messaging;
+
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ReplayingDecoder;
+
+public class NCMessageDecoder extends ReplayingDecoder<Void> {
+
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> 
out) throws Exception {
+        System.err.println("NCMessageDecoder: decode called");
+        out.add(in.readBytes(4));
+    }
+}
diff --git 
a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClient.java 
b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClient.java
new file mode 100644
index 0000000..63eb888
--- /dev/null
+++ 
b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClient.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.messaging;
+
+import javax.net.ssl.SSLException;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+public class NCMessagingClient {
+    private final String host;
+    private final int port;
+    private final int size;
+
+    public NCMessagingClient(String host, int port, int size) {
+        this.host = host;
+        this.port = port;
+        this.size = size;
+    }
+
+    public void start() throws SSLException, InterruptedException {
+        EventLoopGroup group = new NioEventLoopGroup();
+        try {
+            Bootstrap b = new Bootstrap();
+            b.group(group).channel(NioSocketChannel.class).handler(new 
ChannelInitializer<SocketChannel>() {
+                @Override
+                protected void initChannel(SocketChannel ch) throws Exception {
+                    ChannelPipeline p = ch.pipeline();
+                    p.addLast(new NCMessagingClientHandler(size));
+                }
+            });
+
+            // Make the connection attempt.
+            ChannelFuture f = b.connect(host, port).sync();
+
+            // Wait until the connection is closed.
+            f.channel().closeFuture().sync();
+        } finally {
+            group.shutdownGracefully();
+        }
+    }
+}
diff --git 
a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java
 
b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java
new file mode 100644
index 0000000..40dd153
--- /dev/null
+++ 
b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.messaging;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+
+public class NCMessagingClientHandler extends 
SimpleChannelInboundHandler<Object> {
+
+    private ByteBuf content;
+    private ChannelHandlerContext ctx;
+    private int size;
+
+    public NCMessagingClientHandler(int size) {
+        this.size = size;
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) {
+        this.ctx = ctx;
+
+        // Initialize the message.
+        content = ctx.alloc().directBuffer(size);
+        content.writeInt(10);
+
+        // Send the initial messages.
+        generateTraffic();
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) {
+        content.release();
+    }
+
+    @Override
+    public void channelRead0(ChannelHandlerContext ctx, Object msg) throws 
Exception {
+        // Server is supposed to send nothing, but if it sends something, 
discard it.
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        // Close the connection when an exception is raised.
+        cause.printStackTrace();
+        ctx.close();
+    }
+
+    long counter;
+
+    private void generateTraffic() {
+        // Flush the outbound buffer to the socket.
+        // Once flushed, generate the same amount of traffic again.
+        
ctx.writeAndFlush(content.duplicate().retain()).addListener(trafficGenerator);
+    }
+
+    private final ChannelFutureListener trafficGenerator = new 
ChannelFutureListener() {
+        @Override
+        public void operationComplete(ChannelFuture future) {
+            if (future.isSuccess()) {
+                future.channel().close();
+            } else {
+                future.cause().printStackTrace();
+                future.channel().close();
+            }
+        }
+    };
+
+}
diff --git 
a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java
 
b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java
new file mode 100644
index 0000000..3cb6aaa
--- /dev/null
+++ 
b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.messaging;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+public class NCMessagingHandler extends ChannelInboundHandlerAdapter {
+    private ByteBuf buf;
+
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) {
+        buf = ctx.alloc().buffer(4);
+    }
+
+    @Override
+    public void handlerRemoved(ChannelHandlerContext ctx) {
+        buf.release();
+        buf = null;
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) {
+        System.err.println("channelRead Called");
+        ByteBuf m = (ByteBuf) msg;
+        buf.writeBytes(m);
+        m.release();
+
+        if (buf.readableBytes() >= 4) {
+            System.out.println(buf.readInt());
+            ctx.close();
+        }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        cause.printStackTrace();
+        ctx.close();
+    }
+}
diff --git 
a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java 
b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java
new file mode 100644
index 0000000..dbf4c0e
--- /dev/null
+++ 
b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.messaging;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+
+public class NCMessagingServer {
+    private int port;
+    private ChannelInboundHandlerAdapter handler = new NCMessagingHandler();
+    private ChannelFuture f;
+
+    public NCMessagingServer(int port) {
+        this.port = port;
+    }
+
+    public void stop() {
+        f.channel().close();
+    }
+
+    public void start() {
+        Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                EventLoopGroup bossGroup = new NioEventLoopGroup();
+                EventLoopGroup workerGroup = new NioEventLoopGroup();
+                try {
+                    ServerBootstrap b = new ServerBootstrap();
+                    b.group(bossGroup, 
workerGroup).channel(NioServerSocketChannel.class)
+                            .childHandler(new 
ChannelInitializer<SocketChannel>() {
+                        @Override
+                        public void initChannel(SocketChannel ch) throws 
Exception {
+                            ch.pipeline().addLast(new NCMessageDecoder(), new 
NCMessagingHandler());
+                        }
+                    }).option(ChannelOption.SO_BACKLOG, 
128).childOption(ChannelOption.SO_KEEPALIVE, true);
+
+                    // Bind and start to accept incoming connections.
+                    ChannelFuture f = b.bind(port).sync();
+
+                    // Wait until the server socket is closed.
+                    // In this example, this does not happen, but you can do 
that to gracefully
+                    // shut down your server.
+                    f.channel().closeFuture().sync();
+                } catch (Throwable th) {
+                    // Do something
+                    th.printStackTrace();
+                } finally {
+                    workerGroup.shutdownGracefully();
+                    bossGroup.shutdownGracefully();
+                }
+            }
+        });
+        thread.setName("NCMessagingService:" + port);
+        thread.start();
+    }
+}
diff --git a/asterix-app/src/main/resources/asterix-build-configuration.xml 
b/asterix-app/src/main/resources/asterix-build-configuration.xml
index 731113b..2dd5cfc 100644
--- a/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -20,10 +20,12 @@
        <metadataNode>asterix_nc1</metadataNode>
        <store>
                <ncId>asterix_nc1</ncId>
+        <nc_messaging_port>4501</nc_messaging_port>
                <storeDirs>iodevice0,iodevice1</storeDirs>
        </store>
        <store>
                <ncId>asterix_nc2</ncId>
+        <nc_messaging_port>4502</nc_messaging_port>
                <storeDirs>iodevice0,iodevice1</storeDirs>
        </store>
        <transactionLogDir>
diff --git a/asterix-app/src/main/resources/cluster.xml 
b/asterix-app/src/main/resources/cluster.xml
new file mode 100644
index 0000000..7c7e321
--- /dev/null
+++ b/asterix-app/src/main/resources/cluster.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<cluster xmlns="cluster">
+    <instance_name>asterix</instance_name>
+    <store>storage</store>
+    <master_node>
+        <id>master</id>
+        <client_ip>127.0.0.1</client_ip>
+        <cluster_ip>127.0.0.1</cluster_ip>
+        <client_port>1098</client_port>
+        <cluster_port>1099</cluster_port>
+        <http_port>8888</http_port>
+    </master_node>
+    <nc_messaging_port>4503</nc_messaging_port>
+    <node>
+        <id>nc1</id>
+        <cluster_ip>127.0.0.1</cluster_ip>
+        <nc_messaging_port>4501</nc_messaging_port>
+    </node>
+    <node>
+        <id>nc2</id>
+        <cluster_ip>127.0.0.1</cluster_ip>
+        <nc_messaging_port>4502</nc_messaging_port>
+    </node>
+</cluster>
diff --git 
a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
 
b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
index 473a163..289a8ed 100644
--- 
a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
+++ 
b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
@@ -69,4 +69,8 @@
     public Map<String, String> getTransactionLogDirs() {
         return accessor.getTransactionLogDirs();
     }
+
+    public int getNodeMessagingPort(String nodeId) {
+        return accessor.getNodeMessagingPort(nodeId);
+    }
 }
diff --git 
a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
 
b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
index 13ce403..3c31936 100644
--- 
a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
+++ 
b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
@@ -57,7 +57,8 @@
     private final Map<String, String> transactionLogDirs;
     private final Map<String, String> asterixBuildProperties;
     private final Map<String, ClusterPartition[]> nodePartitionsMap;
-    private SortedMap<Integer, ClusterPartition> clusterPartitions;
+    private final SortedMap<Integer, ClusterPartition> clusterPartitions;
+    private final Map<String, Integer> node2NCMessagingPort;
 
     public AsterixPropertiesAccessor() throws AsterixException {
         String fileName = 
System.getProperty(GlobalConfig.CONFIG_FILE_PROPERTY);
@@ -86,7 +87,9 @@
         instanceName = asterixConfiguration.getInstanceName();
         metadataNodeName = asterixConfiguration.getMetadataNode();
         stores = new HashMap<String, String[]>();
+        node2NCMessagingPort = new HashMap<>();
         List<Store> configuredStores = asterixConfiguration.getStore();
+
         nodeNames = new HashSet<String>();
         nodePartitionsMap = new HashMap<>();
         clusterPartitions = new TreeMap<>();
@@ -103,6 +106,7 @@
             stores.put(store.getNcId(), nodeStores);
             nodePartitionsMap.put(store.getNcId(), nodePartitions);
             nodeNames.add(store.getNcId());
+            node2NCMessagingPort.put(store.getNcId(), 
Integer.parseInt(store.getNcMessagingPort()));
         }
         asterixConfigurationParams = new HashMap<String, Property>();
         for (Property p : asterixConfiguration.getProperty()) {
@@ -208,4 +212,8 @@
     public SortedMap<Integer, ClusterPartition> getClusterPartitions() {
         return clusterPartitions;
     }
+
+    public int getNodeMessagingPort(String nodeId) {
+        return node2NCMessagingPort.get(nodeId);
+    }
 }
diff --git 
a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
 
b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index 5d2e263..95fe37b 100644
--- 
a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++ 
b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -42,7 +42,7 @@
 
     /**
      * Sets a unique message id that identifies this message within an NC.
-     * This id is set by {@link 
INCMessageBroker#sendMessage(IApplicationMessage, IApplicationMessageCallback)}
+     * This id is set by {@link 
INCMessageBroker#sendMessageToCC(IApplicationMessage, 
IApplicationMessageCallback)}
      * when the callback is not null to notify the sender when the response to 
that message is received.
      *
      * @param messageId
diff --git 
a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
 
b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
index 41f8a0c..724ed04 100644
--- 
a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
+++ 
b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
@@ -29,7 +29,17 @@
      * @param callback
      * @throws Exception
      */
-    public void sendMessage(IApplicationMessage message, 
IApplicationMessageCallback callback) throws Exception;
+    public void sendMessageToCC(IApplicationMessage message, 
IApplicationMessageCallback callback) throws Exception;
+
+    /**
+     * Sends application message from this NC to another NC.
+     *
+     * @param to
+     *            identifies the destination NC (presumably its node id; confirm against the implementing broker)
+     * @param message
+     *            the application message to send
+     * @param callback
+     *            callback associated with the message (NOTE(review): presumably notified when a response to the
+     *            message is received, as for sendMessageToCC; confirm against the implementing broker)
+     * @throws Exception
+     */
+    public void sendMessageToNC(String to, IApplicationMessage message, 
IApplicationMessageCallback callback)
+            throws Exception;
 
     /**
      * Sends the maximum resource id on this NC to the CC.
diff --git a/asterix-common/src/main/resources/schema/asterix-conf.xsd 
b/asterix-common/src/main/resources/schema/asterix-conf.xsd
index bb99319..3a66421 100644
--- a/asterix-common/src/main/resources/schema/asterix-conf.xsd
+++ b/asterix-common/src/main/resources/schema/asterix-conf.xsd
@@ -56,12 +56,16 @@
     <xs:element
         name="txnLogDirPath"
         type="xs:string" />
-
+    <xs:element
+        name="nc_messaging_port"
+        type="xs:string" />
+        
     <!-- definition of complex elements -->
     <xs:element name="store">
         <xs:complexType>
             <xs:sequence>
                 <xs:element ref="mg:ncId" />
+                <xs:element ref="mg:nc_messaging_port" />
                 <xs:element ref="mg:storeDirs" />
             </xs:sequence>
         </xs:complexType>
diff --git a/asterix-common/src/main/resources/schema/cluster.xsd 
b/asterix-common/src/main/resources/schema/cluster.xsd
index 935d33f..1ec9a64 100644
--- a/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterix-common/src/main/resources/schema/cluster.xsd
@@ -57,6 +57,7 @@
     <xs:element name="result_time_to_live" type="xs:long" />
     <xs:element name="result_sweep_threshold" type="xs:long" />
     <xs:element name="cc_root" type="xs:string" />
+    <xs:element name="nc_messaging_port" type="xs:integer" />
 
        <!-- definition of complex elements -->
        <xs:element name="working_dir">
@@ -123,6 +124,7 @@
                                <xs:element ref="cl:txn_log_dir" minOccurs="0" 
/>
                                <xs:element ref="cl:iodevices" minOccurs="0" />
                                <xs:element ref="cl:debug_port" minOccurs="0" />
+                <xs:element ref="cl:nc_messaging_port" minOccurs="0" />
                        </xs:sequence>
                </xs:complexType>
        </xs:element>
@@ -151,6 +153,7 @@
                                <xs:element ref="cl:metadata_node" />
                                <xs:element ref="cl:data_replication" 
minOccurs="0" />
                                <xs:element ref="cl:master_node" />
+                <xs:element ref="cl:nc_messaging_port" minOccurs="0" />
                                <xs:element ref="cl:node" maxOccurs="unbounded" 
/>
                                <xs:element ref="cl:substitute_nodes" />
                 <xs:element ref="cl:heartbeat_period" minOccurs="0" />
diff --git 
a/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java 
b/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
index 29765fd..c92262c 100644
--- 
a/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
+++ 
b/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
@@ -41,7 +41,7 @@
 public class EventDriver {
 
     public static final String CLIENT_NODE_ID = "client_node";
-    public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, 
"127.0.0.1", null, null, null, null, null);
+    public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, 
"127.0.0.1", null, null, null, null, null, null);
 
     private static String eventsDir;
     private static Events events;
diff --git 
a/asterix-events/src/main/java/org/apache/asterix/event/management/EventTask.java
 
b/asterix-events/src/main/java/org/apache/asterix/event/management/EventTask.java
index e0e5bc4..b8c7f7c 100644
--- 
a/asterix-events/src/main/java/org/apache/asterix/event/management/EventTask.java
+++ 
b/asterix-events/src/main/java/org/apache/asterix/event/management/EventTask.java
@@ -25,13 +25,12 @@
 import java.util.Timer;
 import java.util.TimerTask;
 
-import org.apache.log4j.Logger;
-
 import org.apache.asterix.event.driver.EventDriver;
 import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.asterix.event.schema.event.Event;
 import org.apache.asterix.event.schema.pattern.Pattern;
 import org.apache.asterix.event.schema.pattern.Period;
+import org.apache.log4j.Logger;
 
 public class EventTask extends TimerTask {
 
@@ -68,8 +67,8 @@
             this.interval = EventUtil.parseTimeInterval(period.getAbsvalue(), 
period.getUnit());
         }
         if (pattern.getDelay() != null) {
-            this.initialDelay = EventUtil.parseTimeInterval(new 
ValueType(pattern.getDelay().getValue()), pattern
-                    .getDelay().getUnit());
+            this.initialDelay = EventUtil.parseTimeInterval(new 
ValueType(pattern.getDelay().getValue()),
+                    pattern.getDelay().getUnit());
         }
         if (pattern.getMaxOccurs() != null) {
             this.maxOccurs = pattern.getMaxOccurs();
diff --git 
a/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
 
b/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
index b83faa2..c764cd7 100644
--- 
a/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
+++ 
b/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
@@ -186,12 +186,12 @@
         }
 
         if (nodeid.equals(cluster.getMasterNode().getId())) {
-            String logDir = cluster.getMasterNode().getLogDir() == null ? 
cluster.getLogDir() : cluster.getMasterNode()
-                    .getLogDir();
-            String javaHome = cluster.getMasterNode().getJavaHome() == null ? 
cluster.getJavaHome() : cluster
-                    .getMasterNode().getJavaHome();
+            String logDir = cluster.getMasterNode().getLogDir() == null ? 
cluster.getLogDir()
+                    : cluster.getMasterNode().getLogDir();
+            String javaHome = cluster.getMasterNode().getJavaHome() == null ? 
cluster.getJavaHome()
+                    : cluster.getMasterNode().getJavaHome();
             return new Node(cluster.getMasterNode().getId(), 
cluster.getMasterNode().getClusterIp(), javaHome, logDir,
-                    null, null, cluster.getMasterNode().getDebugPort());
+                    null, null, cluster.getMasterNode().getDebugPort(), null);
         }
 
         List<Node> nodeList = cluster.getNode();
@@ -231,8 +231,8 @@
         pb.start();
     }
 
-    public static void executeLocalScript(Node node, String script, 
List<String> args) throws IOException,
-            InterruptedException {
+    public static void executeLocalScript(Node node, String script, 
List<String> args)
+            throws IOException, InterruptedException {
         List<String> pargs = new ArrayList<String>();
         pargs.add("/bin/bash");
         pargs.add(script);
diff --git 
a/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
 
b/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
index ecbafa7..0f4c9f5 100644
--- 
a/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
+++ 
b/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
@@ -278,6 +278,7 @@
         configuration.setMetadataNode(asterixInstanceName + "_" + 
metadataNodeId);
         List<Store> stores = new ArrayList<Store>();
         String storeDir = cluster.getStore().trim();
+        String nodeFeedPort;
         for (Node node : cluster.getNode()) {
             String iodevices = node.getIodevices() == null ? 
cluster.getIodevices() : node.getIodevices();
             String[] nodeIdDevice = iodevices.split(",");
@@ -287,7 +288,9 @@
             }
             //remove last comma
             nodeStores.deleteCharAt(nodeStores.length() - 1);
-            stores.add(new Store(asterixInstanceName + "_" + node.getId(), 
nodeStores.toString()));
+            nodeFeedPort = node.getNcMessagingPort() == null ? 
String.valueOf(cluster.getNcMessagingPort().intValue())
+                    : String.valueOf(node.getNcMessagingPort().intValue());
+            stores.add(new Store(asterixInstanceName + "_" + node.getId(), 
nodeFeedPort, nodeStores.toString()));
         }
         configuration.setStore(stores);
         List<Coredump> coredump = new ArrayList<Coredump>();
@@ -354,6 +357,7 @@
 
     private static void zipDir(File sourceDir, final File destFile, 
ZipOutputStream zos) throws IOException {
         File[] dirList = sourceDir.listFiles(new FileFilter() {
+            @Override
             public boolean accept(File f) {
                 return !f.getName().endsWith(destFile.getName());
             }
@@ -391,7 +395,7 @@
         byte[] buffer = new byte[2048];
         int read;
         while (entries.hasMoreElements()) {
-            JarEntry entry = (JarEntry) entries.nextElement();
+            JarEntry entry = entries.nextElement();
             String name = entry.getName();
             if (name.equals(origFile)) {
                 continue;
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
index 7801fd7..38b4824 100644
--- a/asterix-external-data/pom.xml
+++ b/asterix-external-data/pom.xml
@@ -287,5 +287,10 @@
             <artifactId>rxjava</artifactId>
             <version>1.0.15</version>
         </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+            <version>4.0.33.Final</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedChannelManager.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedChannelManager.java
new file mode 100644
index 0000000..7796082
--- /dev/null
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedChannelManager.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.management;
+
+import org.apache.asterix.common.config.AsterixFeedProperties;
+
+/**
+ * Thread stub for feed channel management. The class currently carries no state and performs no work.
+ */
+public class FeedChannelManager extends Thread {
+
+    /**
+     * Creates the manager. Neither argument is stored or read by this class.
+     *
+     * @param feedProperties
+     *            the feed properties (unused)
+     * @param nodeId
+     *            the id of the local node (unused)
+     */
+    public FeedChannelManager(AsterixFeedProperties feedProperties, String nodeId) {
+        super();
+    }
+
+    /**
+     * Does nothing, so the thread terminates as soon as it has been started.
+     */
+    @Override
+    public void run() {
+        // no work to perform
+    }
+}
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
index 5095e7d..3965c6b 100644
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
@@ -45,6 +45,8 @@
 
     private static final Logger LOGGER = 
Logger.getLogger(FeedManager.class.getName());
 
+    private FeedChannelManager feedChannelManager;
+
     private final IFeedSubscriptionManager feedSubscriptionManager;
 
     private final IFeedConnectionManager feedConnectionManager;
@@ -76,9 +78,11 @@
                 ? 
AsterixClusterProperties.INSTANCE.getCluster().getMasterNode().getClusterIp() : 
"localhost";
         this.feedMessageService = new FeedMessageService(feedProperties, 
nodeId, ccClusterIp);
         this.nodeLoadReportService = new NodeLoadReportService(nodeId, this);
+        this.feedChannelManager = new FeedChannelManager(feedProperties, 
nodeId);
         try {
             this.feedMessageService.start();
             this.nodeLoadReportService.start();
+            this.feedChannelManager.start();
         } catch (Exception e) {
             if (LOGGER.isLoggable(Level.WARNING)) {
                 LOGGER.warning("Unable to start feed services " + 
e.getMessage());
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageHandler.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageHandler.java
new file mode 100644
index 0000000..22c9a94
--- /dev/null
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageHandler.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.message;
+
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+public class FeedMessageHandler extends ChannelInboundHandlerAdapter {
+    // No methods are overridden: all inbound events use the inherited ChannelInboundHandlerAdapter behavior.
+}
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageServer.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageServer.java
new file mode 100644
index 0000000..171b925
--- /dev/null
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageServer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.message;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+
+/**
+ * Netty NIO server that accepts socket connections on a fixed port and installs a
+ * {@link FeedMessageHandler} in the pipeline of every accepted channel.
+ */
+public class FeedMessageServer {
+    private final int port;
+    private final int numberOfChannels;
+
+    /**
+     * @param port
+     *            the port the server binds to
+     * @param numOfClusterNodes
+     *            stored and later passed as the SO_BACKLOG option of the server channel
+     */
+    public FeedMessageServer(int port, int numOfClusterNodes) {
+        this.port = port;
+        this.numberOfChannels = numOfClusterNodes;
+    }
+
+    /**
+     * Binds the server to its port and blocks the calling thread until the server channel is closed.
+     * Both event loop groups are asked to shut down gracefully on the way out, on success and on failure.
+     *
+     * @throws Exception
+     *             if binding the port or waiting on the channel fails
+     */
+    public void start() throws Exception {
+        final EventLoopGroup acceptors = new NioEventLoopGroup();
+        final EventLoopGroup workers = new NioEventLoopGroup();
+        try {
+            final ServerBootstrap bootstrap = new ServerBootstrap();
+            bootstrap.group(acceptors, workers);
+            bootstrap.channel(NioServerSocketChannel.class);
+            // Every accepted connection gets its own handler instance.
+            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
+                @Override
+                public void initChannel(SocketChannel ch) throws Exception {
+                    ch.pipeline().addLast(new FeedMessageHandler());
+                }
+            });
+            bootstrap.option(ChannelOption.SO_BACKLOG, numberOfChannels);
+            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
+            final ChannelFuture bound = bootstrap.bind(port).sync();
+            // Park here until something closes the listening channel.
+            bound.channel().closeFuture().sync();
+        } finally {
+            workers.shutdownGracefully();
+            acceptors.shutdownGracefully();
+        }
+    }
+}
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
index 870a6df..bb78233 100644
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
@@ -38,10 +38,9 @@
 import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.hdfs.scheduler.Scheduler;
 
 public class IndexingScheduler {
-    private static final Logger LOGGER = 
Logger.getLogger(Scheduler.class.getName());
+    private static final Logger LOGGER = 
Logger.getLogger(IndexingScheduler.class.getName());
 
     /** a list of NCs */
     private String[] NCs;
diff --git 
a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
 
b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
index 9559394..4037eaf 100644
--- 
a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
+++ 
b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
@@ -105,7 +105,7 @@
 
             MasterNode masterNode = cluster.getMasterNode();
             Node master = new Node(masterNode.getId(), 
masterNode.getClusterIp(), masterNode.getJavaHome(),
-                    masterNode.getLogDir(), null, null, null);
+                    masterNode.getLogDir(), null, null, null, null);
             ipAddresses.add(masterNode.getClusterIp());
 
             valid = valid & validateNodeConfiguration(master, cluster);
diff --git a/asterix-installer/src/main/resources/clusters/demo/demo.xml 
b/asterix-installer/src/main/resources/clusters/demo/demo.xml
index 1932fcc..8d5f13b 100644
--- a/asterix-installer/src/main/resources/clusters/demo/demo.xml
+++ b/asterix-installer/src/main/resources/clusters/demo/demo.xml
@@ -32,6 +32,7 @@
        <log_dir>/tmp/asterix/logs</log_dir>
        <store>storage</store>
        <java_home></java_home>
+    <nc_messaging_port>5403</nc_messaging_port>
        <master_node>
                <id>master</id>
                <client_ip>127.0.0.1</client_ip>
@@ -45,11 +46,13 @@
                <cluster_ip>127.0.0.1</cluster_ip>
                <txn_log_dir>/tmp/asterix/node1/txnLogs</txn_log_dir>
                <iodevices>/tmp/asterix/node1/1,/tmp/asterix/node1/2</iodevices>
+        <nc_messaging_port>5401</nc_messaging_port>
        </node>
        <node>
                <id>node2</id>
                <cluster_ip>127.0.0.1</cluster_ip>
                <txn_log_dir>/tmp/asterix/node2/txnLogs</txn_log_dir>
                <iodevices>/tmp/asterix/node2/1,/tmp/asterix/node2/2</iodevices>
+        <nc_messaging_port>5402</nc_messaging_port>
        </node>
 </cluster>
diff --git a/asterix-installer/src/main/resources/clusters/local/local.xml 
b/asterix-installer/src/main/resources/clusters/local/local.xml
index 20f697f..7ed8b74 100644
--- a/asterix-installer/src/main/resources/clusters/local/local.xml
+++ b/asterix-installer/src/main/resources/clusters/local/local.xml
@@ -40,6 +40,7 @@
     <result_time_to_live>86400000</result_time_to_live>
     <!-- The duration within which an instance of the result cleanup should be 
invoked in milliseconds. (default: 1 minute) -->
     <result_sweep_threshold>60000</result_sweep_threshold>
+    <nc_messaging_port>5403</nc_messaging_port>
     <master_node>
         <id>master</id>
         <client_ip>127.0.0.1</client_ip>
@@ -53,12 +54,13 @@
         <cluster_ip>127.0.0.1</cluster_ip>
         <txn_log_dir>/tmp/asterix/nc1/txnLogs</txn_log_dir>
         <iodevices>/tmp/asterix/nc1/p1,/tmp/asterix/nc1/p2</iodevices>
-
+        <nc_messaging_port>5401</nc_messaging_port>
     </node>
     <node>
         <id>nc2</id>
         <cluster_ip>127.0.0.1</cluster_ip>
         <txn_log_dir>/tmp/asterix/nc2/txnLogs</txn_log_dir>
         <iodevices>/tmp/asterix/nc2/p1,/tmp/asterix/nc2/p2</iodevices>
+        <nc_messaging_port>5402</nc_messaging_port>
     </node>
 </cluster>
diff --git 
a/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
 
b/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
index 3e37694..e7bf3bf 100644
--- 
a/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
+++ 
b/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
@@ -62,9 +62,10 @@
 
     private ClusterManager() {
         Cluster asterixCluster = 
AsterixClusterProperties.INSTANCE.getCluster();
-        String eventHome = asterixCluster == null ? null : 
asterixCluster.getWorkingDir().getDir();
+        String eventHome = asterixCluster == null ? null
+                : asterixCluster.getWorkingDir() == null ? null : 
asterixCluster.getWorkingDir().getDir();
 
-        if (asterixCluster != null) {
+        if (eventHome != null) {
             String asterixDir = System.getProperty("user.dir") + 
File.separator + "asterix";
             File configFile = new File(System.getProperty("user.dir") + 
File.separator + "configuration.xml");
             Configuration configuration = null;
@@ -74,8 +75,8 @@
                 Unmarshaller unmarshaller = configCtx.createUnmarshaller();
                 configuration = (Configuration) 
unmarshaller.unmarshal(configFile);
                 AsterixEventService.initialize(configuration, asterixDir, 
eventHome);
-                client = 
AsterixEventService.getAsterixEventServiceClient(AsterixClusterProperties.INSTANCE
-                        .getCluster());
+                client = AsterixEventService
+                        
.getAsterixEventServiceClient(AsterixClusterProperties.INSTANCE.getCluster());
 
                 lookupService = ServiceProvider.INSTANCE.getLookupService();
                 if (!lookupService.isRunning(configuration)) {
diff --git 
a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
 
b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
index 3173525..faa0236 100644
--- 
a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
+++ 
b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
@@ -96,6 +96,7 @@
     private Set<String> failedNodes = new HashSet<>();
     private LinkedList<NodeFailbackPlan> pendingProcessingFailbackPlans;
     private Map<Long, NodeFailbackPlan> planId2FailbackPlanMap;
+    private HashMap<String, String> node2Ip = null;
 
     private AsterixClusterProperties() {
         InputStream is = 
this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE);
@@ -686,4 +687,20 @@
         }
         return stateDescription;
     }
-}
+
+    /**
+     * Returns the cluster IP address of the given node.
+     * <p>
+     * The lookup table is built on first use from the nodes of the cluster description, keyed by
+     * {@code <instanceName>_<nodeId>}. The table is filled in a local map and only assigned to the field once it
+     * is complete, and both the initialization and the read happen under the same lock, so a caller can never
+     * observe a partially populated table.
+     *
+     * @param nodeId
+     *            the instance-qualified node id, i.e. {@code <instanceName>_<id>}
+     * @return the cluster IP of the node, or {@code null} if the id is not part of the cluster description
+     */
+    public String getNodeIpAddress(String nodeId) {
+        synchronized (this) {
+            if (node2Ip == null) {
+                HashMap<String, String> addresses = new HashMap<>();
+                String instanceName = cluster.getInstanceName();
+                for (Node node : cluster.getNode()) {
+                    addresses.put(instanceName + "_" + node.getId(), node.getClusterIp());
+                }
+                // Publish only after the map is fully built.
+                node2Ip = addresses;
+            }
+            return node2Ip.get(nodeId);
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
 
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
index 5b29530..ab1ebe1 100644
--- 
a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
+++ 
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
@@ -57,7 +57,7 @@
             //if no response available or it has an exception, request a new 
one
             if (reponse == null || reponse.getException() != null) {
                 ResourceIdRequestMessage msg = new ResourceIdRequestMessage();
-                ((INCMessageBroker) 
appCtx.getMessageBroker()).sendMessage(msg, this);
+                ((INCMessageBroker) 
appCtx.getMessageBroker()).sendMessageToCC(msg, this);
                 reponse = (ResourceIdRequestResponseMessage) 
resourceIdResponseQ.take();
                 if (reponse.getException() != null) {
                     throw new 
HyracksDataException(reponse.getException().getMessage());
diff --git 
a/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java 
b/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
index f9d10af..c834128 100644
--- a/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
+++ b/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
@@ -1358,7 +1358,7 @@
             }
             //remove last comma
             nodeStores.deleteCharAt(nodeStores.length() - 1);
-            stores.add(new Store(node.getId(), nodeStores.toString()));
+            stores.add(new Store(node.getId(), "4501", nodeStores.toString()));
         }
         configuration.setStore(stores);
         List<Coredump> coredump = new ArrayList<Coredump>();
@@ -1369,10 +1369,13 @@
             coredumpDir = node.getLogDir() == null ? cluster.getLogDir() : 
node.getLogDir();
             coredump.add(new Coredump(node.getId(), coredumpDir + "coredump" + 
File.separator));
             txnLogDir = node.getTxnLogDir() == null ? cluster.getTxnLogDir() : 
node.getTxnLogDir(); //node or cluster-wide
-            txnLogDirs.add(new TransactionLogDir(node.getId(), txnLogDir
-                    + (txnLogDir.charAt(txnLogDir.length() - 1) == 
File.separatorChar ? File.separator : "")
-                    + "txnLogs" //if the string doesn't have a trailing / add 
one
-                    + File.separator));
+            txnLogDirs
+                    .add(new TransactionLogDir(node.getId(),
+                            txnLogDir
+                                    + (txnLogDir.charAt(txnLogDir.length() - 
1) == File.separatorChar ? File.separator
+                                            : "")
+                                    + "txnLogs" //if the string doesn't have a 
trailing / add one
+                                    + File.separator));
         }
         configuration.setMetadataNode(metadataNodeId);
         configuration.setCoredump(coredump);

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>

Reply via email to